Reference/Effective-Java

[Effective-Java] Chapter11 #81. wait와 notify보다는 동시성 유틸리티를 애용하라

나죽못고나강뿐 2023. 8. 11. 13:55
📌 As-is. wait & notify
💡 wait와 notify는 올바르게 사용하기가 아주 까다로우니 고수준 동시성 유틸리티를 사용하라
public class ThreadB extends Thread{
    // 해당 쓰레드가 실행되면 자기 자신의 모니터링 락을 획득
    // 5번 반복하면서 0.5초씩 쉬면서 total에 값을 누적
    // 그후에 notify()메소드를 호출하여 wiat하고 있는 쓰레드를 깨움
    @Override
    public void run(){
        synchronized(this){
            for(int i=0; i<5 ; i++){
                System.out.println(i + "를 더합니다.");
                total += i;
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            notify(); // wait하고 있는 쓰레드를 깨움
        }
    }
}
public class ThreadA {
    public static void main(String[] args){
        // 앞에서 만든 쓰레드 B를 만든 후 start 
        // 해당 쓰레드가 실행되면, 해당 쓰레드는 run메소드 안에서 자신의 모니터링 락을 획득
        ThreadB b = new ThreadB();
        b.start();

        // b에 대하여 동기화 블럭을 설정
        // 만약 main쓰레드가 아래의 블록을 위의 Thread보다 먼저 실행되었다면 wait를 하게 되면서 모니터링 락을 놓고 대기       
        synchronized(b){
            try{
                // b.wait()메소드를 호출.
                // 메인쓰레드는 정지
                // ThreadB가 5번 값을 더한 후 notify를 호출하게 되면 wait에서 깨어남
                System.out.println("b가 완료될때까지 기다립니다.");
                b.wait();
            }catch(InterruptedException e){
                e.printStackTrace();
            }

            //깨어난 후 결과를 출력
            System.out.println("Total is: " + b.total);
        }
    }
}
b가 완료될때까지 기다립니다.
0를 더합니다.
1를 더합니다.
2를 더합니다.
3를 더합니다.
4를 더합니다.
Total is: 10
  • Thread의 상태 제어를 위한 메서드
    • wait() : 가지고 있던 고유 Lock을 해제하고, Thread를 잠들게 하는 역할
    • notify() : 잠들어 있던 Thread 중 임의로 하나를 골라 깨우는 역할
  • 여전히 올바르게 사용하는 방법은 유효하며, 알아두어야 하지만 중요도가 예전만 못하다.
  • wait과 notify를 사용해야 한다면, 동시성 유틸리티를 먼저 고려해보라

 

📌 고수준 유틸리티(java.util.concurrent)
  1. 실행자 프레임워크(Executor Service, Item 80)
  2. 동시성 컬렉션(Concurrent Collection)
  3. 동기화 장치(synchronizer)

 

📌 Concurrent Collection
  • List, Queue, Map 같은 표준 컬렉션 인터페이스에 동시성을 가미해 구현한 고성능 Collection
  • 높은 동시성 도달을 위해 synchronize를 각자의 내부에서 수행한다. (Item 79)
  • Concurrent Collection에서 동시성을 무력화하는 것은 불가능하다.
  • 외부에서 Lock을 추가로 사용하면 오히려 속도가 느려진다.

 

🟡 상태 의존적 수정 메서드

public interface Map<K, V> {
    // 키에 매핑된 값이 없을 때에만 새 값을 집어넣고, 없으면 그 값을 반환한다.
    default V putIfAbsent(K key, V value);
}
  • 여러 기본 동작을 하나의 원자적 동작으로 묶는 메서드
    • Concurrent Collection에서 동시성 무력화가 불가능하므로 여러 메서드를 원자적으로 묶어 호출할 수 없기 때문이다.
    • Java 8에서는 일반 Collection Interface에도 default method(Item 21)형태로 추가되었다.

 

private static final ConcurrentMap<String, String> map =
        new ConcurrentHashMap<>();
        
public static String intern(String s) {
    String previousValue = map.putIfAbsent(s, s);
    return previousValue == null ? s : previousValue;
}
// ConcurrentHashMap은 get 같은 검색에 더 최적화 되어 있다.
// get을 먼저 호출하면 성능을 좀 더 개선시킬 수 있다.
public static String intern(String s) {
    String result = map.get(s);
    if (result == null) {
        result = map.putIfAbsent(s, s);
        if (result == null)
            result = s;
    }
    return result;
}
  • Map Interface의 putIfAbsent() 메서드 덕에 Thread-safe한 정규화 맵(canonicalizing map)을 쉽게 구현할 수 있다.
  • 동시성이 뛰어나며 String.intern와 비교해도 속도 매우 빠르다. (단, String.intern은 메모리 누수 방지 기술도 들어가 있다.)

 

🟡 동기화한 컬렉션(Synchronized Collection)보다 동시성 컬렉션을 사용하라

@Test
void synchronizedMap() {
    // given
    Map<String, Integer> counts = new HashMap<>();
    Map<String, Integer> synchronizedMap = Collections.synchronizedMap(counts);

    synchronizedMap.put("park", 100);
    synchronizedMap.put("lee", 200);
    synchronizedMap.put("kim", 300);

    // when
    List<String> names = Arrays.asList("park", "lee", "kim");

    long start = System.nanoTime();
    IntStream.rangeClosed(1, 100_000_000)
            .forEach(number -> {
                int index = number % 3;
                synchronizedMap.get(names.get(index));
            });
    long end = System.nanoTime();

    System.out.println("synchronizedMap execute time: " + (end - start) + "ns");
}

@Test
void concurrentHashMap() {
    // given
    ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();

    concurrentHashMap.put("park", 100);
    concurrentHashMap.put("lee", 200);
    concurrentHashMap.put("kim", 300);

    // when
    List<String> names = Arrays.asList("park", "lee", "kim");

    long start = System.nanoTime();
    IntStream.rangeClosed(1, 100_000_000)
            .forEach(number -> {
                int index = number % 3;
                concurrentHashMap.get(names.get(index));
            });
    long end = System.nanoTime();

    System.out.println("concurrentHashMap execute time: " + (end - start) + "ns");
}

@Test
void hashMap() {
    // given
    Map<String, Integer> map = new HashMap<>();

    map.put("park", 100);
    map.put("lee", 200);
    map.put("kim", 300);

    // when
    List<String> names = Arrays.asList("park", "lee", "kim");

    long start = System.nanoTime();
    IntStream.rangeClosed(1, 100_000_000)
            .forEach(number -> {
                int index = number % 3;
                map.get(names.get(index));
            });
    long end = System.nanoTime();

    System.out.println("HashMap execute time: " + (end - start) + "ns");
}
synchronizedMap execute time: 1728714623ns
concurrentHashMap execute time: 672566310ns
HashMap execute time: 844882515ns
  • 동기화한 컬렉션(Collections.synchronized~)은 낡은 유산이 되었다.
  • synchronizedMap을 concurrentMap으로 바꾸는 것만으로도 Concurrent Application 성능은 극적으로 개선된다. 

 

🟡 Blocking Collection

public interface BlockingQueue<E> extends Queue<E> { ... }
  • Collection Interface 중 일부는 작업이 성공적으로 수행될 때까지 기다리도록 확장되었다.
    • BlockingQueue의 경우 Queue의 원소를 꺼낼 때, queue가 비어있으면 새로운 원소가 추가될 때까지 대기한다.
  • 이런 특성은 Job Queue로 쓰기에 적합하다.
    • 생산자-소비자 큐
    • 하나 이상의 생산자(producer) Thread가 작업을 queue에 추가한다.
    • 하나 이상의 소비자(consumer) Thread가 queue에 있는 작업을 꺼내 처리한다.
  • ThreadPoolExecutor를 포함한 대부분의 ExcutorService 구현체에서 사용하고 있다.

 

📌 동기화 장치
  • Thread가 다른 Thread를 기다릴 수 있게 한다. (작업 조율)
  • 가장 강력한 동기화 장치 Phaser, 가장 자주 쓰이는 장치 CountDownLatch, Semaphore, 그 외에 CyclicBarrier와 Exchanger가 있다.

 

🟡 CountDownLatch

public class CountDownLatch {
    ...
    public CountDownLatch(int count) { // 유일한 생성자
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    ...
}
  • count값이 Latch의 countDown 메서드를 몇 번 호출해야 대기 중인 Thread들을 깨우는지 결정한다.

 

public class CountDownLatchTest {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            long result = time(executorService, 3, () -> System.out.println("hello"));
            System.out.println("총 걸린 시간 : " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    public static long time(Executor executor, int concurrency, Runnable action) throws InterruptedException {
        CountDownLatch ready = new CountDownLatch(concurrency);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(concurrency);

        for (int i = 0; i < concurrency; i++) {
            executor.execute(() -> {
                ready.countDown(); // 타이머에게 준비가 됐음을 알린다.
                try {
                    // 모든 작업자 스레드가 준비될 때까지 기다린다.
                    start.await();
                    action.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 타이머에게 작업을 마쳤음을 알린다.
                    done.countDown();
                }
            });
        }
    
        ready.await(); // 모든 작업자가 준비될 때까지 기다린다.
        long startNanos = System.nanoTime();
        start.countDown(); // 작업자들을 깨운다.
        done.await(); // 모든 작업자가 일을 끝마치기를 기다린다.
        return System.nanoTime() - startNanos;
    }
}

InterruptedException이 발생했을 때, Thread.currentThread().interrupt()로 Interrupt를 되살리고

자신은 run 메서드에서 빠져나와야 Executor가 Interrupt를 적절하게 처리할 수 있다.

hello
hello
hello
총 걸린 시간 : 316400

어떤 동작들을 동시에 시작해 모두 완료하기까지 시간을 재는 간단한 프레임워크를 구현했다.

