parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다.
7.1.2 스트림 성능 측정
병렬화를 이용하면 순차나 반복 형식에 비해 성능이 더 좋아질 것이라 추측했지만 성능 최적화 시에는 반드시 측정이 필요하다.
자바 마이크로벤치마크 하니스(JMH) 라이브러리를 이용해 벤치마크를 구현해보도록 한다.
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Fork(value = 2, jvmArgs = {"-Xms4G", "-Xmx4G"})
@State(Scope.Thread)
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum(){
return Stream.iterate(1L, i -> i + 1).limit(N)
.reduce(0L, Long::sum);
}
@Benchmark
public long iterativeSum(){
long result = 0;
for(long i = 1L; i <= N; i++){
result += i;
}
return result;
}
@Benchmark
public long parallelSum(){
return Stream.iterate(1L, i -> i + 1).limit(N)
.parallel().reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)
public void tearDown(){
System.gc();
}
}
병렬 버전인 parallelSum 메서드가 쿼드 코어 CPU를 활용하지 못하고 순차 버전인 sequential 메서드에 비해 느린 결과가 도출되는 원인
반복 결과로 박싱된 객체가 만들어지므로 숫자를 더하려면 언박싱을 해야 한다.
반복 작업은 병렬로 수행할 수 있는 독립 단위로 나누기가 어렵다.
iterate 중간 연산은 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 청크로 분할하기가 어렵다.
iterate는 본질적으로 순차적인 작업이다.
리듀싱 과정을 시작하는 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할하는 것이 불가능하다.
스트림이 병렬로 처리되도록 지시했고 각각의 합계가 다른 스레드에서 수행되었지만 결국 실질적으로는 순차 처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다.
병렬 처리와는 거리가 먼 순차 반복 작업과 같은 경우에 병렬 프로그래밍을 오용하면 오히려 전체 프로그램 성능이 나빠질 수 있다. => parallel 메서드를 호출했을 때 내부적으로 어떤 일이 일어나는지 꼭 이해해야 한다.
더 특화된 메서드 사용
LongStream.rangeClosed 메서드를 사용하면 멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실해할 수 있다.
LongStream.rangeClosed가 iterate에 비해 병령 처리에 있어 갖는 장점
LongStream.rangeClosed는 기본형 long을 직접 사용하므로 박싱과 언박싱 오버헤트가 사라진다.
LongStream.rangeClosed는 쉽게 청크로 분할할 수 있는 순차 범위를 생산한다.
@Benchmark
public long rangedSum(){
return LongStream.rangeClosed(1, N).reduce(0L, Long::sum);
}
기존의 iterate 팩토리 메서드로 생성한 순차 버전(sequentialSum)에 비해 rangedSum에서의 숫자 스트림 처리 속도가 훨씬 빠르다.
특화되지 않은 스크림을 처리할 때는 오톡박싱, 언박싱 등의 오버헤드를 수반하기 때문이다.
상황에 따라서는 알고리즘을 병렬화하는 것보다 적절한 자료구조를 선택하는 것이 더 중요하다.
@Benchmark
public long parallelRangedSum(){
return LongStream.rangeClosed(1,N).parallel().reduce(0L, Long::sum);
}
특화된 스트림에 병렬 스트림을 적용하면 순차 실행보다 빠른 성능을 갖는 병렬 리듀싱을 만들 수 있게 된다.
올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있는 것이다.
함수형 프로그래밍을 올바로 사용하면 반복적으로 코드를 실행하는 방법에 비해 최신 멀티 코어 CPU가 제공하는 병렬 실행의 힘을 단순하게 직접적으로 얻을 수 있다.
병렬화가 완전 공짜는 아니다.
병렬화를 이용하려면 스트림을 재귀적으로 분할해야 하고, 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 하는데
멀티코어 간의 데이터 이동은 비용이 비싸다.
코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.
상황에 따라 쉽게 병렬화를 이용할 수 있거나 아니면 아예 병렬화를 이용할 수 없는 때도 있다.
7.1.3 병렬 스트림의 올바른 사용법
병렬 스트림을 잘못 사용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다.
ParallelStreams의 sideEffectParallelSum 메서드에서 LongStream.rangeClosed 병렬 스트림에 forEach 연산을 적용할 시 하나의 Accumulator 객체 안의 변수 total에 대해서 데이터 레이스 문제가 발생한다.(Race Condition 발생)
동기화로 문제를 해결하다보면 결국엔 병렬화라는 특성이 없어져 버릴 것이다.
실제로 sideEffectParallelSum 메서드를 실행 시, 메서드의 성능은 둘째 치고, 올바른 결과값(50000005000000)이 나오지 않는다.
여러 스레드에서 동시에 하나의 누적자, 즉 total += value를 실행하면서 이런 문제가 발생한다.
여러 스레드에서 공유하는 객체의 상태를 바꾸는 forEach 블록 내부에서 add 메서드를 호출하면서 이 같은 문제가 발생한다.
7.1.4 병렬 스트림 효과적으로 사용하기
스트림 요소의 양을 기준으로 병렬 스트림 사용을 결절하는 것은 적절하지 않다.
병렬 스트림을 효과적으로 사용하기 위한 기준
확신이 서지 않으면 직접 성능을 측정하라. 언제나 병렬 스트림이 순차 스트림보다 빠른 것은 아니다.
순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. limit이나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다. 정렬된 스트림에 unordered를 호출하면 비정렬된 스트림을 얻을 수 있다.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하라. 처리해야 할 요소 수(N)* 하나의 요소를 처리하는 데 드는 비용(Q), Q가 높아지면 병렬 스트림으로 성능을 개선할 수 있다.
소량의 데이터에서는 병렬 스트림이 도움되지 않는다.
스트림을 구성하는 자료구조가 적절한지 확인하라. ArrayList는 LinkedList보다 효율적으로 분할할 수 있다. range 팩토리 메서드로 만든 기본형 스트림도 쉽게 분해할 수 있다.
7.스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다. SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있지만 filter 연산이 있으면 스트림의 길이를 예측할 수 없으므로 효과적으로 스트림을 병렬 처리할 수 있을지 알 수 없다
최종 연산의 병합 과정 비용을 살펴보라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻게 되는 성능의 이익이 서브스트림의 부분 결과를 합치는 과정에서 상쇄될 수 있다.
7.2 포크/조인 프레임워크
7.2.1 RecursiveTask 활용
포크/조인 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계
포크/조인 프레임워크에서는 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는ExecutorService 인터페이스를 구현한다.(AbstractExecutorService 클래스 이용)
스레드 풀(ForkJoinPool)을 이용하려면 RecursiveTask< V >의 서브클래스를 만들어야 하며, V는는 병렬화된 태스크가 생성하는 결과 형식
결과가 없을 때는 RecursiveAction 타입의 서브클래스를 만들어야 한다.
RecursiveTask를 선언하려면 추상 메서드 compute를 구현해야 한다.
compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 태스크의 결과를 생산할 알고리즘을 구현해야 한다.
@Override
protected V compute() {
if(태스크가 충분히 작거나 더 이상 분할할 수 없으면){
순차적으로 태스크 계산
} else {
태스크를 두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 compute 메서드를 재귀적으로 호출
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
}
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute(){
int length = end - start;
if(length < THRESHOLD) {
return computeSequentially(); // 기준값과 같거나 작으면 순차적으로 계산
}
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();
// ForkJoinPool의 다른 스레드로 새로 생성한 서브태스크를 비동기 실행
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute();
// 두 번째 서브태스크를 동기 실행한다. 추가로 분할이 일어날 수 있다.
Long leftResult = leftTask.join();
// 첫 번째 서브태스크의 결과를 읽거나 아직 결과가 없으면 기다린다.
return leftResult + rightResult;
// 두 서브태스크의 결과를 조합한 값이 이 태스크의 결과다.
}
private long computeSequentially() {
long sum = 0;
for(int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
}
ForkJoinPool을 만들면서 인수가 없는 디폴트 생성자를 이용했는데, 이는 JVM에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있음을 의미
더 정확하게는 Runtime.availableProcessors의 반환값으로 풀에 사용할 스레드 수를 결정하게 되는 것이다.
ForkJoinSumCalculator 실행
ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행
compute 메서드는 병렬로 실행할 수 있을만큼 태스크의 크기가 충분히 작아졌는지 확인
아직 태스크의 크기가 크다고 판단되면 숫자 배열을 반으로 분할해서 두 개의 새로운 ForkJoinSumCalculator로 할당 => 다시 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator를 실행
위 과정이 재귀적으로 반복되면서 태스크의 크기가 충분히 작아질 때까지 태스크 분할을 반복
최종적으로 각 서브태스크는 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트로부터 역순으로 방문한다.
각 서브태스크의 부분 결과를 합쳐서 태스크의 최종 결과를 계산하게 되는 것이다.
7.2.2 포크/조인 프레임워크를 제대로 사용하는 방법
join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시키기 때문에 두 서브태스크가 모두 시작된 다음에 join을 호출해야 한다. 그렇지 않으면 각각의 서브태스크가 다른 태스크가 끝나길 기다리는 일이 발생한다.
순차 코드에서 병렬 계산을 시작할 때만 ForkJoinPool의 invoke 메서드를 사용한다. RecursiveTask 내에서는 compute나 fork 메서드로 직접 호출할 수 있다.
왼쪽 작업과 오른쪽 작업 모두에 fork 메서드를 호출하는 것이 자연스러울 것 같지만 한쪽 작업에는 fork를 호출하는 것보다 compute를 호출하는 것이 효율적이다. 두 서브태스크 중 한 태스크에는 같은 스레드를 재사용할 수 있으므로 풀에서 불필요한 태스크를 할당하는 오버헤드를 피할 수 있다.
포크/조인 프레임워크에서는 fork라 불리는 다른 스레드에서 compute를 호출하므로 IDE의 디버깅 시의 스택 트레이스가 도움이 되지 않기 때문에 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
병렬 스트림과 마찬가지로 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차처리보다 무조건 빠를 거라는 생각은 버려야 한다.
7.2.3 작업 훔치기
포크/조인 분할에서는 주어진 서브태스크를 더 분할할 것인지 결정할 기준을 정해야 한다.
ForkJoinSumCalculator 예제에서는 덧셈을 수행할 숫자가 만 개 이하면 서브태스크 분할을 중단
천만 개의 항목을 포함하는 배열을 사용하면 ForkJoinSumCalculator는 천 개 이상의 서브태스크를 포크할 것이다.
대부분의 기기에는 코어가 그리 많지 않으므로 천 개 이상의 서브태스크는 자원만 낭비하는 것 같아 보이지만 실제로는 코어 개수와 관계없이 적절한 크기로 분할된 많은 태스크를 포킹하는 것이 바람직하다.
이론적으로는 코어 개수만큼 병렬화된 태스크로 작업 부하를 분할하면 모든 코어에서 태스크를 실행할 것이고 크기가 같은 각각의 태스크는 같은 시간에 종료될 것이라 생각할 수 있지만 복잡한 시나리오가 사용되는 현실에서는 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다.
포크/조인 프레임워크에서는 작업 훔치기라는 기법으로 이 문제를 해결한다.
작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드에 대해 서브태스크를 거의 공정하게 분할
각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 태스크가 하나가 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리
한 스레드는 다른 스레드보다 자신에게 할당된 서브태스크들을 더 빨리 처리 가능
즉, 다른 스레드는 바쁘게 일하는 와중에 한 스레드는 할일이 다 떨어진 상황 발생 가능
할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 다른 스레드의 큐의 꼬리에서 작업을 훔쳐온다.
모든 스레드가 작업을 끝낼 때까지, 즉 모든 큐가 빌 때까지 이 과정을 반복
풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때 작업 훔치기 알고리즘 사용
7.3 Spliterator 인터페이스
Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공, 다만 Spliterator는 병렬 작업에 특화
자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 Spliterator 인터페이스의 디폴트 구현을 제공한다.
컬렉션은 spliterator라는 메서드를 제공하는 Spliterator 인터페이스를 구현한다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
tryAdvance 메서드는 Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야할 요소가 남아있으면 참을 반환
trySplit 메서드는 Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성하는 메서드
estimateSize 메서드로 탐색해야 할 요소 수 정보를 제공
탐색해야 할 요소 수가 정확하진 않더라도 제공된 값을 이용해서 더 쉽고 공평하게 Spliterator를 분할할 수 있다.
7.3.1 분할 과정
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어나며 trySplit의 결과가 null이 될 때까지 이 과정을 반복한다.
모든 trySplit의 결과가 null이면 재귀 분할 과정이 종료된다.
이 분할 과정은 characteristics 메서드로 정의되는 Spliterator의 특성에 영향을 받는다.
Spliterator 특성
Spliterator는 characteristics라는 추상 메서드도 선언하는데, 이 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.
public interface Spliterator {
public static final int ORDERED = 0x00000010;
public static final int DISTINCT = 0x00000001;
public static final int SORTED = 0x00000004;
public static final int SIZED = 0x00000040;
public static final int NONNULL = 0x00000100;
public static final int IMMUTABLE = 0x00000400;
public static final int CONCURRENT = 0x00001000;
public static final int SUBSIZED = 0x00004000;
}
특성
의미
ORDERED
리스트처럼 요소에 정해진 순서가 있으므로 이 Spliterator는 요소를 탐색하고 분할할 때 이 순서에 유의해야 한다.
DISTINCT
x, y 두 요소를 방문했을 때 x.equals(y)는 항상 false를 반환한다.
SORTED
탐색된 요소는 미리 정의된 정렬 순서를 따른다.
SIZED
크기가 알려진 소스로 Spliterator를 생성했으므로 estimateSize()는 정확한 값을 반환한다.
NONNULL
탐색하는 모든 요소는 null이 아니다.
IMMUTABLE
이 Spliterator의 소스는 불변이다. 요소를 탐색하는 동안 요소를 추가, 수정, 삭제 불가
CONCURRENT
동기화 없이 Spliterator의 소스를 여러 스레드에서 동시에 고칠 수 있다.
SUBSIZED
이 Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖는다.
7.3.2 커스텀 Spliterator 구현하기
반복형으로 단어 수를 세는 메서드
public static void main(String[] args) {
final String SENTENCE =
"Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura " +
"ch la dritta via era smarrita ";
System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
}
public static void main(String[] args) {
final String SENTENCE =
"Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura " +
"ch la dritta via era smarrita ";
Stream<Character> stream =
IntStream.range(0,SENTENCE.length()).mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream) + " words");
}
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int i, boolean b) {
this.counter = i;
this.lastSpace = b;
}
public WordCounter accumulate(Character character) {
if(Character.isWhitespace(character)) {
return lastSpace ? this : new WordCounter(counter, true);
}else {
return lastSpace ? new WordCounter(counter + 1, false) : this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.counter,
wordCounter.lastSpace);
}
public int getConter() {
return counter;
}
}
스트림을 탐색하면서 새로운 문자를 찾을 때마다 accumulate 메서드를 호출
반복형인 countWordsIteratively에서처럼 새로운 비공백 문자를 탐색한 다음에 마지막 문자가 공백이면 counter를 증가
combine은 문자열 서브 스트림을 처리한 WordCounter의 결과를 합친다. WordCounter들의 내부 counter값을 합치는 것이다.
WordCounter 병렬로 수행하기
함수형을 이용하면 직접 스레드를 동기화하지 않고도 병렬 스트림으로 작업을 병렬화할 수 있다.
문자열 스트림에서의 단어 개수 연산을 병렬로 처리해보자.
public static void main(String[] args) {
final String SENTENCE =
"Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura " +
"ch la dritta via era smarrita ";
Stream<Character> stream =
IntStream.range(0,SENTENCE.length()).mapToObj(SENTENCE::charAt);
System.out.println("Found " + countWords(stream.parallel()) + " words");
}
Found 79 words
SENTENCE는 실제로는 19개의 단어로 이루어져있으므로 stream.parallel()을 이용해 만든 문자열 병렬 스트림으로는 올바른 병렬 처리가 이루어지지 않고 있다.
원래의 문자열을 임의의 위치에서 둘로 분할하여 서브스트림을 만들기 때문에 원래 한 개의 단어를 여러 개로 계산하는 상황이 발생하는 것이다.
문제 해결을 위해선 문자열을 임의의 위치에서 분할하지 않고 단어가 끝나는 위치에서만 분할하도록 하는 Spliterator 인터페이스의 구현체를 적용한 스트림을 생성하여 이 스트림으로부터 단어 개수를 계산해야 한다.
class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar;
public WordCounterSpliterator(String sentence) {
this.string = sentence;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action){
action.accept(string.charAt(currentChar++)); // 현재 문자 소비
return currentChar < string.length(); // 소비할 문자가 남아있으면 true
}
@Override
public Spliterator<Character> trySplit(){
int currentSize = string.length() - currentChar;
if (currentSize < 10) {
return null;
// 파싱할 문자열을 순차 처리할 수 있을 만큼
// 충분히 작아졌음을 알리는 null 반환
}
for(int splitPos = currentSize / 2 + currentChar;
splitPos < string.length(); splitPos++) {
// 파싱할 문자열의 중간을 분할 위치로 설정
if(Character.isWhitespace(string.charAt(splitPos))) {
// 다음 공백이 나올 때까지 분할 위치를 뒤로 이동시킴
Spliterator<Character> spliterator
= new WordCounterSpliterator(
string.substring(currentChar, splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
// IMMUTABLE(문자열 자체가 불변 클래스이므로 문자열을 파싱하면서 속성 추가 X)
}
}
WordCounterSpliterator 활용
public static void main(String[] args) {
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream = StreamSupport.stream(spliterator, true);
// StreamSupport.stream 팩토리 메서드로 전달한 두 번째 불리언 인수는
// 병렬 스트림 생성 여부를 지시한다.
System.out.println("Found " + countWords(stream) + " words");
}