본문 바로가기
일 그리고 컴퓨터

반응형 프로그래밍(Reactive Programming)

by 취미부자현니 2024. 9. 10.

 

사전 용어 정리

  • Blocking(블로킹), Non-blocking(논-블로킹)
  • Synchronous(동기), Asynchronous(비동기)
  • Blocking system : 1개의 쓰레드가 1개의 요청을 처리함(Thread pool을 생성해서 요청마다 thread가 처리함, thread pool보다 많은 요청이 들어오는 경우 thread pool hell 현상 발생), sync + blocking, 명령형 프로그래밍, JDBC JPA 네트워킹 지원
  • Non-blocking system : 1개의 쓰레드가 여러개의 요청을 처리함(요청을 핸들러에게 처리를 위임하고 완료되면 callback 메소드를 통해 응답받음), async + non-blocking, 반응형 프로그래밍, Reactor RxJava 지원

Reactive manifesto

  • 여기서 reactive란?
    • 주변의 변화하는 모든것에 대해 반응하는 것을 의미한다.
    • 데이터가 변경 될 때마다 이벤트를 발생시켜 데이터를 계속적으로 전달
    • 선언형 프로그래밍 : 단순히 목표를 선언
    • 반대로는 명령형 프로그래밍 : 실행할 동작을 구체적으로 명시하는 명령형 프로그래밍
    • 네트워크 컴포넌트는 I/O 이벤트에 반응
    • UI 컨트롤러가 마우스 이벤트에 반응
    • 같은 맥락으로 non-blocking = reactive, 수행이 완료되거나 데이터가 준비되었다는 알림에 반응하는 것
    • Non-blocking은 이벤트 속도를 제어하여 생산자가 목적지를 지나가지 않게 하는 것이 중요하다. (Backpressure)
  • 소프트웨어 아키텍쳐에 대한 선언문
  • Reactive system의 특성을 강조하고 구축에 필요한 가이드라인 제공
  • 4 가지의 핵심 가치를 제시
    • Responsive(응답성) : 가능한 즉각적으로 응답하는 것
    • Elastic(유연성) : 작업량이 변화하더라도 응답성을 유지하는 것
    • Resilient(탄력성) : 장애에 직면하더라도 응답성을 유지하는 것
    • Message Driven(메시지 기반) : 비동기 메시지 전달에 의존하여 구성 요소 사이에서 느슨한 결합, 격리, 위치 투명성을 보장

Reactive Streams

구조

  • Reactive Streams : 표준화된 라이브러리가 인터페이스 형태로 스펙이 정의되어 있음
    • Publisher-Subscriber 혹은 Producer-Consumer 모델이 기초구성임
    • Publisher : 데이터를 생성하는 컴포넌트
      • Hot publisher : subscriber가 없어도 데이터를 생성 (sns)
      • Cold publisher : subscribe가 시작되는 순간부터 데이터를 생성 후 전송
    • Subscriber : 데이터를 받아서 처리하는 컴포넌트
    • Subscription : 데이터의 흐름을 조절 (request, cancel)
    • 동작 순서
      1. Subscriber → Publisher : 구독요청
      1. Publisher → Subscriber : Subscription 정보를 전달, subscription이 매개체
      1. Subcriber → Subscription → Publisher : subscription의 request 함수를 통해 publisher에게 요청
      1. Publisher → Subscription → Subscriber : subscription을 통해 시그널(onNext 여러번, onComplete, onError 딱 한번)을 전달

구현체

  • Reactive Java Libraries
    • Project Reactor, RxJava, Mutiny

Project Reactor

  • Pivotal사에서 개발
  • Spring reactor에서 사용
  • Mono와 Flux publisher를 제공
  • Mono : 0~1개의 데이터를 전달
Mono<String> mono = Mono.just("Hyunnyi");
Mono<Object> monoEmpty = Mono.empty();
Mono<Object> monoError = Mono.error(new Exception());
  • Flux : 0~N개의 데이터를 전달
    Flux<Integer> flux = Flux.just(1, 2, 3, 4);
    Flux<String> fluxString = Flux.fromArray(new String[]{"A", "B", "C"});
    Flux<String> fluxIterable = Flux.fromIterable(Arrays.asList("A", "B", "C"));
    Flux<Integer> fluxRange = Flux.range(2, 5);
    Flux<Long> fluxLong = Flux.interval(Duration.ofSeconds(10));
  • Publisher는 subscribe( )가 호출되기 전까지는 아무런 동작을 하지 않음
  • Backpressure (배압) : 스트림 요소의 전송을 조절하는 방법 (수신자가 소비할 수 있는 요소 수를 제어), “publisher가 생성하는 데이터의 속도 > subscriber가 처리하는 속도”인 경우에 시스템이 무너지는 결과가 발생함
    1. Buffer : 처리하지 못한 데이터들은 버퍼에 임시 저장함. Out Of Memory가 발생할 수 있다는 단점이 존재
    2. Drop : 처리하지 못한 데이터들은 전부 삭제함. 서버는 안정적으로 유지가 가능함
    3. Control : subscriber가 처리가능한 속도로 데이터를 생성하도록 publisher의 속도를 제어함
  • .log( ) 매소드를 이용하면 콘솔 로그에서 스트림 신호들을 확인할 수 있음

