[java] java.util.concurrent.* 링크

ITWeb/개발일반 2013. 4. 9. 10:28

[원본글] http://blog.naver.com/windziel?Redirect=Log&logNo=60058329824


java.util.concurrent 패키지는 자바 1.5에 추가된 패키지로서 concurrent 프로그래밍에 유용한 클래스들을 모아둔 것이다.

그중 유용한 것들을 몇가지 정리

참고 - 자바 언어로 배우는 디자인 패턴 입문 멀티 쓰레드 편 - 유키 히로시

 

java.util.concurrent.Semaphore  클래스

사용할 수 있는 리소스의 수가 최대 N개인데 N개보다 많은 수의 쓰레드가 그 리소스를 필요로 하는 경우 Semaphore를 이용하여 리소스 사용을 제어한다.

 

생서자에서 리소스의 수(permits)를 지정

acquire() - 리소스를 확보. 리소스에 빈자리가 생겼을 경우 바로 쓰레드가 acquire 메소드로부터 곧바로 돌아오게 되고 세마포어 내부에서는

                      리소스의 수가 하나 감소. 리소스에 빈자리가 없을 경우 쓰레드는 acquire 메소드 내부에서 블록.
release() - 리소스를 해제. 세마포어 내부에서는 리소스가 하나 증가. acquire 메소드 안에서 대기중인 쓰레드가 있으면

                      그 중 한 개가 깨어나 acquire 메소드로부터 돌아올 수 있다.

 

<예제>

import java.util.Random;

import java.util.concurrent.Semaphore;

/**
 * 수 제한이 있는 리소스
 */
class BoundedResource {
    private final Semaphore semaphore;
    private final int permits;
    private final static Random random = new Random(314159);
    
    public BoundedResource(int permits) {
        this.semaphore = new Semaphore(permits);
        this.permits = permits;
    }
    
    public void use() throws InterruptedException {
        semaphore.acquire();
        
        try {
            doUse();
        } finally {
            semaphore.release();
        }
    }
    
    protected void doUse() throws InterruptedException {
        System.out.println("BEGIN: used = " + (permits - semaphore.availablePermits()));
        Thread.sleep(random.nextInt(500));
        System.out.println("END: used = " + (permits - semaphore.availablePermits()));
    }
}


/**
 * 리소스를 사용하는 쓰레드
 */
class UserThread extends Thread {
    private final static Random random = new Random(26535);
    private final BoundedResource resource;
    
    public void run() {
        try {
            while (true) {
                resource.use();
                Thread.sleep(random.nextInt(3000));
            }
        } catch (InterruptedException e) {
        
        }
    }
}

public class Main {
    public static void main(String[] args) {
        // 3개의 리소스 준비
        BoundedResource resource = new BoundedResource(3);
        
        // 10개의 쓰레드가 사용
        for (int i = 0; i < 10; i++) {
            new UserThread(resource).start();
        }
    }
}

 

java.util.concurrent.locks 패키지의 ReadWriteLock 인터페이스와 ReentrantReadWriteLock 클래스

인스턴스 생성시 락의 취득 순서를 공평(fair)하게 할 것인지를 선택할 수 있다.
공평한 인스턴스를 만든 경우, 대기시간이 긴 쓰레드가 최우선적으로 락을 취득할 수 있도록 조정된다.
재입장 가능하다. 즉, 읽기 역할의 쓰레드가 쓰기 위한 락을 취하거나, 쓰기 역할의 쓰레드가 읽기 위한 락을 취하는 것이 가능하다.
쓰기 위한 락을 읽기 위한 락으로 다운그레이드 할 수 있다. 반대는 안된다.

 

<예제>

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Data {
    private final char[] buffer;
    private final ReadWriteLock lock = new ReentrantReadWriteLock(ture /* fair */);
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public Data(int size) {
        this.buffer = new char[size];
        
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = '*';
        }
    }
    
    public char[] read() throws InterruptedException {
        readLock.lock();
        try {
            return doRead();
        } finally {
            readLock.unlock();
        }
    }
    
    public void write(char c) throws InterruptedException {
        writeLock.lock();
        try {
            return doWrite(c);
        } finally {
            writeLock.unlock();
        }
    }
    
    private char[] doRead() {
        char[] newbuf = new char[buffer.length];
        
        for (int i = 0; i < buffer.length; i++) {
            newbuf[i] = buffer[i];
        }
        slowly();
        return newbuf;
    }
    
    private void doWrite(char c) {
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = c;
            slowly();
        }
    }
    
    private void slowly() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
}

