Thread 모델 발전 과정

maro's avatar
Sep 04, 2024
Thread 모델 발전 과정
요청을 처리하는 스레드 모델의 발전 과정을 통해 스레드 풀에 대해 이해하고, 스레드 풀을 올바르게 구성하는 방법을 알아봅니다.

Single Threaded

싱글 스레드에서의 서버의 요청 처리를 코드로 표현하면 다음과 같다.
ServerSocket listener = new ServerSocket(8080); try { while (true) { Socket socket = listener.accept(); try { handleRequest(socket); } catch (IOException e) { e.printStackTrace(); } } } finally { listener.close(); } final static String response = "HTTP/1.0 200 OK\r\n" + "Content-type: text/plain\r\n" + "\r\n" + "Hello World\r\n"; public static void handleRequest(Socket socket) throws IOException { // Read the input stream, and return "200 OK" try { BufferedReader in = new BufferedReader( new InputStreamReader(socket.getInputStream())); log.info(in.readLine()); OutputStream out = socket.getOutputStream(); out.write(response.getBytes(StandardCharsets.UTF_8)); } finally { socket.close(); } }
8080 port를 읽는 소켓에서 요청을 받아들이고 요청을 처리하도록 handleRequest에게 전달한다. 소켓이 받아들인 요청은 동기화 작업이기 때문에 전체 동작이 완료될 때까지 다른 요청은 처리하지 못한다.
 

Multi Threaded

멀티 스레드 환경에서는 I/O로 인해 스레드가 Blocking되더라도 다른 스레드를 사용하여 비동기로 처리할 수 있다.
// Main loop here ServerSocket listener = new ServerSocket(8080); try { while (true) { Socket socket = listener.accept(); new Thread( new HandleRequestRunnable(socket) ).start(); } } finally { listener.close(); } public static class HandleRequestRunnable implements Runnable { final Socket socket; public HandleRequestRunnable(Socket socket) { this.socket = socket; } public void run() { try { handleRequest(socket); } catch (IOException e) { e.printStackTrace(); } } }
위와 같은 기술을 ‘thread per request’라고 한다. 이런 요청별 스레드 접근 방식에는 JVM과 OS 모두 리소스를 할당해야 하므로 새 스레드를 생성하고 나중에 이를 삭제하는 데 비용이 많이 들 수 있다. 또한 생성되는 스레드 수에 제한이 없으므로, 한 번에 대량의 요청이 들어올 시 리소스가 고갈될 수 있다.
 

Thread Pool

스레드 연결 비용이나 무제한 스레드 같은 이슈를 해결하기 위해 일반적으로 스레드 풀을 사용한다. 풀은 모든 스레드를 추적하여 필요할 때 상한까지 새 스레드를 만들고 가능한 경우 유휴 스레드를 재 사용한다.
ServerSocket listener = new ServerSocket(8080); ExecutorService executor = Executors.newFixedThreadPool(4); try { while (true) { Socket socket = listener.accept(); executor.submit( new HandleRequestRunnable(socket) ); } } finally { listener.close(); }
스레드를 직접 만드는 대신, 스레드 풀에서 실행할 작업을 제출하는 ExecutorService 를 사용한다. 위 코드에서는 newFixedThreadPool을 사용하고 있지만, 그외에도 newCachedThreadPool을 사용하거나, ThreadPoolExecutor 생성자로 직접 생성할 수 있다.
 
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
newFixedThreadPoolnewCachedThreadPool는 각각 LinkedBlockingQueueSynchronousQueue를 인자로 전달하고 있다. SynchronousQueue는 내부 용량이 없어서 put 연산과 take 연산이 동시에 일어나야 한다. 때문에 사용 가능한 스레드가 없다면 새로운 스레드를 생성하고, 만일 최대 스레드 수에 도달했다면 RejectedExecutionHandler의 정책에 따라 처리한다.
newFixedThreadPoolLinkedBlockingQueue의 사이즈를 지정하지 않아서 Integer.MAX_VALUE까지 대기시킨다.
정리하자면, newFixedThreadPool은 스레드 수는 고정하는 대신 요청을 Integer.MAX_VALUE까지 큐에 쌓아 놓고, newCachedThreadPool는 요청을 큐에 저장하지 않는 대신 Integer.MAX_VALUE까지 스레드 수를 늘려서 처리한다.
 

Work Queue

public static ExecutorService newBoundedFixedThreadPool(int nThreads, int capacity) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(capacity), new ThreadPoolExecutor.DiscardPolicy()); } public static void boundedThreadPoolServerSocket() throws IOException { ServerSocket listener = new ServerSocket(8080); ExecutorService executor = newBoundedFixedThreadPool(4, 16); try { while (true) { Socket socket = listener.accept(); executor.submit( new HandleRequestRunnable(socket) ); } } finally { listener.close(); } }
앞서 newFixedThreadPool은 요청을 대기시킬 수 있는 수가 너무 많다. 때문에 ThreadPoolExecutor를 생성할 때 큐의 크기를 제한할 수 있다. 위에서는 16으로 크기를 제한하고 있다.
큐가 꽉 찼을 때, 이후 들어오는 요청들을 처리하는 정책을 정할 수 있는데 여기서는 DiscardPolicy를 사용하고 있다. 그 외에 AbortPolicyCallerRunsPolicy도 있다. 기본 설정은 예외를 던지는 AbortPolicy이다.
  • AbortPolicy
    • RejectedExecutionException을 발생
  • DiscardOldestPolicy
    • 오래된 작업을 skip. 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용.
  • DiscardPolicy
    • 처리하려는 작업을 skip. 역시 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용.
  • CallerRunsPolicy
    • shutdown 상태가 아니라면 ThreadPoolTaskExecutor에 요청한 thread에서 직접 처리
    • 예외와 누락없이 최대한 처리하려할 때 사용
 

Multiple Thread pools

MSA 환경에서는 여러 백엔드 서비스가 서로 연결하는 것이 일반적이다. 이 경우, 한 서비스의 오류가 다른 서비스로 전파되지 않도록 하는 것이 중요하다. 연결하는 서비스 별로 스레드 풀을 제공하여 각 서비스의 오류를 격리할 수 있다.
또한, 다중 스레드 풀은 교착 상태를 방지하는 데 도움이 된다. 오래 걸리는 작업에 별도 스레드 풀을 할당하면 느린 작업으로 인해 다른 작업들까지 대기해야하는 문제를 완화할 수 있다.
 
이 글은 아래 기사를 토대로 정리 및 부가 설명을 추가하여 작성하였습니다.
Share article

maro