본문 바로가기
spring/spring webFlux

[Spring] Reactive Programming (1) - Reactor 3, WebFlux

by moonsiri 2021. 2. 18.
728x90
반응형

Spring WebFlux

The original web framework included in the Spring Framework, Spring Web MVC, was purpose-built for the Servlet API and Servlet containers. The reactive-stack web framework, Spring WebFlux, was added later in version 5.0. It is fully non-blocking, supports Reactive Streams back pressure, and runs on such servers as Netty, Undertow, and Servlet 3.1+ containers.

5.0 버전에 추가된 스프링 웹 플럭스는 리액티브 스택 웹 프레임워크다. 완전하게 논블로킹으로 동작하며, Reactive Streams back pressure를 지원하고, Netty, Undertow, 서블릿 3.1+ 컨테이너 서버에서 실행된다.

 

 

Overview

웹플럭스가 탄생한 이유 중 하나는 적은 쓰레드로 동시 처리를 제어하고 적은 하드웨어 리소스로 확장하기 위해 논블로킹 웹 스택이 필요했기 때문이다.

 

여기서 잠깐! 논블로킹에 대해 알아보자.

자칫하면 비동기와 같은 의미 아닌가 할 수 있다. 동기/비동기는 처리해야 할 작업들을 어떠한 흐름으로 처리할 것인가에 대한 관점이고, 블로킹/논블로킹은 처리되어야 하는 하나의 작업이 전체적인 작업 흐름을 막느냐 안 막느냐에 대한 관점이라고 보면 된다. 논블로킹은 다른 주체의 작업과 관계없이 자신의 작업을 계속하는 것을 말한다. 자세한 내용은 여기 참조.

 

 

Reactive Stream

Reactive Streams 이란 non-blocking backpressure를 이용하여 비동기 서비스를 할 때 기본이 되는 스펙입니다. Java의 RxJava, String5 Webflux의 core에 있는 ProjectRector 프로젝터 모두 해당 스펙을 따르고 있습니다.

또한 Java9에 추가된 Flow 역시 reactive stream 스펙을 채택하여 사용하고 있습니다. 따라서 비동기 프로젝트를 잘 이해하기 위해서는 기본 스펙이 되는 Reactive Stream에 대해서 이해가 필요합니다.

 

reactive stream 시퀀스에서 소스 Publisher가 데이터를 생성합니다. 그러나 기본적으로 Subscriber가 등록(subscribe)할 때까지 아무 작업도 수행되지 않으며, 이때 데이터를 push 합니다.

 

Reactor는 각 단계에서 데이터에 적용할 processing을 설명하기 위해 함께 연결되는 운영자의 개념을 추가합니다. 연산자를 적용하면 새로운 중간 Publisher가 반환됩니다(사실 이 Publisher는 upstream 연산자에 대한 Subscriber 및 downstream에 대한 Publisher로 간주될 수 있음). 데이터의 최종 형식은 사용자의 관점에서 수행할 작업을 정의하는 최종 Subscriber로 끝납니다.

 

 

명세서

 

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher는 Subscriber를 받아들이는 메서드를 가집니다.

 

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber는 Subscription을 등록하고 Subscription에서 오는 신호(onNext, onError, onComplete)에 따라서 동작합니다.

 

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscription은 Publisher와 Subscriber 사이에서 중계하는 역할을 합니다. request 메서드는 Subscriber가 Publisher에게 데이터를 요청하는 개수이며 cancel 메서드는 구독을 취소하겠다는 의미입니다.

 

https://grokonez.com/java/java-9-flow-api-reactive-streams

  1. Publisher에 본인이 소유할 Subscription을 구현하고 publishing 할 data를 만듭니다.
  2. Publisher는 subscribe() 메서드를 통해 subscriber를 등록합니다.
  3. Subscriber는 onSubscribe() 메서드를 통해 Subscription을 등록하고 Publisher를 구독하기 시작합니다. 이는 Publisher에 구현된 Subscription을 통해 이루어집니다. 이렇게 하면 Publisher와 Subscriber는 Subscription을 통해 연결된 상태가 됩니다. onSubscribe() 내부에 Subscription의 request()를 요청하면 그때부터 data 구독이 시작됩니다.
  4. Suscriber는 Subscription 메서드의 request() 또는 cancel()을 호출을 통해 data의 흐름을 제어할 수 있습니다.
  5. Subscription의 request()에는 조건에 따라 Subscriber의 onNext(), onComplete() 또는 onError()를 호출합니다. 그러면 Subscriber의 해당 메서드의 로직에 따라 request() 또는 cancle()로 제어하게 됩니다.

 

 