import java.util.Random;

public class WriterThread extends Thread {
    private static final Random random = new Random();
    private final Data data;
    private final String filter;
    private int index = 0;
    
    public WriterThread(Data data, String filter) {
        this.data = data;
        this.filter = filter;
    }
    
    public void run() {
        try {
            while (true) {
                char c = nextchar();
                data.write(c);
                Thread.sleep(random.nextInt(3000));
            }
        } catch (InterruptedException e) {
        
        }
    }
    
    private char nextchar() {
        char c = filter.charAt(index);
        index++;
        if (index >= filter.length()) {
            index = 0;
        }
        
        return c;
    }
}

public class ReaderThread extends Thread {
    private final Data data;
    
    public ReaderThread(Data data) {
        this.data = data;
    }
    
    public void run() {
        try {
            while (true) {
                char[] readbuf = data.read();
                System.out.println(Thread.currentThread().getName() + " reads "
                    + String.valueOf(readbuf));
            }
        } catch (InterruptedException e) {
        
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Data data = new Data(10);
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new WriterThread(data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ").start();
        new WriterThread(data, "abcdefghijklmnopqrstuvwxyz").start();
    }
}

 

 

java.util.concurrent.ThreadFactory 인터페이스

쓰레드 생성을 추상화한 인터페이스

Thread newThread(Runnable r);

ThreadFactory 인터페이스를 구현한 객체를 바꾸는 것 만으로 쓰레드의 생성을 제어
간단한 구현에서는 이것을 사용할 필요없이 Executors.defaultThreadFactory()를 사용하여 기본 ThreadFactory를 구할수 있다.

 

java.util.concurrent.Executor 인터페이스

void execute(Runnable r);

어떤 "처리를 실행"하는 것을 추상화한 인터페이스이며 인수로 부여되는 Runnable 객체는 "실행하는 처리"의 내용을 나타낸다.

 

java.util.concurrent.ExecutorService 인터페이스

반복하여 execute할 수 있는 서비스를 추상화
배후에서 쓰레드가 항상 동작하고 있고 execute 메소드를 호출하면 Runnable 객체를 실행해 주는 것이라고 생각하면 됨.
서비스 종료용으로 shutdown() 메소드 제공

Executor의 서브 인터페이스임

보통 Executors 클래스의 유틸리티 메소드중 ExecutorService 타입을 리턴하는 메소드들을 이용하여 ExecutorService를 얻어 사용

 

java.util.concurrent.ScheduledExecutorService 인터페이스

ExecutorService의 서브 인터페이스이며 처리의 실행을 지연시키는 기능

schedule(Runnable r, long delay, TimeUnit unit);

delay는 지연시간의 수치이며, unit는 delay수치의 단위(NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS)를 지정
Executors.newScheduledThreadPool() 메소드에 의해 ScheduledExecutorService 객체를 생성하여 사용.

 

java.util.concurrent.Executors 클래스

위의 인터페이스 등을 생성시 사용하는 유틸리티 클래스

 

java.util.concurrent.ThreadPoolExecutor 클래스

쓰레드 풀에 대해 아래와 같은 설정을 할 수 있다.
 - 쓰레드풀의 크기
 - 쓰레드를 미리 생성할 것인지 필요에 따라 생성할 것인지
 - 쓰레드를 생성할 때의 팩토리(java.util.concurrent.ThreadFactory)
 - 필요 없어진 워커 쓰레드를 종료시키기까지의 시간
 - 실행하는 일을 건넬 때의 큐인 방법
 - 실행을 거부하는 방법
 - 실행의 전처리, 후처리
 
이 클래스는 ExecutorService를 구현한 것으로 보통은 Executors 클래스에 준비되어 있는 메소드를 사용하는 것이 편리.

Executors.newCachedThreadPool(), Executors.newFixedThreadPool() 등

세세한 설정이 필요한 경우 직접 생성

 

java.util.concurrent.Callable 인터페이스

반환값이 있는 처리의 호출을 추상화
Runnable 인터페이스의 run 메소드와 비슷하지만 반환값이 있다.
Callable<String>라고 기술하면 call 메소드의 반환값의 형이 String인 Callable 인터페이스를 나타낸다.


java.util.concurrent.Future 인터페이스

값을 취득하는 get 메소드가 선언
Future<String>라고 기술하면 get 메소드의 반환값이 String인 Future 인터페이스를 나타낸다.


java.util.concurrent.FutureTask 클래스

Future 인터페이스를 구현한 표준적인 클래스
값을 취득하는 get()
실행을 중단하는 cancel()
값을 설정하는 set()
예외를 설정하는 setException()
Runnable 인터페이스를 구현하고 있으므로 run()

생성자의 인수에 Callable 객체를 건내고 FutureTask의 run 메소드를 호출하면 생성자의 인수로 건넨 Callable 객체의 call 메소드가 실행된다. call 메소드를 호출한 쓰레드는 call 메소드의 반환값을 동기적으로 구하고 그 반환값을 FutureTask의 set 메소드롤 설정한다. 만약 call 메소드에서 예외가 발생하면 FutureTask의 setException 메소드를 사용하여 예외를 설정한다. 필요에 따라 get 메소드로 값을 취하러 가면 call 메소드의 반환값을 구할 수 있다.

 

java.util.concurrent.CountDownLatch 클래스

지정한 회수만큼 countdown 메소드가 호출되기를 기다린다.

 

<예제>

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;

public class Main {
    private static final int TASKS = 10;
    
    public static void main(String[] args) {
        System.out.println("BEGIN");
        ExecutorService service = Executors.newFixedThreadPool(5);
        CountDownLatch doneLatch = new CountDownLatch(TASKS);
        
        try {
            for (int i = 0; i < TASKS; i++) {
                service.execute(new MyTask(doneLatch, i));
            }
            System.out.println("AWAIT");
            
            doneLatch.await();
        } catch (InterruptedException e) {
        
        } finally {
            service.shutdown();
            System.out.println("END");
        }
    }
}


import java.util.Random;

import java.util.concurrent.CountDownLatch;

public class MyTask implements Runnable {

    private final CountDownLatch donLatch;
    private final int context;
    private static final Random random = new Random(314159);
    
    public MyTask(CountDownLatch doneLatch, int context) {
        this.doneLatch = doneLatch;
        this.context = context;
    }
    
    public void run() {
        doTask();
        doneLatch.countDown();
    }
    
    protected void doTask() {
        String name = Thread.currentThread().getName();
        System.out.println(name + ":MyTask:BEGIN:context = " + context);
        try {
            Thread.sleep(random.nextInt(3000));
        } catch (InterruptedException e) {
        
        } finally {
            System.out.println(name + ":MyTask:END:context = " + context);
        }
    }
}

 

위의 인터페이스, 클래스들 외에도 멀티 쓰레드 프로그램에서 안전하게 사용할 수 있는 자료구조에 대한 인터페이스 및 이를 구현한 클래스들이 있다.

BlockingQueue, BlockingDeque ConcurrentMap 등의 인터페이스를 구현한 클래스들이 있으므로 필요에 따라 가져다 사용하면 된다.

 

장황하게 많은 인터페이스, 클래스들을 설명했지만 자주 사용할 만한 것은

Executors 유틸리티 클래스를 이용해서 단일 쓰레드의 경우 newSingleThreadExecutor(), 쓰레드풀의 경우 newFixedThreadPool() 또는 newCachedThreadPool() 메소드를 이용하여 ExecutorService의 인스턴스를 획득한 후 Runnable을 구현한 클래스를 execute() 메소드에 넘겨주어 시작하고, shutdown() 또는 shutdownNow() 메소드를 사용하여 중지시키는 정도로 사용하면 될 듯하다.

 

<예제 - from JDK API Document>

class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

   public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }

   public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

 class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
 }

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

: