Overview

이번시간에는 자바에서 제공하는 Executor 프레임웍과 Thread 관련해서 유용한 클래스들을 알아보는 시간을 갖도록 하겠다.

Thread Pool 사용하기

자바를 처음 배웠을때는 다음과 같이 스레드를 생성하는 방법을 사용했다.

new Thread(() -> {
    try {
	    //do something ...
    } catch (InterruptedException e) {
	    e.printStackTrace();
    }
}).start();

위 방법처럼 Thread를 직접생성해서 사용하는 방법은 다음과 같은 문제점이 있다.

  • 생성된 스레드를 효율적으로 관리할 수 없음
  • 하드웨어에 장착되어있는 프로세서보다 많은 수의 스레드가 만들어져 동작중이라면 실제로는 대부분의 스레드가 대기상태에 머무름. 대기하는 스레드가 많으면 많을수록 cpu를 사용하기 위한 경쟁상태가 더욱 심화되기때문에 더 많은 자원을 소모함
  • 너무 많은 스레드가 생성되어 OOM이 발생한 상황이면 이미 손 쓸 방법이 없음.

위와 같은 문제점의 핵심은 스레드의 수가 특정 수준을 넘어간다면 성능이 떨어진다. 라는 것이다. 따라서 애플리케이션이 만들어 낼 수 있는 스레드의 수에 제한을 두는 것이 현병한 방법이고 높은 양의 작업 요청이 왔을때도 자원이 고갈되어 멈추는 경우가 발생하지 않도록 하는게 중요하다.

자바에서는 스레드를 조금 더 효율적으로 관리하기 위한 Executor 프레임웍을 제공한다.

그중에서도 클라이언트 레벨에서 사용하기 좋은 ThreadPoolExecutor를 알아볼 예정인데 자세한 내용은 https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ThreadPoolExecutor.html 에서 찾아볼 수 있다.

ThreadPoolExecutor를 사용하면 위에서 언급한 문제점들을 해결할 수 있고 그외에도 아래와 같은 기능들을 사용할 수 있다.

  • Core and maximum pool sizes
  • On-demand construction
  • Creating new threads
  • Keep-alive times
  • Queuing
  • Rejected tasks
  • Hook methods
  • Queue maintenance
  • Reclamation

직접 ThreadPoolExecutor을 커스터마이징하여 사용하는 방법도 있지만 이미 자바에서는 여러가지 팩토리 메서드들을 제공해주고 있다.

Executors 클래스는 여러가지 ThreadPoolExecutor 구현체를 생성해주는데 아래와 같이 필요한경우에 따라 사용하면된다.

  • Executors.newFixedThreadPool(): 처리할 작업이 등록되면 그에 따라 실제 작업할 스레드를 하나씩 생성. 생성할 수 있는 스레드의 최대 개수는 제한되어 있고 제한된 개수까지 스레드를 생성하고 나면 더 이상 생성하지 않고 스레드 수를 유지함
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • Executors.newCachedThreadPool(): 캐시 스레드 풀은 현재 풀에 갖고 있는 스레드의 수가 처리할 작업의 수보다 많아서 쉬는 스레드가 많이 발생할 때 쉬는 스레드를 종료시켜 훨씬 우연하게 대응할 수 있고 처리할 작업의 수가 많아지면 필요한 만큼 스레드를 새로 생성함. 반면에 스레드의 수에는 제한을 두지 않음
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • Executors.newSingleThreadExecutor(): 단일 스레드로 동작하는 Executor로서 작업을 처리하는 스레드가 단 하나뿐 만약 작업 중에 비정상적으로 종료되면 다시 하나를 생성해 나머지 작업을 실행함 등록된 작업은 설정된 큐의 우선순위 FIFO, LIFO 에 따라 반드시 순차적으로 처리됨
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • Executors.newSingleThreadExecutor(): 일정 시간 이후에 실행하거나 주기적으로 작업을 실행할 수 있고 스레드의 수가 고정되어 있는 형태
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }


...
    

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }


간단한 예제로 Executors.newScheduledThreadPool()를 사용하여 스케쥴을 구성하는 예제이다.

public class ScheduledExecutorRunnable {

    public static void main(String[] args) {

        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);

        Runnable task2 = () -> System.out.println("Running task2...");

        task1();

        //run this task after 5 seconds, nonblock for task3
        ses.schedule(task2, 5, TimeUnit.SECONDS);

        task3();

        ses.shutdown();

    }

    public static void task1() {
        System.out.println("Running task1...");
    }

    public static void task3() {
        System.out.println("Running task3...");
    }

}

Running task1...
Running task3...
Running task2... //display after 5 seconds

출처 : https://mkyong.com/java/java-scheduledexecutorservice-examples/

CountDownLatch

CountDownLatch는 다른 스레드에서 수행되는 작업 집합이 완료 될 때까지 하나 이상의 스레드가 대기 할 수 있도록 지원해주는 클래스이다. 이 Latch는 일종의 관문과 같은 형태로 동작하는데 Latch가 터미널 상태에 다다르면 관문이 열리고 모든 스레드가 통과한다고 이해한다면 된다.

CountDownLatch는 다음과 같은 상황에서 유용하게 사용할 수 있다.

  • 특정 자원을 확보하기 전에는 시작하지 말아야할 작업이 있는 경우
  • 의존성을 갖고 있는 다른 서비스가 시작하기 전에는 특정 서비스가 실행되지 않도록 막아야하는 경우
  • 특정 작업에 필요한 모든 객체가 실행할 준비를 갖출 때까지 기다리는 경우