Flux

Flux <T>는 Flux 시퀀스를 생성, 변환, 오케스트레이션 하는 데 사용할 수 있는 많은 연산자로 보강된 Reactive Stream Publisher입니다.

 

0 - n개 <T> 요소(onNext 이벤트)를 방출한 다음 완료 또는 오류(onCompleteonError 터미널 이벤트)가 발생합니다. 터미널 이벤트가 트리거 되지 않으면 Flux는 무한합니다. (순서 보장이 안됨)

  • Flux의 정적 팩토리는 소스를 생성하거나 여러 callback 유형을 발생할 수 있습니다.
  • 인스턴스 메서드, 연산자를 사용하면 비동기 시퀀스를 생성하는 비동기 처리 파이프라인을 구축할 수 있습니다.
  • Flux#subscribe() 또는 Flux#publishFlux#publishNext와 같은 멀티캐스팅 연산은 파이프라인의 전용 인스턴스를 구현하고 그 내부의 데이터 흐름을 트리거합니다.

 

 

예제

Flux를 생성하는 다양한 팩토리 메서드.

 

1. empty flux 리턴

Flux<String> emptyFlux() {
    return Flux.empty();
}

 

2. 배열이나 컬렉션을 사용하지 않고 "foo"와 "bar" 값 2개를 포함하는 Flux 반환

Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
}

 

3. "foo"와 "bar" 값 2개를 포함하는 목록으로 Flux 생성

Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}

 

4. IllegalStateException를 발생시키는 Flux 생성

Flux<String> errorFlux() {
    return Flux.error(IllegalStateException::new);
}

 

5. 100ms 마다 0에서 9까지 증가하는 값을 방출하는 Flux 생성

Flux<Long> counter() {
    return Flux.interval(Duration.ofMillis(100)).take(10);
}

 

 

Mono

Mono <T>는 Reactive Streams Publisher로, Mono 시퀀스를 생성, 변환, 오케스트레이션 하는 데 사용할 수 있는 많은 연산자로 추가되었습니다.

최대 1개까지의 <T> 요소를 방출할 수 있는 Flux의 전문화입니다. Mono는 valued(매개변수로 완료), empty(매개변수가 없이 완료), failed(오류) 중 하나입니다.

 

Mono <Void>는 완료 신호만 관심 있는 경우(실행 가능한 작업 완료에 해당하는 Reactive Streams)에 사용할 수 있습니다.

Flux와 마찬가지로 연산자를 사용하여 각 Subscription에 대해 새로 구현될 비동기 파이프라인을 정의할 수 있습니다.

시퀀스의 카디널리티를 변경하는 일부 API는 Flux를 반환합니다. (반대로 Flux에서 카디널리티를 1로 줄이는 API는 Mono를 반환함)

 

예제

 

1. Flux처럼 static factory를 사용하여 empty Mono 리턴

Mono<String> emptyMono() {
    return Mono.empty();
}

 

2. 아무것도 방출하지 않는 Mono 생성. (empty()와 달리 onComplete 이벤트를 생성하지 않음)

Mono<String> monoWithNoSignal() {
    return Mono.never();
}

 

3. Flux와 마찬가지로 사용 가능한 (unique) value "foo"를 포함하는 Mono 생성

Mono<String> fooMono() {
    return Mono.just("foo");
}

 

4. IllegalStateException를 발생시키는 Flux 생성

Mono<String> errorMono() {
    return Mono.error(IllegalStateException::new);
}

 

 

StepVerifier

