
요청을 처리하는 스레드 모델의 발전 과정을 통해 스레드 풀에 대해 이해하고, 스레드 풀을 올바르게 구성하는 방법을 알아봅니다.
Single Threaded
싱글 스레드에서의 서버의 요청 처리를 코드로 표현하면 다음과 같다.
ServerSocket listener = new ServerSocket(8080);
try {
while (true) {
Socket socket = listener.accept();
try {
handleRequest(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
} finally {
listener.close();
}
final static String response =
"HTTP/1.0 200 OK\r\n" +
"Content-type: text/plain\r\n" +
"\r\n" +
"Hello World\r\n";
public static void handleRequest(Socket socket) throws IOException {
// Read the input stream, and return "200 OK"
try {
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
log.info(in.readLine());
OutputStream out = socket.getOutputStream();
out.write(response.getBytes(StandardCharsets.UTF_8));
} finally {
socket.close();
}
}
8080 port를 읽는 소켓에서 요청을 받아들이고 요청을 처리하도록 handleRequest에게 전달한다. 소켓이 받아들인 요청은 동기화 작업이기 때문에 전체 동작이 완료될 때까지 다른 요청은 처리하지 못한다.
Multi Threaded
멀티 스레드 환경에서는 I/O로 인해 스레드가 Blocking되더라도 다른 스레드를 사용하여 비동기로 처리할 수 있다.
// Main loop here
ServerSocket listener = new ServerSocket(8080);
try {
while (true) {
Socket socket = listener.accept();
new Thread( new HandleRequestRunnable(socket) ).start();
}
} finally {
listener.close();
}
public static class HandleRequestRunnable implements Runnable {
final Socket socket;
public HandleRequestRunnable(Socket socket) {
this.socket = socket;
}
public void run() {
try {
handleRequest(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
위와 같은 기술을 ‘thread per request’라고 한다. 이런 요청별 스레드 접근 방식에는 JVM과 OS 모두 리소스를 할당해야 하므로 새 스레드를 생성하고 나중에 이를 삭제하는 데 비용이 많이 들 수 있다. 또한 생성되는 스레드 수에 제한이 없으므로, 한 번에 대량의 요청이 들어올 시 리소스가 고갈될 수 있다.
Thread Pool
스레드 연결 비용이나 무제한 스레드 같은 이슈를 해결하기 위해 일반적으로 스레드 풀을 사용한다. 풀은 모든 스레드를 추적하여 필요할 때 상한까지 새 스레드를 만들고 가능한 경우 유휴 스레드를 재 사용한다.
ServerSocket listener = new ServerSocket(8080);
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
while (true) {
Socket socket = listener.accept();
executor.submit( new HandleRequestRunnable(socket) );
}
} finally {
listener.close();
}
스레드를 직접 만드는 대신, 스레드 풀에서 실행할 작업을 제출하는
ExecutorService
를 사용한다. 위 코드에서는 newFixedThreadPool을 사용하고 있지만, 그외에도 newCachedThreadPool을 사용하거나, ThreadPoolExecutor 생성자로 직접 생성할 수 있다. public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool
와 newCachedThreadPool
는 각각 LinkedBlockingQueue
와 SynchronousQueue
를 인자로 전달하고 있다. SynchronousQueue
는 내부 용량이 없어서 put 연산과 take 연산이 동시에 일어나야 한다. 때문에 사용 가능한 스레드가 없다면 새로운 스레드를 생성하고, 만일 최대 스레드 수에 도달했다면 RejectedExecutionHandler
의 정책에 따라 처리한다. newFixedThreadPool
은 LinkedBlockingQueue
의 사이즈를 지정하지 않아서 Integer.MAX_VALUE
까지 대기시킨다.정리하자면,
newFixedThreadPool
은 스레드 수는 고정하는 대신 요청을 Integer.MAX_VALUE
까지 큐에 쌓아 놓고, newCachedThreadPool
는 요청을 큐에 저장하지 않는 대신 Integer.MAX_VALUE
까지 스레드 수를 늘려서 처리한다.Work Queue
public static ExecutorService newBoundedFixedThreadPool(int nThreads, int capacity) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(capacity),
new ThreadPoolExecutor.DiscardPolicy());
}
public static void boundedThreadPoolServerSocket() throws IOException {
ServerSocket listener = new ServerSocket(8080);
ExecutorService executor = newBoundedFixedThreadPool(4, 16);
try {
while (true) {
Socket socket = listener.accept();
executor.submit( new HandleRequestRunnable(socket) );
}
} finally {
listener.close();
}
}
앞서
newFixedThreadPool
은 요청을 대기시킬 수 있는 수가 너무 많다. 때문에 ThreadPoolExecutor
를 생성할 때 큐의 크기를 제한할 수 있다. 위에서는 16으로 크기를 제한하고 있다.큐가 꽉 찼을 때, 이후 들어오는 요청들을 처리하는 정책을 정할 수 있는데 여기서는 DiscardPolicy를 사용하고 있다. 그 외에 AbortPolicy나 CallerRunsPolicy도 있다. 기본 설정은 예외를 던지는 AbortPolicy이다.
- AbortPolicy
- RejectedExecutionException을 발생
- DiscardOldestPolicy
- 오래된 작업을 skip. 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용.
- DiscardPolicy
- 처리하려는 작업을 skip. 역시 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용.
- CallerRunsPolicy
- shutdown 상태가 아니라면 ThreadPoolTaskExecutor에 요청한 thread에서 직접 처리
- 예외와 누락없이 최대한 처리하려할 때 사용
Multiple Thread pools
MSA 환경에서는 여러 백엔드 서비스가 서로 연결하는 것이 일반적이다. 이 경우, 한 서비스의 오류가 다른 서비스로 전파되지 않도록 하는 것이 중요하다. 연결하는 서비스 별로 스레드 풀을 제공하여 각 서비스의 오류를 격리할 수 있다.
또한, 다중 스레드 풀은 교착 상태를 방지하는 데 도움이 된다. 오래 걸리는 작업에 별도 스레드 풀을 할당하면 느린 작업으로 인해 다른 작업들까지 대기해야하는 문제를 완화할 수 있다.
이 글은 아래 기사를 토대로 정리 및 부가 설명을 추가하여 작성하였습니다.
Share article