'concurrent'에 해당되는 글 5건

  1. 2017.12.12 [Java] Executors 로 간단 multithread 테스트
  2. 2017.11.24 [Java] Excutors 에서 제공하는 ExecutorService
  3. 2017.11.24 [Java] java.util.concurrent 관련 예제
  4. 2016.09.28 [HttpClient] Async Http Request Docs.
  5. 2013.04.09 [java] java.util.concurrent.* 링크

[Java] Executors 로 간단 multithread 테스트

ITWeb/개발일반 2017. 12. 12. 11:54

이전 글의 참고 문서들을 먼저 보시면 좋습니다.

http://jjeong.tistory.com/1296

package hello.executors;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HelloExecutors {

public static class HelloCallableThread implements Callable<Integer> {
int input;

public HelloCallableThread(int input) {
this.input = input;
}

@Override
public Integer call() throws Exception {
return input + 1;
}
}

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(4);
Set<Callable<Integer>> callables = new HashSet<Callable<Integer>>();

callables.add(new HelloCallableThread(1));
callables.add(new HelloCallableThread(2));
callables.add(new HelloCallableThread(3));
callables.add(new HelloCallableThread(4));

List<Future<Integer>> futures = executorService.invokeAll(callables);

for(Future<Integer> future : futures){
System.out.println("future.get = " + future.get());
}

executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}
}


:

[Java] Excutors 에서 제공하는 ExecutorService

ITWeb/개발일반 2017. 11. 24. 15:36

구글링 하기 귀찮아서 소소 코드에 있는 주석이랑 코드 가져 왔습니다.

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* Creates a work-stealing thread pool using all
* {@link Runtime#availableProcessors available processors}
* as its target parallelism level.
* @return the newly created thread pool
* @see #newWorkStealingPool(int)
* @since 1.8
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}


/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor( ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}


/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
/**
* Returns an object that delegates all defined {@link
* ExecutorService} methods to the given executor, but not any
* other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return an {@code ExecutorService} instance
* @throws NullPointerException if executor null
*/
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
/**
* Returns an object that delegates all defined {@link
* ScheduledExecutorService} methods to the given executor, but
* not any other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return a {@code ScheduledExecutorService} instance
* @throws NullPointerException if executor null
*/
public static ScheduledExecutorService unconfigurableScheduledExecutorService( ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}


  • newFixedThreadPool
    • 정해준 크기 만큼의 쓰레드를 생성하고 재사용 합니다. 명시적으로 shutdown() 하지 않는 한 쓰레드 중 하나가 종료 되면 다시 생성을 하게 됩니다.


  • newWorkStealingPool
    • 작업 순서에 대한 보장은 하지 않습니다, parallelism 수준에 따라 쓰레드를 충분히 지원 하지만 다중큐를 사용하는 것이 좋습니다. 쓰레드의 크기는 동적으로 늘었다 줄었다 합니다.


  • newSingleThreadExecutor
    • 쓰레드를 하나만 생성해서 사용합니다. 만약 종료 되면 다시 쓰레드가 생성이 되며 작업에 대한 연속성을 보장해 줍니다.


  • newCachedThreadPool
    • 필요한 만큼 쓰레드를 생성 하게 됩니다. 하지만 60초 동안 사용되지 않으면 풀에서 제거 됩니다.
    • 60초가 기본 설정 값 이며, 생성된 쓰레드는 재사용 됩니다.


  • newSingleThreadScheduledExecutor
    • 스케쥴링이 가능한 하나의 쓰레드를 생성 합니다. 스케쥴 기능을 빼고는 newSingleThreadExecutor 와 비슷 하다고 보시면 됩니다.


  • newScheduledThreadPool
    • 스케쥴링이 가능한 쓰레드 풀을 생성 합니다. 쓰레드가 idle 상태에 있더라도 종료 되거나 소멸 되지 않고 풀에 그대로 남아 있습니다.


:

[Java] java.util.concurrent 관련 예제

ITWeb/개발일반 2017. 11. 24. 15:03

구글링 하기 귀찮을 땐.

http://tutorials.jenkov.com/java-util-concurrent/index.html

http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/


그리고 java 8 관련해서는 위에 링크 중 아래 블로그 쥔장이 잘 정리해 둔것 같내요.

http://winterbe.com/java/

:

[HttpClient] Async Http Request Docs.

ITWeb/개발일반 2016. 9. 28. 10:23


async http client 기능 구현을 위해 문서 keep 합니다.


