지난 포스팅에서는 현재 만들어지는 어플리케이션이 대부분 네트워크 작업을 수행하며, 그에 따라 비동기 처리가 중요함을 생각할 수 있었습니다. 


JAVA 에서는 이를 위해 Future 인터페이스를 제공했으며, JAVA8 에서는 조금 더 쓰기 쉬운 CompleteableFuture 의 사용법을 알아보았습니다.



CompleteableFuture 에 대해서 조금 더 봐야할 부분은 Future 에 비해 쓰기 쉬워진 점과 더불어 비동기 처리를 조합할 수 있다는 것입니다.


예를들어, 비동기 처리를 하는 중 동기 처리를 수행하고 비동기 처리를 계속해서 진행해야할 수도 있고 각자 시작한 비동기 처리의 결과의 싱크를 맞춰 처리해야할 수도 있습니다. CompleteabeFuture 에는 이를 위한 파이프라인 메소드를 지원합니다.


오늘 포스팅은 시나리오에 따른 구현을 살펴보려 합니다.


1. 두 비동기작업(A, B) 간의 순서가 존재하며, A의 결과로 B 를 수행.


비동기 처리를 해야하는 두 가지 작업 A, B 가 있다고 가정합시다. 


그런데 상황이 조금 복잡합니다.


비동기 처리 A 를 수행하는 것은 문제가 아니지만, B 의 결과를 도출하기 위해서는 A 의 결과가 필요합니다. 즉 A 가 끝난 다음, B 가 실행되어야함을 의미합니다.


CompleteabeFuture 에서는 아래와 같은 파이프라인 작업으로 이를 쉽게 해결할 수 있습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 Supplier<String> A = () -> {
    try {
        System.out.println("A 스레드 작업 시작");
        Thread.sleep(2000);
        System.out.println("A 스레드 작업 완료");
        return "A 실행";
    } catch (InterruptedException e){
        e.printStackTrace();
        return "실패";
    }
};
 
Function<StringString> B = (aResult) -> {
    try {
        System.out.println("B 스레드 작업 시작");
        Thread.sleep(1000);
        System.out.println("B 스레드 작업 완료");
        return aResult + " B 실행";
    } catch (InterruptedException e){
        e.printStackTrace();
        return "실패";
    }
};
 
 Future<String> result = CompletableFuture.
                supplyAsync(A).
                thenApply(aResult -> aResult + " A 성공 -> ").
                thenCompose(aSucceedResult -> CompletableFuture.supplyAsync(() -> B.apply(aSucceedResult)));
 
System.out.println(result.get());
 
// 결과
// A 스레드 작업 시작
// A 스레드 작업 완료
// B 스레드 작업 시작
// B 스레드 작업 완료
// A 실행 A 성공 -> C 실행
cs


새로운 문법이 등장했습니다.


thenApply 


A의 결과를 받아, 다른 결과를 내보는 Function 과 같은 역할을 수행합니다. 

기존 파이프라인 메소드인 map 과 동일합니다.


생각해봐야할건 앞서, A 의 결과가 끝날 때까지 thenApply 에서 블록이 걸리지 않는다는 것입니다. 즉 A 의 비동기 결과가 모두 끝나야 실행이 됩니다.


thenCompose


A로부터의 최종 결과가 끝나는 즉시, B를 실행합니다. 


함수 디스크립터를 보면, 

A 의 결과인 aSucceedResult 를 받음을 알 수 있으며 B 실행 시 이를 사용합니다.


2. 두 비동기작업(A, B) 간 순서는 없지만, A, B 결과를 합쳐야 하는 경우


이번엔 다른 케이스를 생각해보겠습니다. 


시간이 오래 걸리는 A, C 의 순서 관계는 없으며, 이 둘의 결과가 합쳐지기만 기다리면 됩니다.

비동기 처리는 동시에 보내지만, 최종 결과는 A,C 가 모두 끝날 때까지 기다려야합니다.


이 기능을 위해 우리는 CompleteabeFuture 의 파이프라인 메소드인 thenCombine 을 사용할 수 있습니다. 아래 예제에서 사용법을 확인할 수 있습니다.


가정 : C 의 작업을 A 보다 빨리도록 조정하였습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
 Supplier<String> A = () -> {
    try {
        System.out.println("A 스레드 작업 시작");
        Thread.sleep(2000);
        System.out.println("A 스레드 작업 완료");
        return "A 실행";
    } catch (InterruptedException e){
        e.printStackTrace();
        return "실패";
    }
};
 