react-test의 StepVerifier으로 publisher의 동작을 확인할 수 있습니다.

기대하던 이벤트와 일치하지 않는 이벤트가 발생하면 에러를 발생시킵니다.

반드시 verify라는 메서드를 호출해야 합니다. verify를 해야 subscribe가 시작됩니다.

 

  • expectNextCount : n개의 매개변수가 올 것을 기대한다.
  • withVirtualTime : flux가 매개변수를 생성하는데 심각하게 오래 걸린다면 테스트 코드를 검증하는데 시간이 오래 걸리게 될 것이다. 이때 withVirtualTime 메서드를 쓰면 verifier 가 core scheduler를 가상의 스케줄러로 임의적으로 교체해서 테스트 코드의 속도를 올릴 수 있다.
  • thenAwait, expectNoEvent : 가상의 스케줄러의 시간을 빨리 감기 하는 것
    • 단 expectNoEvent를 사용하려면 반드시 expectSubscription 이 선행되어야 한다.

 

 

Transform

Reactor는 데이터 변환에 사용할 수 있는 몇몇의 연산자를 제공합니다.

 

Map

  • 한 개의 데이터를 1-1로 변환
  • Java Stream의 map()과 비슷
  • 스트림의 중간에 값을 변환하는 역할
  • 반환 값은 대상 Object
void TransformMap() {
    Flux.just("a", "bc", "def", "wxyz")
        .map(String::length) // 문자열을 Integer 값으로 1-1 변환
        .subscribe(System.out::println);
}

Mono<User> capitalizeOne(Mono<User> mono) {
    return mono.map(u -> new User(u.getUsername()));
}

Flux<User> capitalizeMany(Flux<User> flux) {
    return flux.map(u -> new User(u.getUsername()));
}

 

 

FlatMap

  • 1:N을 변환 가능
  • 스트림의 중간에 값을 변환하는 역할
  • 반환 값은 reactor의 Publisher (Mono / Flux)

 

 

 

Flux<User> asyncCapitalizeMany(Flux<User> flux) {
    return flux.flatMap(this::asyncCapitalizeUser);
}

Mono<User> asyncCapitalizeUser(User u) {
    return Mono.just(new User(u.getUsername()));
}

 

map 은 반환 값이 대상 Object이고 flatMap은 reactor의 Publisher (Mono / Flux)이다. 출처:&nbsp; https://luvstudy.tistory.com/95

 

 

Merge

  • 여러 개의 publisher를 조합해서 단일 publisher로 만드는 작업
  • flux1.mergeWith(flux2)
    • 두 개의 flux가 도착하자마자 병합
    • flux1은 delay가 있어서 flux2의 값이 먼저 나타냄
  • flux1.concatWith(flux2)
    • 소스의 순서를 유지하려면 concat 연산자를 사용
    • Concat은 flux1이 완료될 때까지 기다렸다가 flux2를 구독(subscribe)하여 flux1의 모든 값이 방출되므로 순서를 유지
  • Flux.concat(mono1, mono2)
    • Mono 두 개를 가져와서 동일한 순서의 Flux로 변환

 

 

Request (Backpressure)

 

volume control. Reactive Stream 용어로 backpressure라고 합니다.

Subscriber가 처리할 수 있는 데이터 양을 Publisher에게 알리고 Publisher가 데이터를 생성하는 속도를 제한할 수 있는 피드백 메커니즘입니다.

 

demand 제어는 Subscription 레벨에서 수행됩니다. Subscription은 각 subscribe() 호출에 대해 생성되며, 데이터의 흐름을 cancel()하거나 request(long)에 따라 demand를 조정하도록 조작할 수 있습니다.

request(Long.MAX_VALUE)는 무제한 demand를 의미하므로 게시자가 가장 빠른 속도로 데이터를 내보냅니다.

 

시퀀스의 요소를 실제로 수정하지 않고 커스텀하려면 "doOn"으로 시작하는 "sid effect" 메서드를 사용하면 됩니다. 예를 들어 subscription 시, "start!"라고 출력하려면 doOnSubscribe()을 사용합니다.