https://hc.apache.org/httpcomponents-asyncclient-dev/quickstart.html


https://github.com/AsyncHttpClient/async-http-client

:

[java] java.util.concurrent.* 링크

ITWeb/개발일반 2013. 4. 9. 10:28

[원본글] http://blog.naver.com/windziel?Redirect=Log&logNo=60058329824


java.util.concurrent 패키지는 자바 1.5에 추가된 패키지로서 concurrent 프로그래밍에 유용한 클래스들을 모아둔 것이다.

그중 유용한 것들을 몇가지 정리

참고 - 자바 언어로 배우는 디자인 패턴 입문 멀티 쓰레드 편 - 유키 히로시

 

java.util.concurrent.Semaphore  클래스

사용할 수 있는 리소스의 수가 최대 N개인데 N개보다 많은 수의 쓰레드가 그 리소스를 필요로 하는 경우 Semaphore를 이용하여 리소스 사용을 제어한다.

 

생서자에서 리소스의 수(permits)를 지정

acquire() - 리소스를 확보. 리소스에 빈자리가 생겼을 경우 바로 쓰레드가 acquire 메소드로부터 곧바로 돌아오게 되고 세마포어 내부에서는

                      리소스의 수가 하나 감소. 리소스에 빈자리가 없을 경우 쓰레드는 acquire 메소드 내부에서 블록.
release() - 리소스를 해제. 세마포어 내부에서는 리소스가 하나 증가. acquire 메소드 안에서 대기중인 쓰레드가 있으면

                      그 중 한 개가 깨어나 acquire 메소드로부터 돌아올 수 있다.

 

<예제>

import java.util.Random;

import java.util.concurrent.Semaphore;

/**
 * 수 제한이 있는 리소스
 */
class BoundedResource {
    private final Semaphore semaphore;
    private final int permits;
    private final static Random random = new Random(314159);
    
    public BoundedResource(int permits) {
        this.semaphore = new Semaphore(permits);
        this.permits = permits;
    }
    
    public void use() throws InterruptedException {
        semaphore.acquire();
        
        try {
            doUse();
        } finally {
            semaphore.release();
        }
    }
    
    protected void doUse() throws InterruptedException {
        System.out.println("BEGIN: used = " + (permits - semaphore.availablePermits()));
        Thread.sleep(random.nextInt(500));
        System.out.println("END: used = " + (permits - semaphore.availablePermits()));
    }
}


/**
 * 리소스를 사용하는 쓰레드
 */
class UserThread extends Thread {
    private final static Random random = new Random(26535);
    private final BoundedResource resource;
    
    public void run() {
        try {
            while (true) {
                resource.use();
                Thread.sleep(random.nextInt(3000));
            }
        } catch (InterruptedException e) {
        
        }
    }
}

public class Main {
    public static void main(String[] args) {
        // 3개의 리소스 준비
        BoundedResource resource = new BoundedResource(3);
        
        // 10개의 쓰레드가 사용
        for (int i = 0; i < 10; i++) {
            new UserThread(resource).start();
        }
    }
}

 

java.util.concurrent.locks 패키지의 ReadWriteLock 인터페이스와 ReentrantReadWriteLock 클래스

인스턴스 생성시 락의 취득 순서를 공평(fair)하게 할 것인지를 선택할 수 있다.
공평한 인스턴스를 만든 경우, 대기시간이 긴 쓰레드가 최우선적으로 락을 취득할 수 있도록 조정된다.
재입장 가능하다. 즉, 읽기 역할의 쓰레드가 쓰기 위한 락을 취하거나, 쓰기 역할의 쓰레드가 읽기 위한 락을 취하는 것이 가능하다.
쓰기 위한 락을 읽기 위한 락으로 다운그레이드 할 수 있다. 반대는 안된다.

 

<예제>

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Data {
    private final char[] buffer;
    private final ReadWriteLock lock = new ReentrantReadWriteLock(ture /* fair */);
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public Data(int size) {
        this.buffer = new char[size];
        
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = '*';
        }
    }
    
    public char[] read() throws InterruptedException {
        readLock.lock();
        try {
            return doRead();
        } finally {
            readLock.unlock();
        }
    }
    
    public void write(char c) throws InterruptedException {
        writeLock.lock();
        try {
            return doWrite(c);
        } finally {
            writeLock.unlock();
        }
    }
    
    private char[] doRead() {
        char[] newbuf = new char[buffer.length];
        
        for (int i = 0; i < buffer.length; i++) {
            newbuf[i] = buffer[i];
        }
        slowly();
        return newbuf;
    }
    
    private void doWrite(char c) {
        for (int i = 0; i < buffer.length; i++) {
            buffer[i] = c;
            slowly();
        }
    }
    
    private void slowly() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }
}