아래는 CountDownLatch의 간단한 예제이다.

package me.sup2is;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExam {

    public static void main(String[] args) {

        final int nThreads = 10;
        final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        final CountDownLatch startGate = new CountDownLatch(1);

        for (int i = 0; i < nThreads; i++) {
            executorService.submit(() -> {
                try {
                    startGate.await(); // <- 여기에서 해당 runnable은 잠금상태
                    System.out.println("run!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        System.out.println("all thread ready!");
        startGate.countDown();
        executorService.shutdown();
    }

}

CountDownLatch는 초기화한 count의 개수만큼 countDown() 메서드가 호출이 되어야 실행이 가능한 상태가 된다. 위 예제에서는 10개의 스레드를 만들고 실행시키려하지만 startGate.await() 메서드에 의해 대기?상태가되고 모든 스레드의 준비가 완료되었다는 “all thread ready!” 이후 startGate.countDown()를 이용해 초기화한 count를 0으로 만들기 때문에 생성된 Runnable들이 모두 실행되는 구조이다.

all thread ready!
run!
run!
run!
run!
run!
run!
run!
run!
run!
run!

ThreadLocal

Java에서 Thread-Safe하게 프로그래밍하기에서 소개한 방법중에 하나로 stack 한정 프로그래밍 방식이 있었다. Thread내에서 stack 공간은 다른 Thread가 침범하지 못하기 때문에 thread-safe하다고 소개한 방법인데 이 방법의 가장 큰 단점은 정말 해당 스택을 벗어나면 해당 값이 사라지게 된다는 것이다.

위 단점을 해결하기 위한 방법으로 자바는 ThreadLocal이라는 클래스를 제공한다.

ThreadLocal을 사용하면 set메서드로 저장했던 값을 현재 실행중인 스레드에서 가져올 수 있다.

자세한 사용방법은 https://javacan.tistory.com/entry/ThreadLocalUsage 에서 잘 소개하고 있다.

추가적으로 Spring Security를 사용해보았다면 SecurityContextHolder.getContext() 와 같은형태로 현재 유저에 저장된 Context를 꺼내오는 작업을 수행해본적이 있을텐데 이 방법도 ThreadLocal을 사용하는 방법이다.

SecurityContextHolder을 별다른 설정없이 사용한다면 security 전략에 ThreadLocalSecurityContextHolderStrategy라는 구현체가 들어가게 되는데 ThreadLocalSecurityContextHolderStrategy 내부엔 아래와 같은 ThreadLocal 멤버변수가 있다.

package org.springframework.security.core.context;

import org.springframework.util.Assert;

final class ThreadLocalSecurityContextHolderStrategy implements SecurityContextHolderStrategy {
    private static final ThreadLocal<SecurityContext> contextHolder = new ThreadLocal();

    ThreadLocalSecurityContextHolderStrategy() {
    }

    public void clearContext() {
        contextHolder.remove();
    }

    public SecurityContext getContext() {
        SecurityContext ctx = (SecurityContext)contextHolder.get();
        if (ctx == null) {
            ctx = this.createEmptyContext();
            contextHolder.set(ctx);
        }

        return ctx;
    }

    public void setContext(SecurityContext context) {
        Assert.notNull(context, "Only non-null SecurityContext instances are permitted");
        contextHolder.set(context);
    }

    public SecurityContext createEmptyContext() {
        return new SecurityContextImpl();
    }
}

그렇기 때문에 현재 로그인한 유저의 SecurityContextHolder를 request 내에서 쉽게 가져올 수 있다.

FutureTask

어떤 작업 A가 있고 이 작업 A가 시간이 많이 필요한 작업이라면 FutureTask를 사용해서 비동기적으로 미리 실행시켜 놓고 이후에 결과값을 받는 형태로 사용할 수 있다.

FutureTask를 사용하는 방법은 두가지가 있다.

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

Runnable과 함께 V 객체를 넣어준 뒤 특정 결과 이후에 다시 그 V 객체를 반환받거나 애초에 Callable<V>을 넣어줘서 특정한 객체를 받아내면 된다. Callable<V>은 리턴타입이 있는 Runnable로 생각하면 쉽다.

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

아래 예제를 살펴보자.

package me.sup2is;

import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FutureTaskExam {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        final ExecutorService executorService = Executors.newSingleThreadExecutor();

        final FutureTask<SomeJob> futureTask = new FutureTask<>(() -> {
            //work in external api ...
            Thread.sleep(4000);
            System.out.println("api call done!");
            return new SomeJob();
        });

        final Future<?> future = executorService.submit(futureTask);

        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("작업 " + (i + 1) + " 종료");
        }

        Objects.nonNull(future.get());
        executorService.shutdown();
    }

    static class SomeJob {
        public SomeJob() {
            System.out.println("created SomeJob");
        }
    }
}

위 예제의 결과는 아래와 같다.

작업 1 종료
작업 2 종료
작업 3 종료
api call done!
created SomeJob

만약 비동기 FutureTask를 사용하지 않았을경우 3초 + 4초 = 7초가 걸리는 작업이 될테지만 시간이 오래걸리는 작업을 미리 실행시켜놓는 방법으로 보다 빠르게 약 3초정도의 시간을 앞당길 수 있다.

마무리

이 글은 대부분 자바병렬프로그래밍을 참조했다.



포스팅은 여기까지 하겠습니다. 퍼가실때는 출처를 반드시 남겨주세요!


References