CompleteableFuture 를 이용한 비동기 처리 조합
지난 포스팅에서는 현재 만들어지는 어플리케이션이 대부분 네트워크 작업을 수행하며, 그에 따라 비동기 처리가 중요함을 생각할 수 있었습니다.
JAVA 에서는 이를 위해 Future 인터페이스를 제공했으며, JAVA8 에서는 조금 더 쓰기 쉬운 CompleteableFuture 의 사용법을 알아보았습니다.
CompleteableFuture 에 대해서 조금 더 봐야할 부분은 Future 에 비해 쓰기 쉬워진 점과 더불어 비동기 처리를 조합할 수 있다는 것입니다.
예를들어, 비동기 처리를 하는 중 동기 처리를 수행하고 비동기 처리를 계속해서 진행해야할 수도 있고 각자 시작한 비동기 처리의 결과의 싱크를 맞춰 처리해야할 수도 있습니다. CompleteabeFuture 에는 이를 위한 파이프라인 메소드를 지원합니다.
오늘 포스팅은 시나리오에 따른 구현을 살펴보려 합니다.
1. 두 비동기작업(A, B) 간의 순서가 존재하며, A의 결과로 B 를 수행.
비동기 처리를 해야하는 두 가지 작업 A, B 가 있다고 가정합시다.
그런데 상황이 조금 복잡합니다.
비동기 처리 A 를 수행하는 것은 문제가 아니지만, B 의 결과를 도출하기 위해서는 A 의 결과가 필요합니다. 즉 A 가 끝난 다음, B 가 실행되어야함을 의미합니다.
CompleteabeFuture 에서는 아래와 같은 파이프라인 작업으로 이를 쉽게 해결할 수 있습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | Supplier<String> A = () -> { try { System.out.println("A 스레드 작업 시작"); Thread.sleep(2000); System.out.println("A 스레드 작업 완료"); return "A 실행"; } catch (InterruptedException e){ e.printStackTrace(); return "실패"; } }; Function<String, String> B = (aResult) -> { try { System.out.println("B 스레드 작업 시작"); Thread.sleep(1000); System.out.println("B 스레드 작업 완료"); return aResult + " B 실행"; } catch (InterruptedException e){ e.printStackTrace(); return "실패"; } }; Future<String> result = CompletableFuture. supplyAsync(A). thenApply(aResult -> aResult + " A 성공 -> "). thenCompose(aSucceedResult -> CompletableFuture.supplyAsync(() -> B.apply(aSucceedResult))); System.out.println(result.get()); // 결과 // A 스레드 작업 시작 // A 스레드 작업 완료 // B 스레드 작업 시작 // B 스레드 작업 완료 // A 실행 A 성공 -> C 실행 | cs |
새로운 문법이 등장했습니다.
thenApply
A의 결과를 받아, 다른 결과를 내보는 Function 과 같은 역할을 수행합니다.
기존 파이프라인 메소드인 map 과 동일합니다.
생각해봐야할건 앞서, A 의 결과가 끝날 때까지 thenApply 에서 블록이 걸리지 않는다는 것입니다. 즉 A 의 비동기 결과가 모두 끝나야 실행이 됩니다.
thenCompose
A로부터의 최종 결과가 끝나는 즉시, B를 실행합니다.
함수 디스크립터를 보면,
A 의 결과인 aSucceedResult 를 받음을 알 수 있으며 B 실행 시 이를 사용합니다.
2. 두 비동기작업(A, B) 간 순서는 없지만, A, B 결과를 합쳐야 하는 경우
이번엔 다른 케이스를 생각해보겠습니다.
시간이 오래 걸리는 A, C 의 순서 관계는 없으며, 이 둘의 결과가 합쳐지기만 기다리면 됩니다.
즉 비동기 처리는 동시에 보내지만, 최종 결과는 A,C 가 모두 끝날 때까지 기다려야합니다.
이 기능을 위해 우리는 CompleteabeFuture 의 파이프라인 메소드인 thenCombine 을 사용할 수 있습니다. 아래 예제에서 사용법을 확인할 수 있습니다.
가정 : C 의 작업을 A 보다 빨리도록 조정하였습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | Supplier<String> A = () -> { try { System.out.println("A 스레드 작업 시작"); Thread.sleep(2000); System.out.println("A 스레드 작업 완료"); return "A 실행"; } catch (InterruptedException e){ e.printStackTrace(); return "실패"; } }; Supplier<String> C = () -> { try { System.out.println("C 스레드 작업 시작"); Thread.sleep(500); System.out.println("C 스레드 작업 완료"); return "C 실행"; } catch (InterruptedException e){ e.printStackTrace(); return "실패"; } }; Future<String> result2 = CompletableFuture. supplyAsync(A). thenApply(aResult -> aResult + " A 성공 -> "). thenCombine(CompletableFuture.supplyAsync(C), (a, c) -> a + c); System.out.println(result.get()); // 결과 // A 스레드 작업 시작 // C 스레드 작업 시작 // C 스레드 작업 완료 // A 스레드 작업 완료 // A 실행 A 성공 -> C 실행 | cs |
역시 새로운 문법인 thenCombine 입니다.
thenCombine
파라미터로 또 다른 CompleteableFuture 를 받으며, 동시에 각 비동기 작업의 결과로 다른 결과를 도출하는 BiFunction 을 받음을 알 수 있습니다.
A,C 의 실행은 동시에 되지만, 결과는 A,C 가 모두 끝난다음에 도출되는 것을 볼 수 있습니다.
즉 비동기 프로그래밍에서 싱크 맞추기 문제가 이렇게 쉽게 처리가 됨을 알 수 있습니다.
3. 동작을 미리 등록하고, 실행계획 세우기
비동기로 실행하는 여러 작업이 있다는 가정하에 우리는 join 메소드를 통해 모든 작업이 완료가 되고 결과를 받아볼 수 있었습니다.
하지만 작업이 너무 많아, 실행이 너무 오래 걸리고 또한 어떤 작업은 타임아웃이 되버릴 수 있습니다. 결국 작업이 많은 게 문제네요. ㅡㅡ^
하지만 이런 것을 생각해볼 수 있습니다.
future 에 결과로 미리 할 작업을 등록하고,
비동기 작업이 모두 완료 해야하는지,
어느 한개만 완료 해도 되는지,
등을 생각해 볼 수 있습니다.
물론 타임아웃 시간도 등록하여, 해당 시간내에 작업이 끝났는지 혹은 타임아웃이 되었는지 알려 줄 수도 있겠죠?
일단 미리 작업을 등록하는 것 부터 살펴보죠.
future 에 대한 결과를 받아, 소비하는 Consumer 를 등록하는 CompleteableFuture 의 thenAccept 메소드를 주목합시다.
thenAccept
파라미터로 future 의 결과를 받아, 할 일을 지정하는 Consumer 를 받습니다.
즉 Future 의 결과를 받을 수 있을 때 일을 정의하는 것이 아닌, 미리 일을 정의하고 실행 계획에 따라 Consumer 를 실행합니다.
thenAccept 를 사용하여 일을 미리 저장하고, 실행 계획을 지정하는 방법에 대한 예제는 아래와 같습니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | // 해야할 일에 대한 Supplier 목록 정의 List<Supplier<Integer>> supplierList = IntStream.range(0, 50). mapToObj(n -> { // 각 Supplier 는 랜덤한 delay 작업 후, 해당 delay 시간을 출력하는 역할. Supplier<Integer> supplier = () -> { int time = new Random().nextInt(2000) + 1000; try { Thread.sleep(time); return time; } catch (InterruptedException e) { e.printStackTrace(); return -1; } }; return supplier; }). collect(Collectors.toList()); CompletableFuture[] completableFutures = supplierList.stream(). map(CompletableFuture::supplyAsync). map(f -> f.thenAccept(System.out::println)). // thenAccept 를 이용하여, 할 일 정의. toArray(size -> new CompletableFuture[size]); // 이 후, Future 의 제네릭은 Void, // 배열로 출력하자. // allOf 사용. 모든 Supplier 는 전부 실행. join 메소드를 통해 모든 실행이 끝나길 기다립니다. CompletableFuture.allOf(completableFutures).join(); // anyOf 사용. 한 Supplier 만 실행되도 작업 마무리, get 메소드를 사용하여 timeout 지정. CompletableFuture.anyOf(completableFutures).get(5000, TimeUnit.MILLISECONDS); | cs |
Stream API 처럼 any와 all 과 같은 형식으로, anyOf, allOf 메소드를 지원합니다. 각 실행 전략에 따라 각 Future 의 결과를 적어도 한 가지만 실행할 지, 모두 실행해야하는 지를 지정합니다.
오늘 포스팅에서는 여러 비동기 처리에 따른 싱크를 맞추는 방법, 실행전략 등을 파이프라인식으로 간단하게 처리할 수 있음을 알 수 있었습니다.
비동기처리를 하는 방식 역시 선언형으로 간단하게 제어할 수 있다는 것은 매우 흥미로운 점이며, 어플리케이션 만드는 방법은 더욱 간편해지고 우리는 비지니스 로직에 집중하기가 매우 좋아질 것이라 생각합니다.
이처럼 새로운 기술을 익힌다는 것은 매우 즐거운 일이며, 우리의 어플리케이션의 질은 더 좋아질 것입니다. :-)
|
'개발이야기 > 함수형 방법론' 카테고리의 다른 글
[JAVA8 포스팅 끝] 용어정리 및 키워드. (0) | 2017.04.11 |
---|---|
JAVA8 에 추가된 새로운 날짜 & 시간 API. (1) | 2017.04.04 |
CompleteableFuture 를 이용한 비동기 처리 (0) | 2017.03.23 |
Null 대신 Optional! (0) | 2017.03.14 |
디폴트 메소드와 다중상속 (0) | 2017.03.13 |