import java.util.Random;

public class WriterThread extends Thread {
    private static final Random random = new Random();
    private final Data data;
    private final String filter;
    private int index = 0;
    
    public WriterThread(Data data, String filter) {
        this.data = data;
        this.filter = filter;
    }
    
    public void run() {
        try {
            while (true) {
                char c = nextchar();
                data.write(c);
                Thread.sleep(random.nextInt(3000));
            }
        } catch (InterruptedException e) {
        
        }
    }
    
    private char nextchar() {
        char c = filter.charAt(index);
        index++;
        if (index >= filter.length()) {
            index = 0;
        }
        
        return c;
    }
}

public class ReaderThread extends Thread {
    private final Data data;
    
    public ReaderThread(Data data) {
        this.data = data;
    }
    
    public void run() {
        try {
            while (true) {
                char[] readbuf = data.read();
                System.out.println(Thread.currentThread().getName() + " reads "
                    + String.valueOf(readbuf));
            }
        } catch (InterruptedException e) {
        
        }
    }
}

public class Main {
    public static void main(String[] args) {
        Data data = new Data(10);
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new ReaderThread(data).start();
        new WriterThread(data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ").start();
        new WriterThread(data, "abcdefghijklmnopqrstuvwxyz").start();
    }
}

 

 

java.util.concurrent.ThreadFactory 인터페이스

쓰레드 생성을 추상화한 인터페이스

Thread newThread(Runnable r);

ThreadFactory 인터페이스를 구현한 객체를 바꾸는 것 만으로 쓰레드의 생성을 제어
간단한 구현에서는 이것을 사용할 필요없이 Executors.defaultThreadFactory()를 사용하여 기본 ThreadFactory를 구할수 있다.

 

java.util.concurrent.Executor 인터페이스

void execute(Runnable r);

어떤 "처리를 실행"하는 것을 추상화한 인터페이스이며 인수로 부여되는 Runnable 객체는 "실행하는 처리"의 내용을 나타낸다.

 

java.util.concurrent.ExecutorService 인터페이스

반복하여 execute할 수 있는 서비스를 추상화
배후에서 쓰레드가 항상 동작하고 있고 execute 메소드를 호출하면 Runnable 객체를 실행해 주는 것이라고 생각하면 됨.
서비스 종료용으로 shutdown() 메소드 제공

Executor의 서브 인터페이스임

보통 Executors 클래스의 유틸리티 메소드중 ExecutorService 타입을 리턴하는 메소드들을 이용하여 ExecutorService를 얻어 사용

 

java.util.concurrent.ScheduledExecutorService 인터페이스

ExecutorService의 서브 인터페이스이며 처리의 실행을 지연시키는 기능

schedule(Runnable r, long delay, TimeUnit unit);

delay는 지연시간의 수치이며, unit는 delay수치의 단위(NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS)를 지정
Executors.newScheduledThreadPool() 메소드에 의해 ScheduledExecutorService 객체를 생성하여 사용.

 

java.util.concurrent.Executors 클래스

위의 인터페이스 등을 생성시 사용하는 유틸리티 클래스

 

java.util.concurrent.ThreadPoolExecutor 클래스

쓰레드 풀에 대해 아래와 같은 설정을 할 수 있다.
 - 쓰레드풀의 크기
 - 쓰레드를 미리 생성할 것인지 필요에 따라 생성할 것인지
 - 쓰레드를 생성할 때의 팩토리(java.util.concurrent.ThreadFactory)
 - 필요 없어진 워커 쓰레드를 종료시키기까지의 시간
 - 실행하는 일을 건넬 때의 큐인 방법
 - 실행을 거부하는 방법
 - 실행의 전처리, 후처리
 
이 클래스는 ExecutorService를 구현한 것으로 보통은 Executors 클래스에 준비되어 있는 메소드를 사용하는 것이 편리.

Executors.newCachedThreadPool(), Executors.newFixedThreadPool() 등

세세한 설정이 필요한 경우 직접 생성

 

java.util.concurrent.Callable 인터페이스

반환값이 있는 처리의 호출을 추상화
Runnable 인터페이스의 run 메소드와 비슷하지만 반환값이 있다.
Callable<String>라고 기술하면 call 메소드의 반환값의 형이 String인 Callable 인터페이스를 나타낸다.


java.util.concurrent.Future 인터페이스

값을 취득하는 get 메소드가 선언
Future<String>라고 기술하면 get 메소드의 반환값이 String인 Future 인터페이스를 나타낸다.


java.util.concurrent.FutureTask 클래스

Future 인터페이스를 구현한 표준적인 클래스
값을 취득하는 get()
실행을 중단하는 cancel()
값을 설정하는 set()
예외를 설정하는 setException()
Runnable 인터페이스를 구현하고 있으므로 run()

생성자의 인수에 Callable 객체를 건내고 FutureTask의 run 메소드를 호출하면 생성자의 인수로 건넨 Callable 객체의 call 메소드가 실행된다. call 메소드를 호출한 쓰레드는 call 메소드의 반환값을 동기적으로 구하고 그 반환값을 FutureTask의 set 메소드롤 설정한다. 만약 call 메소드에서 예외가 발생하면 FutureTask의 setException 메소드를 사용하여 예외를 설정한다. 필요에 따라 get 메소드로 값을 취하러 가면 call 메소드의 반환값을 구할 수 있다.

 

java.util.concurrent.CountDownLatch 클래스

지정한 회수만큼 countdown 메소드가 호출되기를 기다린다.

 

<예제>

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;

public class Main {
    private static final int TASKS = 10;
    