  1. Executor와 동시성 수준(concurrency, 동작을 몇 개나 동시에 수행할 수 있는지)을 매개변수로 받는다.
  2. concurrency 개수만큼 Executor Thread를 생성하고 Thread Pool에 할당한다.
    • ready.countDown() : 타이머에게 준비가 되었음을 알린다.
    • start.await() : 모든 Executor Thread가 시작될 때까지 기다린다
    • action.run() : Runnable action을 실핸한다
    • done.countDown() : 타이머에게 작업을 마쳤음을 알린다.
  3. ready.await() : 모든 Executor Thread가 준비될 때까지 대기한다.
  4. ready.countDown()이 시작되면 대기하던 Thread들이 동작한다.
  5. 마지막 Executor Thread가 작업을 마치면 시계가 멈춘다.

위 코드는 CyclicBarrier(혹은 Phaser) 인스턴스 하나로 대체할 수 있으나, 이해하기가 힘들어질 것이다.

 

🟡 쓰레드 기아 교착상태(Thread Starvation Deadlock)

  • time 메서드에 넘겨진 Executor는 concurrency 매개변수로 지정한 동시성 수준만큼의 Thread를 생성할 수 있어야 한다.
  • 그렇지 않으면 countDown이 0에 도달하지 못하므로 메서드가 결코 끝나지 않는다.

 

✒️ 시간 측정

💡 시간 간격을 잴 때는 항상 System.currentTimeMillis가 아닌 System.nanoTime을 사용하라
  • 더 정확하고 정밀하며 시스템의 실시간 시계의 시간 보정에 영향을 받지 않는다.
  • 1초 미만의 시간이 걸리는 작업이라면 jmh같은 특수 프레임워크를 사용하라

 

📌 Legacy Code

어쩔 수 없이 wait & notify를 사용해야 하는 경우

synchronized (obj) {
    while (조건이 충족되지 않았다) {
        obj.wait(); // 락을 놓고, 깨어나면 다시 잡는다.
    }

    ... // 조건이 충족됐을 때의 동작을 수행한다.
}
  • wait()
    • Thread가 어떤 조건이 충족되기를 기다리게 할 때 사용한다.
    • Lock 객체의 wait 메서드는 반드시 객체를 잠근 synchronized 영역 안에서 호출해야 한다.
  • wait 메서드를 사용할 때는 반드시 대기 반복문(wait loop) 관용구를 사용하고, 반복문 밖에서는 절대 호출하지 마라
  • 반복문은 wait 호출 전후로 조건이 만족하는지 검사한다.
    1. 호출 전 검사 
      • 조건이 충족되었다면 wait을 건너뛰게 하여응답 불가 상태를 예방한다.
      • 만약 조건이 충족되었는데 notify가 먼저 호출되고 wait을 호출해버리면 Thread를 다시 깨울 수 있다고 보장할 수 없다.
    2. 호출 후 검사
      • Safety Failure를 예방한다.
      • 조건이 충족되지 않았는데 Thread가 동작을 이어가면 Lock이 보호하는 불변식을 깨뜨릴 위험이 있다.
        • Thread가 notify를 호출 → 대기 중이던 Thread가 깨어나는 사이 다른 Thread가 Lock 획득 → 해당 Lock이 보호하는 상태를 변경해버린다.
        • 조건이 만족되지 않았음에도 다른 Thread에서 실수 혹은 악의적으로 notify를 호출하는 경우(공개된 객체를 Lock으로 사용해 대기하는 클래스는 이런 위험에 노출될 수 있다. 외부에 노출된 객체의 synchronized method 안에서 호출하는 wait은 모두 이 문제에 영향을 받는다.)
        • 대기 중인 Thread 중 일부만 조건이 충족해도 notifyAll을 호출해버린 경우
        • 허위 각성(spurious wakeup)으로 인해 notify 없이 깨어난 Thread
  • 일반적으로 notify보다 notifyAll을 사용하는 게 합리적이고 안전하다.
    • 깨어나야 하는 모든 Thread가 깨어남을 보장하므로 항상 정확한 결과를 얻는다.
    • 다른 Thread까지 깨어날 수 있으나, 조건이 충족되지 않으면 다시 잠들 것이므로 프로그램 정확성에 큰 문제는 없다.
    • notifyAll은 모든 Thread를 깨워버리므로, notify로 인해 관련 없는 Thread가 실수 혹은 악의적으로 호출되는 경우를 막을 수 있다.
    • 모든 Thread가 같은 조건을 기다리고, 조건이 한 번 충족될 때 단 하나의 Thread만 혜택을 받는 특수한 경우엔 notify로 최적화할 수 있다.