토비의 봄 TV 스프링 리액티브 프로그래밍(14) - Flux의 특징과 활용방법 정리
2023, Mar 07
강의 주소
요점
- dependency
- reactive-web
Mono
@GetMapping("/event/{id}") Mono<Event> hello(@PathVariable long id) { return Mono.just(new Event(id, "event" + id)); }
Flux
@GetMapping("/events") Flux<Event> events() { return Flux.just(new Event(1L, "event1"), new Event(1L, "event1")); // 여러개의 인자 }
하지만 Mono로도 컬렉션 리턴이 가능하다
@GetMapping("/event/{id}") Mono<List<Event>> hello(@PathVariable long id) { List<Event> list = Arrays.asList(new Event(1L, "event1"), new Event(1L, "event1")); return Mono.just(list); }
컬렉션을 스트림으로 나눠서 보내고 싶다면, 미디어 타입을 스트림으로 지정한다.
@GetMapping(value = "/events_ex1", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event> events_ex1() { return Flux.just(new Event(1L, "event1"), new Event(1L, "event1")); }
약간의 시간차를 두고 리턴된다.
jhsim@jhsimui-MacBookPro toby_flux % curl localhost:8083/events_ex1 jhsim@jhsimui-MacBookPro toby_flux % curl localhost:8083/events_ex1 data:{"id":1,"value":"event1"} data:{"id":1,"value":"event1"}
큰 데이터를 스트림을 나눠서 연속으로 내보낼 수 있다.
@GetMapping(value = "/events_ex2", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event> events_ex2() { return Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), "value"))) .delayElements(Duration.ofSeconds(1)) // 백그라운드 쓰레드를 별도로 만들어서 1초를 물고 있게된다. 메서드 자체가 blocking되는게 아님 .take(10); // 10개의 request가 오면 cancel해서 끊는다. } }
java의 Stream.generate대신에 Flux의 generate사용
@GetMapping(value = "/events_ex3", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event> events_ex3() { return Flux // sink는 데이터를 흘러내보내주는 것. next를 통해 다음 데이터를 가져온다. .<Event>generate(sink -> sink.next(new Event(System.currentTimeMillis(), "value"))) .delayElements(Duration.ofSeconds(1)) .take(10); }
또는 초기값을 설정하고 알고리즘에 의해서 생성시키는 방법
@GetMapping(value = "/events_ex4", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event> events_ex4() { return Flux .<Event, Long>generate(() -> 1L, (id, sink) -> { // 초기값 1 설정하고 계속 증가시킴 sink.next(new Event(System.currentTimeMillis(), "value" + id)); return id + 1; }) .delayElements(Duration.ofSeconds(1)) .take(10); }
동일한 zip방식
@GetMapping(value = "/events_ex6", produces = MediaType.TEXT_EVENT_STREAM_VALUE) Flux<Event> events_ex6() { Flux<String> es = Flux.generate(sink -> sink.next("Value")); Flux<Long> interval = Flux.interval(Duration.ofSeconds(1)); return Flux.zip(es, interval).map(tu -> new Event(tu.getT2(), tu.getT1())).take(10); }