ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Reactive Streams 구현
    spring/reactive 2021. 6. 1. 19:10

    배경

    Observer패턴은 subject-observer 기반 이벤트 프로그래밍을 비동기로 처리하기 쉬운 환경을 제공한다. 공부하려는 Reactive Streams 또한 Observer패턴 기반으로 만들어졌으며 여기에 완료라는 개념과 에러를 핸들링하는 방식을 추가해서 완성되었다.

    Reactive Streams 공식문서 내용을 보면 Publisher는 subject와, Subscriber는 observer와 대응되는 걸 알 수 있다. 또한 Subscriber의 표준을 확인해보면 onSubscribe는 반드시, onNext()는 자유롭게, OnError나 onComplete는 구현여부를 프로그래머에게 맡겼다. 즉, 기본 Observer패턴에 약간의 기능을 선택적으로 추가하게 만든 것이다.

     

     

    아키텍처

    Subscriber는 Publisher를 구독한다. 이벤트가 발생하면 publisher.subscribe(subscriber)를 통해 Subscriber에게 이벤트가 발생했음을 알리게 된다. subscribe()에서는 subscriber의 onSubscribe를 호출해서 이벤트를 알리는데 이때 Subscription이란 객체를 통해 소모할 이벤트 개수를 제어한다. 즉, Publisher는 자신의 이벤트 컨텍스트와 제어방식을 담은 Subscription을 Subscriber에게 전달하고 이벤트 컨텍스트와 제어방식대로 Subscriber는 스스로 이벤트를 소모한다.

     

    구현

    Publisher & Subscriber

    static 메서드를 통해 Stream 데이터소스를 받은 Publisher는 내부적으로 스트림을 반복하도록 구현했다. Subscriber는 총 4가지 메서드를 구현해야 하는데 그 중 핵심 메서드인 onNext()만 of메서드에서 콜백을 전달받으면 오버라이딩하도록 구현했다. 여기서 onSubscribe()를 보면 내부적으로 subscription을 수행하도록 약속되어 있다. 

     

    중간연산 map()

    Publisher의 default메서드로 map()을 정의했다. 굉장히 헷갈리는데 단계별로 정리해본다.

    - 첫 Pulisher는 of()로 stream을 파라미터로 받아 내부적으로 반복하도록 설계되었다.

    - map()에 콜백을 전달하고 새로운 Publisher를 생산한다.

    - 새로운 Publisher는 일단 첫 Publisher의 stream을 반복하는 subscribe()를 그대로 수행한다.

    - 이때 onNext()를 새로 오버라이딩한 Subscriber를 전달한다.

    - onNext()를 오버라이딩할 때 기존 Subscriber의 onNext()를 그대로 사용하지만 전달할 스트림데이터소스만 콜백을 적용한다. 즉, onNext(func(func(item))) 이런 모양이 최종적으로 됨을 예상할 수 있다.

     

    중간연산 filter

    map()과 기본 컨셉은 동일하다. 단, filter에서는 Predicate콜백을 전달받아 그 값이 true인 경우만 onNext()를 호출하는 구조로 바꾸었다. map()을 정확하게 이해했다면 filter는 금방 이해할 수 있을 것 같다!

    Non-Blocking

    위 예제의 경우 모든 작업이 한 스레드에서 일어난다. 사실상 Stream API와 거의 동일해 보인다. 다만, Subscriber의 작업이 DB통신이나 IO같은 굉장이 오래 걸리는 작업이라면? 아니면, Subscriber에게 subscribe를 호출하기 전 Publisher의 작업이 마찬가지로 오래 걸리는 작업이라면? 비동기 처리를 고려해봐야 한다. 사실 Reactive Stream의 핵심이다.

     

    subscribeOn

    이후 수행할 publisher의 모든 작업을 새로운 스레드에 할당하여 처리하게 한다. publisher에서 아예 새로운 스레드가 생성되기 때문에 일반적으로 publisher의 작업이 느린 경우 사용한다.

     

    publishOn

    publisher에서 새로운 subscriber 작업을 만들고 처리하는 시점에 새로운 스레드를 생성한다. publisher의 작업이 빠르고 subscriber의 작업이 느리다면 이러한 방식을 사용하는 게 자원을 효율적으로 사용할 수 있을 것 같다.

    마치며

    토비님의 유튜브를 보며 reative programming을 처음 공부하는데 익숙지 않아 어려운 부분이 많은 것 같다. 자세한 설명보다는 우선 내가 익숙해지고 공부하는 느낌의 정리이니 혹시 틀린부분이 있으면 알려주시길... 여하튼 좀더 코딩을 하다보면 Netty서버에서 reactive를 좀더 편리하게 사용할 수 있는 날이 오겠지^^ 내용은 계속 추가할 예정이다.

    참고

    https://www.youtube.com/watch?v=8fenTR3KOJo&list=PLv-xDnFD-nnmof-yoZQN8Fs2kVljIuFyC&index=10 https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

    https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#specification

    댓글

Designed by Tistory.