4 posts tagged 'ThreadPool'

  1. 2017.11.24 [Java] The ExecutorService factories provided by Executors
  2. 2015.10.01 [Kibana] A caveat when using Kibana monitoring dashboards - search threads.
  3. 2015.04.30 [Elasticsearch] Watch the connection pool when using NodeBuilder.
  4. 2013.08.13 [Elasticsearch] threadpool settings.

[Java] The ExecutorService factories provided by Executors

ITWeb/General Development 2017. 11. 24. 15:36

I didn't feel like googling, so I copied the comments and code straight from the JDK source.

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that maintains enough threads to support
* the given parallelism level, and may use multiple queues to
* reduce contention. The parallelism level corresponds to the
* maximum number of threads actively engaged in, or available to
* engage in, task processing. The actual number of threads may
* grow and shrink dynamically. A work-stealing pool makes no
* guarantees about the order in which submitted tasks are
* executed.
*
* @param parallelism the targeted parallelism level
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code parallelism <= 0}
* @since 1.8
*/
public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool(parallelism,
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                            null, true);
}
/**
* Creates a work-stealing thread pool using all
* {@link Runtime#availableProcessors available processors}
* as its target parallelism level.
* @return the newly created thread pool
* @see #newWorkStealingPool(int)
* @since 1.8
*/
public static ExecutorService newWorkStealingPool() {
    return new ForkJoinPool(Runtime.getRuntime().availableProcessors(),
                            ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                            null, true);
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}


/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}


/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
/**
* Returns an object that delegates all defined {@link
* ExecutorService} methods to the given executor, but not any
* other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return an {@code ExecutorService} instance
* @throws NullPointerException if executor null
*/
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
    if (executor == null)
        throw new NullPointerException();
    return new DelegatedExecutorService(executor);
}
/**
* Returns an object that delegates all defined {@link
* ScheduledExecutorService} methods to the given executor, but
* not any other methods that might otherwise be accessible using
* casts. This provides a way to safely "freeze" configuration and
* disallow tuning of a given concrete implementation.
* @param executor the underlying implementation
* @return a {@code ScheduledExecutorService} instance
* @throws NullPointerException if executor null
*/
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
    if (executor == null)
        throw new NullPointerException();
    return new DelegatedScheduledExecutorService(executor);
}


  • newFixedThreadPool
    • Creates the specified number of threads and reuses them. Unless you call shutdown() explicitly, any thread that dies is replaced with a new one.


  • newWorkStealingPool
    • Makes no guarantees about the order in which tasks execute. It maintains enough threads to support the given parallelism level and may use multiple queues to reduce contention; the actual number of threads grows and shrinks dynamically.


  • newSingleThreadExecutor
    • Creates and uses a single thread. If that thread dies, a new one takes its place, and tasks are guaranteed to execute sequentially.


  • newCachedThreadPool
    • Creates threads as needed, but a thread idle for 60 seconds is removed from the pool.
    • 60 seconds is the default; existing threads are reused when available.


  • newSingleThreadScheduledExecutor
    • Creates a single thread that can schedule tasks. Think of it as newSingleThreadExecutor plus scheduling.


  • newScheduledThreadPool
    • Creates a thread pool that can schedule tasks. Threads stay in the pool even while idle; they are not terminated or reclaimed.
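The factory methods above can be exercised with a short sketch. The class name and the toy tasks below are mine, made up for illustration; the `Executors` / `ExecutorService` calls are the JDK APIs quoted above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ExecutorsDemo {
    public static void main(String[] args) throws Exception {
        // Fixed pool: at most 4 threads; extra tasks wait in the unbounded queue.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int n = i;
            results.add(pool.submit(() -> n * n)); // submitted as Callable<Integer>
        }
        int sum = 0;
        for (Future<Integer> f : results) {
            sum += f.get(); // blocks until the task completes
        }
        System.out.println(sum); // 0 + 1 + 4 + ... + 81 = 285

        // Single-threaded scheduled executor: run one task after a delay.
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<String> delayed =
                ses.schedule(() -> "done", 100, TimeUnit.MILLISECONDS);
        System.out.println(delayed.get()); // done

        // Always shut pools down; non-daemon worker threads keep the JVM alive.
        pool.shutdown();
        ses.shutdown();
    }
}
```

Note the shutdown calls at the end: as the javadoc above says, pool threads exist until the pool is explicitly shut down.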



[Kibana] A caveat when using Kibana monitoring dashboards - search threads.

Elastic/Kibana 2015. 10. 1. 13:56

I know many people use the ELK stack for metric collection and system monitoring.

Many of you have probably run into this already, but I'm sharing it anyway for the record.

Typically you collect data with ELK and then build a Kibana dashboard to watch the metrics.

As you probably know, Kibana's default index pattern is logstash-*, so queries run against every matching index.

Because of this, performance degrades and errors start to appear as time goes on.

Also as you probably know, every action in Elasticsearch runs on its own thread.

So if you keep a Kibana dashboard open with auto refresh enabled, search requests keep firing at that interval.


As an example, assume 5 shards per index (replica 0),

and that 30 daily indices have been created so far, one per day for 30 days.

That makes 5 x 30 = 150 shards in total.

Now say you build a Kibana dashboard showing 8 visualizations on one screen.

Each load of that dashboard issues 8 queries.

How much search work does Elasticsearch do for those 8 queries?

8 x 5 x 30 = 1,200 shard-level search requests hit the search thread pool.


Why is that a problem?


Elasticsearch lets you size the search thread pool through its threadpool settings.

Below is a code snippet from ThreadPool.java.


defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder()
    ....
    .put(Names.SEARCH, settingsBuilder().put("type", "fixed")
        .put("size", ((availableProcessors * 3) / 2) + 1)
        .put("queue_size", 1000).build())
    ....


As shown above, the default pool size is ((availableProcessors * 3) / 2) + 1,

and queue_size is fixed at 1000.


Assuming a 4-core CPU:

runnable thread size = ((4 x 3) / 2) + 1 = 7

queue size = 1000
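The arithmetic above can be checked with a quick sketch. The variable names are mine; the size formula is the one from the ThreadPool.java snippet, and the counts are this post's example scenario.

```java
public class SearchThreadMath {
    public static void main(String[] args) {
        int availableProcessors = 4; // the 4-core assumption above

        // Default search pool size from ThreadPool.java: fixed, ((cores * 3) / 2) + 1
        int searchPoolSize = ((availableProcessors * 3) / 2) + 1;

        int shardsPerIndex = 5;   // replica 0
        int indices = 30;         // one daily index, 30 days
        int dashboardPanels = 8;  // visualizations on the dashboard

        // Shard-level search requests generated by one dashboard load
        int shardRequestsPerRefresh = dashboardPanels * shardsPerIndex * indices;

        System.out.println(searchPoolSize);          // 7
        System.out.println(shardRequestsPerRefresh); // 1200
    }
}
```

With only 7 runnable threads and a queue of 1000, a single refresh of 1,200 shard requests already overflows the queue, which is exactly the rejection shown in the log below.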


Every time the 8-visualization dashboard loads, 1,200 shard-level search requests fire; if system resources run short at that moment, other applications running queries such as aggregations against the same Elasticsearch cluster may even get incorrect results back.


When this actually happens, you will see messages like the following in the Elasticsearch error log.


[2015-09-29 00:08:40,896][DEBUG][action.search.type       ] [Madame Masque] [....][7], node[zXJSZ4IYS2KwPhj190hhEQ], [P], s[STARTED]: Failed to execute [org.elasticsearch.action.search.SearchRequest@542e2e00] lastShard [true]
org.elasticsearch.common.util.concurrent.EsRejectedExecutionException: rejected execution (queue capacity 1000) on org.elasticsearch.search.action.SearchServiceTransportAction$23@76a71853
at org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution(EsAbortPolicy.java:62)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor.execute(EsThreadPoolExecutor.java:79)
...(snipped)...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


The error says the request was rejected because the queue capacity of 1000 was exceeded.


The fixes are simple:

- Simplify the Kibana dashboard, and only keep it open when you actually need it.

- Turn off Kibana's auto-refresh, or lengthen its interval.

- Increase threadpool.search.queue_size.


You may or may not run into this yourself, but it's worth knowing about when operating a cluster, so I'm sharing it.


[Elasticsearch] Watch the connection pool when using NodeBuilder.

Elastic/Elasticsearch 2015. 4. 30. 18:58

Nothing big, but it's a mistake people make now and then, so I'm writing it down so I don't forget.

I've only covered TransportClient so far, so here is something to watch out for when using NodeBuilder.


NodeBuilder is very handy for building your own cluster from an application or for standing up a client node.

It also makes it easy to spin up a mini Elasticsearch cluster programmatically for test code.


What I'm sharing today is the mistake of mis-sizing the connection pool settings when using NodeBuilder.


[Source code: NettyTransport.java]

// we want to have at least 1 for reg/state/ping
if (this.connectionsPerNodeReg == 0) {
    throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.reg] to 0");
}
if (this.connectionsPerNodePing == 0) {
    throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.ping] to 0");
}
if (this.connectionsPerNodeState == 0) {
    throw new ElasticsearchIllegalArgumentException("can't set [connection_per_node.state] to 0");
}


The code above makes it clear: each of these must be set to at least 1.

Careless sizing here, especially in test code, can also be a cause of creating too many threads and wasting resources, so keep it in mind.


Of course, if node.local is set to true in your settings, you can safely set all of these to 0.


[Elasticsearch] threadpool settings.

Elastic/Elasticsearch 2013. 8. 13. 18:51

[Server Side Setting]

- elasticsearch.yml

- http://edgeofsanity.net/article/2012/12/26/elasticsearch-for-logging.html


[threadpool bounded]

indices.memory.index_buffer_size: 50%


# Search pool

threadpool.search.type: fixed

threadpool.search.size: 20

threadpool.search.queue_size: 100

 

# Bulk pool

threadpool.bulk.type: fixed

threadpool.bulk.size: 60

threadpool.bulk.queue_size: 300

 

# Index pool

threadpool.index.type: fixed

threadpool.index.size: 20

threadpool.index.queue_size: 100


[threadpool unbounded]

threadpool.index.queue_size: -1