doOne 메서드는 해당 이벤트에 대한 커스텀 액션 관련 callback을 받습니다, 이러한 callback에서는 지연시간이 있는 작업을 차단하거나 호출해서는 안됩니다. 빠른 연산자에 더 적합합니다.

 

예제

        Flux<String> colorFlux = Flux.just("red", "white", "blue")
                .log()
                .doOnSubscribe(subscription -> System.out.println("start!"))
                .map(String::toUpperCase)
                .doOnNext(System.out::println)
                .doOnComplete(() -> System.out.println("The end!"));
        colorFlux.subscribe();

        StepVerifier.create(colorFlux, 1)
                .expectNext("RED")
                .thenRequest(2)
                .expectNext("WHITE")
                .expectNext("BLUE")
                .verifyComplete();

        StepVerifier.create(colorFlux, 1)
                .expectNext("RED")
                .thenCancel()
                .verifyThenAssertThat();

 

 

 

Error

오류가 발생하면 어떻게 처리하라고 명시할 수 있습니다. 오류를 전파하는 동시에 복구(다른 시퀀스로 되돌아가거나 새로운 Subscription을 다시 시도함)합니다.

  • onErrorReturn : 에러가 발생하면 어떤 값을 리턴
  • onErrorResume : 에러가 발생하면 원하는 엘리먼트(sequence)를 리턴

 

RuntimeException은 Subscriber나 StepVerifier 에 의해서 잡히게됩니다. 

  • Exception.propagate : 발생한 ExceptionRuntimeException을 상속하고 있지 않을 경우, Checked Exception 형태로 발생한다. Exception.propagate를 이용하면 Checked Exception은 RuntimeException으로 바꿔서 전파한다. 
        flux.map(p -> {
            try {
                return anythingAction(p);
            } catch (Exception e) {
                throw Exceptions.propagate(e);
            }
        });

 

 

 

Reactive -> Blocking

 

때로는 코드의 일부만 reactive로 마이그레이션 할 수 있으며, more imperative code에서 reactive 시퀀스를 재사용해야합니다.

따라서 Mono의 값을 사용할 수 있을 때까지 block해야하는 경우 Mono#block() 메서드를 사용합니다. onError 이벤트가 트리거 되면 예외가 발생합니다.

가능한 end-to-end reactive code를 선호하여 이를 피해야합니다. 전체 reactive 파이프 라인을 잠글 수 있는 잠재력이 있으므로 다른 reactive code 중간에서 이를 피해야합니다.

 

마찬가지로 blockFirst()/blockLast()를 사용하여 Flux의 첫번째 또는 마지막 값을 block할 수 있습니다. toIterable을 사용하여 Flux를 Iterable로 변환할 수도 있습니다.

 

 

Blocking -> Reactive

 

subscribeOn 메서드를 사용하면 제공된 스케줄러에서 시작부터 시퀀스를 분리 할 수 ​​있습니다. 예를 들어, Schedulers.boundedElastic()은 요청에 따라 증가하는 스레드 풀을 생성하여 한동안 사용되지 않은 스레드를 자동으로 해제합니다.

  • subscribeOn : subsribe -> onSubribe -> request

 

slow subscribers의 경우 (예 : 데이터베이스에 저장) publishOn 연산자를 사용하여 시퀀스의 더 작은 섹션을 분리 할 수 ​​있습니다. subscribeOn과 달리 아래에있는 chain 부분에만 영향을 미치며 새 스케줄러로 전환합니다.

예를 들어 doOnNext를 사용하여 저장소에서 저장을 수행 할 수 있지만 then() 연산자(Mono <Void> 반환)를 체이닝하여 저장 성공 또는 실패 여부만 더 명확히 알 수 있습니다.

 

 

 

 

[Reference]

brunch.co.kr/@springboot/154

https://projectreactor.io/docs/core/release/reference/index.html

https://tech.io/playgrounds/929/reactive-programming-with-reactor-3/Intro

https://godekdls.github.io/Reactive%20Spring/contents/

github.com/moonsiri/spring-framework/tree/master/reactive-rest-service

728x90
반응형

댓글