Supplier<String> C = () -> {
    try {
        System.out.println("C 스레드 작업 시작");
        Thread.sleep(500);
        System.out.println("C 스레드 작업 완료");
        return "C 실행";
    } catch (InterruptedException e){
        e.printStackTrace();
        return "실패";
    }
};
 
Future<String> result2 = CompletableFuture.
                supplyAsync(A).
                thenApply(aResult -> aResult + " A 성공 -> ").
                thenCombine(CompletableFuture.supplyAsync(C), (a, c) -> a + c);
 
System.out.println(result.get());
 
// 결과
// A 스레드 작업 시작
// C 스레드 작업 시작
// C 스레드 작업 완료
// A 스레드 작업 완료
// A 실행 A 성공 -> C 실행
cs


역시 새로운 문법인 thenCombine 입니다.


thenCombine


파라미터로 또 다른 CompleteableFuture 를 받으며, 동시에 각 비동기 작업의 결과로 다른 결과를 도출하는 BiFunction 을 받음을 알 수 있습니다.


A,C 의 실행은 동시에 되지만, 결과는 A,C 가 모두 끝난다음에 도출되는 것을 볼 수 있습니다.

즉 비동기 프로그래밍에서 싱크 맞추기 문제가 이렇게 쉽게 처리가 됨을 알 수 있습니다. 


3. 동작을 미리 등록하고, 실행계획 세우기 


비동기로 실행하는 여러 작업이 있다는 가정하에 우리는 join 메소드를 통해 모든 작업이 완료가 되고 결과를 받아볼 수 있었습니다.


하지만 작업이 너무 많아, 실행이 너무 오래 걸리고 또한 어떤 작업은 타임아웃이 되버릴 수 있습니다. 결국 작업이 많은 게 문제네요. ㅡㅡ^


하지만 이런 것을 생각해볼 수 있습니다. 


future 에 결과로 미리 할 작업을 등록하고, 


비동기 작업이 모두 완료 해야하는지


어느 한개만 완료 해도 되는지,


등을 생각해 볼 수 있습니다. 


물론 타임아웃 시간도 등록하여, 해당 시간내에 작업이 끝났는지 혹은 타임아웃이 되었는지 알려 줄 수도 있겠죠?


일단 미리 작업을 등록하는 것 부터 살펴보죠.


future 에 대한 결과를 받아, 소비하는 Consumer 를 등록하는 CompleteableFuture 의 thenAccept 메소드를 주목합시다.


thenAccept


파라미터로 future 의 결과를 받아, 할 일을 지정하는 Consumer 를 받습니다.

즉 Future 의 결과를 받을 수 있을 때 일을 정의하는 것이 아닌, 미리 일을 정의하고 실행 계획에 따라 Consumer 를 실행합니다.


thenAccept 를 사용하여 일을 미리 저장하고, 실행 계획을 지정하는 방법에 대한 예제는 아래와 같습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 해야할 일에 대한 Supplier 목록 정의
List<Supplier<Integer>> supplierList = IntStream.range(050).
        mapToObj(n -> {
            // 각 Supplier 는 랜덤한 delay 작업 후, 해당 delay 시간을 출력하는 역할.
            Supplier<Integer> supplier = () -> {
                int time = new Random().nextInt(2000+ 1000;
 
                try {
                    Thread.sleep(time);
                    return time;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return -1;
                }
            };
 
            return supplier;
        }).
        collect(Collectors.toList());
 
CompletableFuture[] completableFutures = supplierList.stream().
                map(CompletableFuture::supplyAsync).
                map(f -> f.thenAccept(System.out::println)).        // thenAccept 를 이용하여, 할 일 정의.
                toArray(size -> new CompletableFuture[size]);       // 이 후, Future 의 제네릭은 Void,
                                                                    // 배열로 출력하자.
 
// allOf 사용. 모든 Supplier 는 전부 실행. join 메소드를 통해 모든 실행이 끝나길 기다립니다.
CompletableFuture.allOf(completableFutures).join();
 
// anyOf 사용. 한 Supplier 만 실행되도 작업 마무리, get 메소드를 사용하여 timeout 지정.
CompletableFuture.anyOf(completableFutures).get(5000, TimeUnit.MILLISECONDS);
cs


Stream API 처럼 any와 all 과 같은 형식으로, anyOf, allOf 메소드를 지원합니다. 각 실행 전략에 따라 각 Future 의 결과를 적어도 한 가지만 실행할 지, 모두 실행해야하는 지를 지정합니다.


오늘 포스팅에서는 여러 비동기 처리에 따른 싱크를 맞추는 방법, 실행전략 등을 파이프라인식으로 간단하게 처리할 수 있음을 알 수 있었습니다. 


비동기처리를 하는 방식 역시 선언형으로 간단하게 제어할 수 있다는 것은 매우 흥미로운 점이며, 어플리케이션 만드는 방법은 더욱 간편해지고 우리는 비지니스 로직에 집중하기가 매우 좋아질 것이라 생각합니다.


이처럼 새로운 기술을 익힌다는 것은 매우 즐거운 일이며, 우리의 어플리케이션의 질은 더 좋아질 것입니다. :-)


