계속해서 JAVA8 에서의 병렬 처리 방법에 대해 포스팅을 하고 있습니다.


JAVA8 에서 Stream API 를 사용하면, 병렬처리를 구현할 때 고려해야할 문제들을 생각하지 않아도 됐지만, 어떤 시점에 어떤식으로 주의하며 적용을 해야한다는 내용을 포스팅 했었습니다.



오늘은 병렬처리관련 마지막 포스팅으로, JAVA8 에서 새로 추가된 인터페이스인 Spliterator 를 소개하려 합니다. Spliterator 는 '분할할 수 있는 반복' 로, 기존 존재했던 반복자인 Iterator 와 비슷하지만 병렬 작업에 특화된 인터페이스입니다.


구현해야하는 인터페이스의 추상메소드 시그니처는 다음과 같습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public interface Spliterator<T> {
    
    /**
     * <pre>
     *     해당 반복자를 순차적으로 소비(Consumer) 하면서, 
     *     탐색해야할 요소가 남아있는지에 대한 여부를 출력합니다.
     * </pre>
     * 
     * @param action
     * @return
     */
    boolean tryAdvance(Consumer<super T> action);
    
    /**
     * <pre>
     * 본인의 Spliterator 의 요소 값을 일부 분할하여, 
     * 또 다른 Spliterator 를 생성합니다. 
     * 
     * (작업의 분할과정이라 생각하면, 쉬울 것 같습니다.)
     * </pre>
     * 
     * @return
     */
    Spliterator<T> trySplit();
    
    /**
     * <pre>
     *     작업해야할 요소 정보 수를 추측한 크기를 반환합니다.
     *    해당 크기는 꼭 일치해야 하는 것은 아닙니다. 
     *
     *    (trySplit 메소드를 이용하여, 작업분할 시 참고 자료가 됩니다.)
     * </pre>
     * 
     * @return
     */
    long estimateSize();
    
    /**
     * 현재 Spliterator 의 특성정보를 표시합니다.
     * 
     * <pre>
     *     ORDER : 요소분할 시, 순서가 존재하기 때문에 유의해서 분할하도록 합니다.
     *     DISTINCT : 요쇼 간 같은 값(equals)은 없다고 보장합니다.
     *     SORTED : 탐색된 요소는 미리 정의된 정렬순서를 따릅니다. 
     *     SIZED : estimateSize 은 정확한 정보를 줄 것을 보장합니다.
     *     NONNULL : 탐색하는 모든 요소는 null 이 아님을 보장합니다.
     *     IMMUTABLE : 요소가 탐색되는 동안 데이터를 추가하거나, 삭제할 수 없습니다.
     *     CONCURRENT : 동기화 없이, 여러 스레드가 소스를 동시에 수정할 수 있습니다.
     *     SUBSIZED : 이 Spliterator 와 분할 된 모든 Spliterator 는 모두 SIZE 임을 보장합니다.
     * </pre>
     * 
     * @return
     */
    int characteristics();
}
 
cs


즉 Spliterator 의 메소드 시그니처들은 요소들의 분할을 위해 존재하고 있습니다.


해당 분할 과정은 다음과 같이 이루어 집니다.


출처 : [JAVA8 in Action]


Spliterator 들은 trySplit 메소드를 사용하여, 분할할 수  있는 단계까지 분할합니다. 

즉 내부적으로 분할할 수 있다면, 분할된 부분에 대한 정보를 가진 Spliterator 을 생성한다는 것이죠. 더이상 분할할 수 없다면 null 을 출력합니다. 


이런식으로 최종적으로 도출된 [병렬 특화 반복자]로 병렬처리를 수행합니다. 


한번 예제를 작성해보겠습니다.


1
2
3
4
5
6
7
8
List<Integer> dataSet = IntStream.range(01000).boxed().collect(Collectors.toList());
        
dataSet.parallelStream().
        reduce(Integer::sum).
        ifPresent(System.out::println);
 
// 출력
// 499500
cs


간단한 덧셈에 대한 선언입니다.  


현재는 parallelStream 을 이용해서, Stream 형태로 추출하였지만 우리는 병렬에 특화된 Spliterator 를 만들겁니다. ㅡㅡ^


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
 * Integer 의 덧셈을 위한 Spliterator 
 * 
 * @author Doohyun
 *
 */
public class IntegerSumSpliterator implements Spliterator<Integer>{
    // 자르지 말아야할 최소 사이즈!
    private static final int LIMITED_SPLIT_SIZE = 500;
    
    private List<Integer> integerList;
    private Integer currentIndex = 0;
    
    public IntegerSumSpliterator(List<Integer> integerList){
        this.integerList = integerList;
    }
 
    @Override
    public boolean tryAdvance(Consumer<super Integer> action) {
        action.accept(integerList.get(currentIndex++));
        return currentIndex < integerList.size();
    }
 
    @Override
    public Spliterator<Integer> trySplit() {
        Integer currentSize = integerList.size() - currentIndex;
        
        if (currentSize <= LIMITED_SPLIT_SIZE) {
            // 잘라진 사이즈가 자르지 말아야할 최소사이즈보다 작다면 null 을 출력.
            return null;
        } else {
            // 할 일을 절반씩 잘라줍시다.
            Integer splitTargetSize = currentIndex + currentSize/2;
    
            List<Integer> subList = integerList.subList(currentIndex, splitTargetSize);
            currentIndex = splitTargetSize;
            
            return new IntegerSumSpliterator(subList);
        }
    }
 
    @Override
    public long estimateSize() {
        return integerList.size() - currentIndex;
    }
 
    /**
     * 타겟 리스트의 속
     *     - 각 요소의 유일함을 보장 [(1~1000) 까지의 리스트] : DISTINCT
     *     - 연산 중 중간에 수정될 일이 없으며 : IMMUTABLE
     *     - 여러 스레드가 동시 접근 가능 : CONCURRENT
     *     - 크기는 보장, 하위로 만들어지는 크기도 보장 : SIZED, SUBSIZED - filter 등 고려 안함.
     */
    @Override
    public int characteristics() {
        return Spliterator.DISTINCT + Spliterator.IMMUTABLE + Spliterator.CONCURRENT + Spliterator.SIZED + Spliterator.SUBSIZED;
    }
 
}
 
cs


각 인터페이스를 구현하고, StreamSupport 팩토리를 통해서 Stream 으로 변환해줍니다.


1
2
3
4
5
6
StreamSupport.stream(new IntegerSumSpliterator(dataSet), true)
            .reduce(Integer::sum)
            .ifPresent(System.out::println);
 
// 출력
// 499500
cs


위의 일반 Stream 을 사용한 것과 같은 결과가 나왔습니다. 그런데 굳이 귀찮게 Spliterator 를 만들어야 하나요? 같은 결과가 나오는 데 ㅡㅡ^


Spliterator 의 장점은 개발자의 의지병렬로 처리할 양이나 처리할 때 고려할 사항 (characteristics) 등을 변경할 수 있다는 것입니다. 성능 측정 결과를 기반으로 병렬처리 로직을 더욱 최적화 시킬 수 있을 것이라 생각합니다.



자바 8 인 액션
국내도서
저자 : 라울-게이브리얼 우르마(RAOUL-GABRIEL URMA),마리오 푸스코(MARIO FUSCO),앨런 마이크로프트(ALAN MYCROFT) / 우정은역
출판 : 한빛미디어 2015.04.01
상세보기








반응형
Posted by N'