WebFlux란?

  • Spring 5, Spring boot 2에서 새롭게 추가된 모듈
  • Non-blocking reactive stream을 지원하는 Reactive-Stack의 웹 프레임워크
  • 은 양의 쓰레드와 최소한의 하드웨어 자원을 이용해 non-blocking 방식으로 web stack을 동시에 처리하기 위해 개발됨
  • Java 5에서의 RestController와 unit test, Java 8에서의 함수형 API를 위한 람다식이 non-blocking 어플리케이션 API가 토대가 됨.
  • Spring stack 비교
    • Spring servlet stack
      • Thread-per-request 모델 사용, 용청이 들어올때마다 쓰레드 할당
      • SecurityContext를 ThreadLocal에 저장
      • Thread당 하나의 db 요청을 처리
      • Thread pool을 안쓴다면? → thread 생성, 제거 비용이 더 비쌈, thread 마다 스택 메모리 할당으로 메모리 부족 문제 발생, 더 많은 thread가 cpu 점유를 위해 context switching 발생
    • Spring reactive stack
      • 비동기 이벤트 기반의 네트워크 어플리케이션 프레임워크인 netty 사용
      • Netty와 같은 서버들은 non-blocking, asynchronous로 잘 구현되어 있지만 servlet의 sync 혹은 blocking으로 동작하는 함수들에 의해 그 성능을 다 내지 못하여 필요성이 대두됨
        • Non-blocking 서버는 요청을 channel을 통해서 받음
        • channel에서 발생한 이벤트는 이벤트 루프 객체 안에있는 이벤트 큐에 쌓아놓음
        • 쓰레드에서 이벤트 루프라는 무한루프들 돌리며 이벤트 큐에 쌓인 이벤트를 처리함
        • 처리 결과를 콜백 혹은 퓨처 패턴으로 돌려줌
          • 퓨처 패턴 : 매서드의 결고하가 올 때 까지 블로킹하고 처리가 완료되면 결과를 반환한다.
          • 이벤트 방식의 리스너에 담아서 사용하면 비동기 제어도 가능함
  • WebFluxConfigurer를 통해 설정 가능
    • Conversion, formatting : 숫자, 날짜 포맷터와 컨버터 커스텀
    • Validation : 빈 검증
    • Content Type Resolvers : 요청된 미디어 타입 결정방법
    • HTTP message codecs : 요청과 응답 본문을 읽고 쓰는 방식
    • View Resolvers : 스프링 프레임워크와 통합된 뷰 기술 등록
    • Path Matching : 경로 매칭 옵션 커스텀 (접미사 매칭을 지원하지 않음)

예제코드

 

 

비동기 HTTP 요청 (WebClient)

  • RestTemplate은 non-blocking과 비동기로 사용이 불가능
    • AsyncRestTemplate이 있었지만 현재 deprecatd 상태임
  • 비동기 Http 요청을 보내기 위해선 WebClient를 사용해야함
    • body만 받을 경우 .retrieve() 사용
    • response의 상태값, 헤더를 가지고 오려면 exchangeToMono() or exchangeToFlux() 사용
  • 예제코드

 

속도 비교 (API 1000번 호출)

WebClient - WebFlux - R2DBC - MySql

  • get : 297ms
  • post : 280ms

 

 

RestTemplate - MVC - JDBC - MySql

  • get : 3361ms
  • post : 312262ms

 

Error Handling

  • Reactive streams에서 onError 이벤트 발생 시 더 이상 onNext, onComplete 이벤트를 생산하지 않고 종료함
  • Reactor에서 onError 이벤트가 발생하면 onError 이벤트를 아래로 전파
  • onError 이벤츠를 처리하기 위해서
    • handling이 없는 경우

  • consumer를 이용하여 처리 (errorConsumer)

  • 고정된 값 반환 (onErrorReturn)
    • 함수를 사용하는 경우에는 원하는 결과가 나오지 않을 수 있음

  • publisher를 반환 (onErrorResume)

  • onComplete 이벤트로 변경 (onErrorComplete)

 

  • 다른 에러로 변환 (onErrorMap)

  • 해당 데이터만 건너뛰고 동작 (onErrorContinue)

 

  • 에러일때 동작을 하고 에러 반환 (doOnError)

 

 

개발 시 주의점

  • 동기, 블로킹 코드가 들어가면 WebFlux로 개발한 의미가 없어짐
  • DB가 blocking이라면 WebFlux를 사용할 이유가 없음
  • Reactive를 지원하는 DB를 사용해야 함
  • .log( ) 를 과도하게 사용하면 속도가 느려지며, 메모리를 많이 사용함
  • .map( ) 은 내부 로직에 의해 결과가 반환되며 동기로 동작함, 많이 조합하면 gc발생
  • .flatMap( ) 은 연산결과를 publisher로 반환하며 비동기로 동작함

DB

  • Asynchronous Non-blocking I/O방식을 지원하는 DB를 사용해야 장점이 있음
  • R2DBC(Reactive Relational Database Connectivity)라는 드라이버가 나와서 java에서도 비동기 논블로킹 방식으로 DB 사용이 가능해짐
  • 사용 가능한 DB
    • H2, MariaDB, MySQL, Postgres, Oracle, MS SQL, jasync sql, MySQL, Mongo DB, Redis 등

참고링크

반응형