자바 8 인 액션
국내도서
저자 : 라울-게이브리얼 우르마(RAOUL-GABRIEL URMA),마리오 푸스코(MARIO FUSCO),앨런 마이크로프트(ALAN MYCROFT) / 우정은역
출판 : 한빛미디어 2015.04.01
상세보기


반응형
Posted by N'

요즘 제작되는 어플리케이션은 대부분 네트워크 작업을 필요로 하는 경우가 늘어나고 있습니다.


단순 정보만 요청하고 받던 클라이언트 기반 프로그램부터 시작하여, 인터넷이 필요가 없을 것 같은 메모나 사진 촬영 앱도 공유 혹은 클라우드 처리를 지원하기 때문에 네트워크 작업이 없는 경우는 거의 없다고 생각할 수 있을 것 같습니다.


이러한 네트워크와 연결을 시도하는 프로그램을 제작해본 사람들이라면 알고 있겠지만, 네트워크 처리나 File IO 등 오래걸리는 일은 비동기 처리를 해야 합니다. 


비동기 처리는 하는 이유는 IO 작업이 일어나는 동안 메인스레드(아마도 UI 스레드)가 그동안 놀고 있는 상태(블럭 상태 - CPU 사이클이 낭비됨.)를 피하기 위해서입니다. 사용자 측면에서 어떤 정보를 읽어오는 동안 UI 가 멈춘다면, 불편하다고 충분히 느낄 수 있을 것입니다. ㅡㅡ^


아래는 비동기 처리 작업에 대한 그림입니다. 


비동기 프로세스(Async) 의 경우 Process B 가 실행되는 동안 Process A 는 계속해서 작업을 할 수 있습니다.



Java7 부터는 Future 인터페이스를 통해서, 이러한 비동기 프로세스를 수행할 수 있었습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ExecutorService executorService = Executors.newCachedThreadPool();
 
Future<Double> future = executorService.submit(()-> {
    // 편의 상 람다 사용. 자바7 에서는 사용 불가.
    Thread.sleep(2000);
 
    return 1000.0;
});
 
System.out.println("비동기 처리를 하는 동안 다른 일처리.");
 
try {
    // 타임아웃 3초로 지정.
    System.out.println("결과 : " + future.get(3000, TimeUnit.MILLISECONDS));
catch (Exception e) {
    e.printStackTrace();
}
 
// 출력 결과
// 비동기 처리를 하는 동안 다른 일처리.
// 결과 : 1000.0
cs


Future 클래스는 비동기 계산이 끝났는지 확인할 수 있는 isDone, 타임아웃 기간을 결정하고 결과를 출력하는 get 메소드 등이 있습니다.


간단히 비동기 처리는 되지만 조금 아쉽습니다. 


실무에서는 비동기 처리가 꼭 하나씩 생긴다고 볼 수 없으며, 각 비동기 처리에 대한 결과를 동기를 맞춰 또 다른 결과를 내야할 수 도 있습니다. 즉 각 Future 클래스 간 여러 의존성에 대한 관리가 힘들 수 있습니다.


JAVA8 에서는 복잡한 비동기처리를 선언형으로 이용할 수 있는 CompleteableFuture 를 제공하며, Stream API 나 Optional 같이 람다표현식과 파이프라인을 사용하여 비동기 작업을 조합할 수 있습니다.


일단 CompleteableFuture 의 간단한 예제는 아래와 같습니다. CompleteableFuture 는 기본적으로 supplyAsync, runAsync 등 팩토리 메소드를 제공하며, 쉽게 비동기 작업을 수행할 수 있습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Future<Double> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (Exception e){
                e.printStackTrace();
            }
 
            return 1000.0;
        });
 