    public static void main(String[] args) {
        System.out.println("BEGIN");
        ExecutorService service = Executors.newFixedThreadPool(5);
        CountDownLatch doneLatch = new CountDownLatch(TASKS);
        
        try {
            for (int i = 0; i < TASKS; i++) {
                service.execute(new MyTask(doneLatch, i));
            }
            System.out.println("AWAIT");
            
            doneLatch.await();
        } catch (InterruptedException e) {
        
        } finally {
            service.shutdown();
            System.out.println("END");
        }
    }
}


import java.util.Random;

import java.util.concurrent.CountDownLatch;

public class MyTask implements Runnable {

    private final CountDownLatch donLatch;
    private final int context;
    private static final Random random = new Random(314159);
    
    public MyTask(CountDownLatch doneLatch, int context) {
        this.doneLatch = doneLatch;
        this.context = context;
    }
    
    public void run() {
        doTask();
        doneLatch.countDown();
    }
    
    protected void doTask() {
        String name = Thread.currentThread().getName();
        System.out.println(name + ":MyTask:BEGIN:context = " + context);
        try {
            Thread.sleep(random.nextInt(3000));
        } catch (InterruptedException e) {
        
        } finally {
            System.out.println(name + ":MyTask:END:context = " + context);
        }
    }
}

 

위의 인터페이스, 클래스들 외에도 멀티 쓰레드 프로그램에서 안전하게 사용할 수 있는 자료구조에 대한 인터페이스 및 이를 구현한 클래스들이 있다.

BlockingQueue, BlockingDeque ConcurrentMap 등의 인터페이스를 구현한 클래스들이 있으므로 필요에 따라 가져다 사용하면 된다.

 

장황하게 많은 인터페이스, 클래스들을 설명했지만 자주 사용할 만한 것은

Executors 유틸리티 클래스를 이용해서 단일 쓰레드의 경우 newSingleThreadExecutor(), 쓰레드풀의 경우 newFixedThreadPool() 또는 newCachedThreadPool() 메소드를 이용하여 ExecutorService의 인스턴스를 획득한 후 Runnable을 구현한 클래스를 execute() 메소드에 넘겨주어 시작하고, shutdown() 또는 shutdownNow() 메소드를 사용하여 중지시키는 정도로 사용하면 될 듯하다.

 

<예제 - from JDK API Document>

class NetworkService implements Runnable {
   private final ServerSocket serverSocket;
   private final ExecutorService pool;

   public NetworkService(int port, int poolSize)
       throws IOException {
     serverSocket = new ServerSocket(port);
     pool = Executors.newFixedThreadPool(poolSize);
   }

   public void run() { // run the service
     try {
       for (;;) {
         pool.execute(new Handler(serverSocket.accept()));
       }
     } catch (IOException ex) {
       pool.shutdown();
     }
   }
 }

 class Handler implements Runnable {
   private final Socket socket;
   Handler(Socket socket) { this.socket = socket; }
   public void run() {
     // read and service request on socket
   }
 }

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

: