RxJava with Spring MVC

Spring Cloud Netflix includes RxJava.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

Spring Cloud Netflix provides support for returning rx.Single objects from Spring MVC Controllers. It also supports using rx.Observable objects for Server-sent events (SSE). This can be very convenient if your internal APIs are already built using RxJava (see Feign Hystrix Support for examples).

Here are some examples of using rx.Single:

@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
    return Single.just("single value");
}

@RequestMapping(method = RequestMethod.GET, value = "/singleWithResponse")
public ResponseEntity<Single<String>> singleWithResponse() {
    return new ResponseEntity<>(Single.just("single value"),
            HttpStatus.NOT_FOUND);
}

@RequestMapping(method = RequestMethod.GET, value = "/singleCreatedWithResponse")
public Single<ResponseEntity<String>> singleOuterWithResponse() {
    return Single.just(new ResponseEntity<>("single value", HttpStatus.CREATED));
}

@RequestMapping(method = RequestMethod.GET, value = "/throw")
public Single<Object> error() {
    return Single.error(new RuntimeException("Unexpected"));
}

If you have an Observable rather than a Single, you can use .toSingle() for a sequence that emits exactly one item, or .toList().toSingle() to collect several items into a list. Here are some examples:

@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
    return Observable.just("single value").toSingle();
}

@RequestMapping(method = RequestMethod.GET, value = "/multiple")
public Single<List<String>> multiple() {
    return Observable.just("multiple", "values").toList().toSingle();
}

@RequestMapping(method = RequestMethod.GET, value = "/responseWithObservable")
public ResponseEntity<Single<String>> responseWithObservable() {
    Observable<String> observable = Observable.just("single value");
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(APPLICATION_JSON_UTF8);
    return new ResponseEntity<>(observable.toSingle(), headers, HttpStatus.CREATED);
}

@RequestMapping(method = RequestMethod.GET, value = "/timeout")
public Observable<String> timeout() {
    return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
        @Override
        public String call(Long aLong) {
            return "single value";
        }
    });
}

If both your endpoint and your client support streaming, SSE could be an option. To convert an rx.Observable to a Spring SseEmitter, use RxResponse.sse(). Here are some examples:

@RequestMapping(method = RequestMethod.GET, value = "/sse")
public SseEmitter single() {
    return RxResponse.sse(Observable.just("single value"));
}

@RequestMapping(method = RequestMethod.GET, value = "/messages")
public SseEmitter messages() {
    return RxResponse.sse(Observable.just("message 1", "message 2", "message 3"));
}

@RequestMapping(method = RequestMethod.GET, value = "/events")
public SseEmitter event() {
    return RxResponse.sse(APPLICATION_JSON_UTF8,
            Observable.just(new EventDto("Spring io", getDate(2016, 5, 19)),
                    new EventDto("SpringOnePlatform", getDate(2016, 8, 1))));
}
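
The SSE examples above do not show what a client receives. As a rough sketch of the text/event-stream format (each event is one or more data: lines followed by a blank line), a stream like the one from /messages could be shaped as follows. The names SseWireFormatSketch, toEvent, and toStream are hypothetical and not part of Spring or Spring Cloud Netflix, and the exact bytes written by SseEmitter (whitespace after the colon, additional fields) may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; it does not use Spring or RxJava.
class SseWireFormatSketch {

    // Formats one payload as a single SSE event: one "data:" line per
    // payload line, terminated by a blank line that marks the event boundary.
    static String toEvent(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    // Concatenates the events in emission order, as a streaming response would.
    static String toStream(List<String> payloads) {
        StringBuilder sb = new StringBuilder();
        for (String payload : payloads) {
            sb.append(toEvent(payload));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Mirrors the three items emitted by the /messages example.
        System.out.print(toStream(Arrays.asList("message 1", "message 2", "message 3")));
    }
}
```

Because the blank line separates events, a client can process each item as soon as it arrives instead of waiting for the Observable to complete.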