System.out.println("비동기 처리를 하는 동안 다른 일처리.");
 
try {
    // 타임아웃 3초로 지정.
    System.out.println("결과 : " + future.get(3000, TimeUnit.MILLISECONDS));
catch (Exception e) {
    e.printStackTrace();
}
cs


비동기로 처리되어야 할 일이 많아지면 어떨까요? 동시에 앞의 예제처럼 Sleep 을 해야하는 task 가 다수일 때는 간단하게 두가지의 선택 경로를 생각해볼 수 있습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Supplier<Double> supplier = () -> {
    try {
        Thread.sleep(2000);
    } catch (Exception e){
        e.printStackTrace();
    }
 
    return 1000.0;
};
 
List<Supplier<Double>> supplierList = Arrays.asList(supplier, supplier, supplier, supplier);
 
// 병렬 스트림을 이용. 각 태스크를 병렬로 하여 성능을 높이자.
supplierList.parallelStream().
    map(Supplier::get).
    reduce(Double::sum).
    ifPresent(System.out::println);
 
// CompletableFuture 를 이용한 비동기적으로 처리
{
    List<CompletableFuture<Double>> completableFutures = supplierList.stream().
            map(CompletableFuture::supplyAsync).
            collect(Collectors.toList());
 
    // join 메소드는 모든 비동기 동작이 끝나길 기다립니다.
    completableFutures.stream().
            map(CompletableFuture::join).
            reduce(Double::sum).
            ifPresent(System.out::println);
}
cs


두 구현 방식에 따라 결과는 큰 차이가 나지 않을 수 있습니다. 


그러나 일반 순차 Stream 을 병렬 Stream 으로 변경한 첫 번째 방법이 간단해 보입니다.


굳이 CompletableFuture 를 쓸 필요가 없어보이지만, 병렬스트림과 달리 이를 이용한 방법은 executor 를 커스터마이징 할 수 있습니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
List<Supplier<Double>> supplierList = IntStream.range(0100).mapToObj(n -> supplier).collect(Collectors.toList());
 
supplierList.parallelStream().
        map(Supplier::get).
        reduce(Double::sum).
        ifPresent(System.out::println);
 
// CompletableFuture 를 이용한 비동기적으로 처리
{
 
    final Executor executor = Executors.newFixedThreadPool(Math.min(supplierList.size(), 100), r -> {
                Thread t = new Thread(r);
                // 데몬 스레드 정의
                // 일반 스레드가 실행 중일 때 자바 프로그램은 종료되지 않음 -> 어떤 이벤트를 한없이 기다리면서 종료되지 않은 일반 자바 스레드가 있으면 문제
                // 데몬 스레드는 자바 프로그램이 종료될 때 종료
                t.setDaemon(true);
                return t;
            });
 
    List<CompletableFuture<Double>> completableFutures = supplierList.stream().
            map(CompletableFuture::supplyAsync).
            collect(Collectors.toList());
 
    // join 메소드는 모든 비동기 동작이 끝나길 기다립니다.
    completableFutures.stream().
            map(CompletableFuture::join).
            reduce(Double::sum).
            ifPresent(System.out::println);
}
 
// 병렬스트림 걸린 시간 : 26066초
// CompletableFuture 사용 걸린 시간 : 2015초
cs


놀라운 결과입니다. 걸린 시간 자체가 무려 10배가 넘게 차이가 남을 알 수 있습니다.

즉 로직에 따라 Executor 를 다르게 하여, 최적화 시키는 것이 효과적일 수 있음을 알 수 있습니다.


비동기 처리의 최적화와 더불어, CompleteableFuture 는 람다표현식이나 파이프라인 메소드를 이용하여, 비동기 연산을 조합할 수 있습니다. 다음 포스팅에서는 이에 대해 다루어 보도록 하겠습니다. 



자바 8 인 액션
국내도서
저자 : 라울-게이브리얼 우르마(RAOUL-GABRIEL URMA),마리오 푸스코(MARIO FUSCO),앨런 마이크로프트(ALAN MYCROFT) / 우정은역
출판 : 한빛미디어 2015.04.01
상세보기


반응형
Posted by N'