📌 As-is. wait & notify
💡 wait와 notify는 올바르게 사용하기가 아주 까다로우니 고수준 동시성 유틸리티를 사용하라
public class ThreadB extends Thread{
// 해당 쓰레드가 실행되면 자기 자신의 모니터링 락을 획득
// 5번 반복하면서 0.5초씩 쉬면서 total에 값을 누적
// 그후에 notify()메소드를 호출하여 wiat하고 있는 쓰레드를 깨움
@Override
public void run(){
synchronized(this){
for(int i=0; i<5 ; i++){
System.out.println(i + "를 더합니다.");
total += i;
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
notify(); // wait하고 있는 쓰레드를 깨움
}
}
}
public class ThreadA {
public static void main(String[] args){
// 앞에서 만든 쓰레드 B를 만든 후 start
// 해당 쓰레드가 실행되면, 해당 쓰레드는 run메소드 안에서 자신의 모니터링 락을 획득
ThreadB b = new ThreadB();
b.start();
// b에 대하여 동기화 블럭을 설정
// 만약 main쓰레드가 아래의 블록을 위의 Thread보다 먼저 실행되었다면 wait를 하게 되면서 모니터링 락을 놓고 대기
synchronized(b){
try{
// b.wait()메소드를 호출.
// 메인쓰레드는 정지
// ThreadB가 5번 값을 더한 후 notify를 호출하게 되면 wait에서 깨어남
System.out.println("b가 완료될때까지 기다립니다.");
b.wait();
}catch(InterruptedException e){
e.printStackTrace();
}
//깨어난 후 결과를 출력
System.out.println("Total is: " + b.total);
}
}
}
b가 완료될때까지 기다립니다.
0를 더합니다.
1를 더합니다.
2를 더합니다.
3를 더합니다.
4를 더합니다.
Total is: 10
- Thread의 상태 제어를 위한 메서드
- wait() : 가지고 있던 고유 Lock을 해제하고, Thread를 잠들게 하는 역할
- notify() : 잠들어 있던 Thread 중 임의로 하나를 골라 깨우는 역할
- 여전히 올바르게 사용하는 방법은 유효하며, 알아두어야 하지만 중요도가 예전만 못하다.
- wait과 notify를 사용해야 한다면, 동시성 유틸리티를 먼저 고려해보라
📌 고수준 유틸리티(java.util.concurrent)
- 실행자 프레임워크(Executor Service, Item 80)
- 동시성 컬렉션(Concurrent Collection)
- 동기화 장치(synchronizer)
📌 Concurrent Collection
- List, Queue, Map 같은 표준 컬렉션 인터페이스에 동시성을 가미해 구현한 고성능 Collection
- 높은 동시성 도달을 위해 synchronize를 각자의 내부에서 수행한다. (Item 79)
- Concurrent Collection에서 동시성을 무력화하는 것은 불가능하다.
- 외부에서 Lock을 추가로 사용하면 오히려 속도가 느려진다.
🟡 상태 의존적 수정 메서드
public interface Map<K, V> {
// 키에 매핑된 값이 없을 때에만 새 값을 집어넣고, 없으면 그 값을 반환한다.
default V putIfAbsent(K key, V value);
}
- 여러 기본 동작을 하나의 원자적 동작으로 묶는 메서드
- Concurrent Collection에서 동시성 무력화가 불가능하므로 여러 메서드를 원자적으로 묶어 호출할 수 없기 때문이다.
- Java 8에서는 일반 Collection Interface에도 default method(Item 21)형태로 추가되었다.
private static final ConcurrentMap<String, String> map =
new ConcurrentHashMap<>();
public static String intern(String s) {
String previousValue = map.putIfAbsent(s, s);
return previousValue == null ? s : previousValue;
}
// ConcurrentHashMap은 get 같은 검색에 더 최적화 되어 있다.
// get을 먼저 호출하면 성능을 좀 더 개선시킬 수 있다.
public static String intern(String s) {
String result = map.get(s);
if (result == null) {
result = map.putIfAbsent(s, s);
if (result == null)
result = s;
}
return result;
}
- Map Interface의 putIfAbsent() 메서드 덕에 Thread-safe한 정규화 맵(canonicalizing map)을 쉽게 구현할 수 있다.
- 동시성이 뛰어나며 String.intern와 비교해도 속도 매우 빠르다. (단, String.intern은 메모리 누수 방지 기술도 들어가 있다.)
🟡 동기화한 컬렉션(Synchronized Collection)보다 동시성 컬렉션을 사용하라
@Test
void synchronizedMap() {
// given
Map<String, Integer> counts = new HashMap<>();
Map<String, Integer> synchronizedMap = Collections.synchronizedMap(counts);
synchronizedMap.put("park", 100);
synchronizedMap.put("lee", 200);
synchronizedMap.put("kim", 300);
// when
List<String> names = Arrays.asList("park", "lee", "kim");
long start = System.nanoTime();
IntStream.rangeClosed(1, 100_000_000)
.forEach(number -> {
int index = number % 3;
synchronizedMap.get(names.get(index));
});
long end = System.nanoTime();
System.out.println("synchronizedMap execute time: " + (end - start) + "ns");
}
@Test
void concurrentHashMap() {
// given
ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
concurrentHashMap.put("park", 100);
concurrentHashMap.put("lee", 200);
concurrentHashMap.put("kim", 300);
// when
List<String> names = Arrays.asList("park", "lee", "kim");
long start = System.nanoTime();
IntStream.rangeClosed(1, 100_000_000)
.forEach(number -> {
int index = number % 3;
concurrentHashMap.get(names.get(index));
});
long end = System.nanoTime();
System.out.println("concurrentHashMap execute time: " + (end - start) + "ns");
}
@Test
void hashMap() {
// given
Map<String, Integer> map = new HashMap<>();
map.put("park", 100);
map.put("lee", 200);
map.put("kim", 300);
// when
List<String> names = Arrays.asList("park", "lee", "kim");
long start = System.nanoTime();
IntStream.rangeClosed(1, 100_000_000)
.forEach(number -> {
int index = number % 3;
map.get(names.get(index));
});
long end = System.nanoTime();
System.out.println("HashMap execute time: " + (end - start) + "ns");
}
synchronizedMap execute time: 1728714623ns
concurrentHashMap execute time: 672566310ns
HashMap execute time: 844882515ns
- 동기화한 컬렉션(Collections.synchronized~)은 낡은 유산이 되었다.
- synchronizedMap을 concurrentMap으로 바꾸는 것만으로도 Concurrent Application 성능은 극적으로 개선된다.
🟡 Blocking Collection
public interface BlockingQueue<E> extends Queue<E> { ... }
- Collection Interface 중 일부는 작업이 성공적으로 수행될 때까지 기다리도록 확장되었다.
- BlockingQueue의 경우 Queue의 원소를 꺼낼 때, queue가 비어있으면 새로운 원소가 추가될 때까지 대기한다.
- 이런 특성은 Job Queue로 쓰기에 적합하다.
- 생산자-소비자 큐
- 하나 이상의 생산자(producer) Thread가 작업을 queue에 추가한다.
- 하나 이상의 소비자(consumer) Thread가 queue에 있는 작업을 꺼내 처리한다.
- ThreadPoolExecutor를 포함한 대부분의 ExcutorService 구현체에서 사용하고 있다.
📌 동기화 장치
- Thread가 다른 Thread를 기다릴 수 있게 한다. (작업 조율)
- 가장 강력한 동기화 장치 Phaser, 가장 자주 쓰이는 장치 CountDownLatch, Semaphore, 그 외에 CyclicBarrier와 Exchanger가 있다.
🟡 CountDownLatch
public class CountDownLatch {
...
public CountDownLatch(int count) { // 유일한 생성자
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
...
}
- count값이 Latch의 countDown 메서드를 몇 번 호출해야 대기 중인 Thread들을 깨우는지 결정한다.
public class CountDownLatchTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
try {
long result = time(executorService, 3, () -> System.out.println("hello"));
System.out.println("총 걸린 시간 : " + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
public static long time(Executor executor, int concurrency, Runnable action) throws InterruptedException {
CountDownLatch ready = new CountDownLatch(concurrency);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch done = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.execute(() -> {
ready.countDown(); // 타이머에게 준비가 됐음을 알린다.
try {
// 모든 작업자 스레드가 준비될 때까지 기다린다.
start.await();
action.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 타이머에게 작업을 마쳤음을 알린다.
done.countDown();
}
});
}
ready.await(); // 모든 작업자가 준비될 때까지 기다린다.
long startNanos = System.nanoTime();
start.countDown(); // 작업자들을 깨운다.
done.await(); // 모든 작업자가 일을 끝마치기를 기다린다.
return System.nanoTime() - startNanos;
}
}
InterruptedException이 발생했을 때, Thread.currentThread().interrupt()로 Interrupt를 되살리고
자신은 run 메서드에서 빠져나와야 Executor가 Interrupt를 적절하게 처리할 수 있다.
hello
hello
hello
총 걸린 시간 : 316400
어떤 동작들을 동시에 시작해 모두 완료하기까지 시간을 재는 간단한 프레임워크를 구현했다.
- Executor와 동시성 수준(concurrency, 동작을 몇 개나 동시에 수행할 수 있는지)을 매개변수로 받는다.
- concurrency 개수만큼 Executor Thread를 생성하고 Thread Pool에 할당한다.
- ready.countDown() : 타이머에게 준비가 되었음을 알린다.
- start.await() : 모든 Executor Thread가 시작될 때까지 기다린다
- action.run() : Runnable action을 실핸한다
- done.countDown() : 타이머에게 작업을 마쳤음을 알린다.
- ready.await() : 모든 Executor Thread가 준비될 때까지 대기한다.
- ready.countDown()이 시작되면 대기하던 Thread들이 동작한다.
- 마지막 Executor Thread가 작업을 마치면 시계가 멈춘다.
위 코드는 CyclicBarrier(혹은 Phaser) 인스턴스 하나로 대체할 수 있으나, 이해하기가 힘들어질 것이다.
🟡 쓰레드 기아 교착상태(Thread Starvation Deadlock)
- time 메서드에 넘겨진 Executor는 concurrency 매개변수로 지정한 동시성 수준만큼의 Thread를 생성할 수 있어야 한다.
- 그렇지 않으면 countDown이 0에 도달하지 못하므로 메서드가 결코 끝나지 않는다.
✒️ 시간 측정
💡 시간 간격을 잴 때는 항상 System.currentTimeMillis가 아닌 System.nanoTime을 사용하라
- 더 정확하고 정밀하며 시스템의 실시간 시계의 시간 보정에 영향을 받지 않는다.
- 1초 미만의 시간이 걸리는 작업이라면 jmh같은 특수 프레임워크를 사용하라
📌 Legacy Code
어쩔 수 없이 wait & notify를 사용해야 하는 경우
synchronized (obj) {
while (조건이 충족되지 않았다) {
obj.wait(); // 락을 놓고, 깨어나면 다시 잡는다.
}
... // 조건이 충족됐을 때의 동작을 수행한다.
}
- wait()
- Thread가 어떤 조건이 충족되기를 기다리게 할 때 사용한다.
- Lock 객체의 wait 메서드는 반드시 객체를 잠근 synchronized 영역 안에서 호출해야 한다.
- wait 메서드를 사용할 때는 반드시 대기 반복문(wait loop) 관용구를 사용하고, 반복문 밖에서는 절대 호출하지 마라
- 반복문은 wait 호출 전후로 조건이 만족하는지 검사한다.
- 호출 전 검사
- 조건이 충족되었다면 wait을 건너뛰게 하여응답 불가 상태를 예방한다.
- 만약 조건이 충족되었는데 notify가 먼저 호출되고 wait을 호출해버리면 Thread를 다시 깨울 수 있다고 보장할 수 없다.
- 호출 후 검사
- Safety Failure를 예방한다.
- 조건이 충족되지 않았는데 Thread가 동작을 이어가면 Lock이 보호하는 불변식을 깨뜨릴 위험이 있다.
- Thread가 notify를 호출 → 대기 중이던 Thread가 깨어나는 사이 다른 Thread가 Lock 획득 → 해당 Lock이 보호하는 상태를 변경해버린다.
- 조건이 만족되지 않았음에도 다른 Thread에서 실수 혹은 악의적으로 notify를 호출하는 경우(공개된 객체를 Lock으로 사용해 대기하는 클래스는 이런 위험에 노출될 수 있다. 외부에 노출된 객체의 synchronized method 안에서 호출하는 wait은 모두 이 문제에 영향을 받는다.)
- 대기 중인 Thread 중 일부만 조건이 충족해도 notifyAll을 호출해버린 경우
- 허위 각성(spurious wakeup)으로 인해 notify 없이 깨어난 Thread
- 호출 전 검사
- 일반적으로 notify보다 notifyAll을 사용하는 게 합리적이고 안전하다.
- 깨어나야 하는 모든 Thread가 깨어남을 보장하므로 항상 정확한 결과를 얻는다.
- 다른 Thread까지 깨어날 수 있으나, 조건이 충족되지 않으면 다시 잠들 것이므로 프로그램 정확성에 큰 문제는 없다.
- notifyAll은 모든 Thread를 깨워버리므로, notify로 인해 관련 없는 Thread가 실수 혹은 악의적으로 호출되는 경우를 막을 수 있다.
- 모든 Thread가 같은 조건을 기다리고, 조건이 한 번 충족될 때 단 하나의 Thread만 혜택을 받는 특수한 경우엔 notify로 최적화할 수 있다.