A simple example of using Redis PubSub and Spring Reactive Server-Sent Events for real-time push events to the browser
Spring Boot v2.0.3.RELEASE
Scenario and design choices
The scenario described in this article is depicted below:
The goal is to have events published by some third-party process to a Redis PubSub channel. Spring is configured to subscribe to this channel and convert any received events to HTTP Server-Sent Events (SSE), which are then sent to the client browser.
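To try the flow end to end, something has to play the role of the third-party publisher. The following is a minimal sketch of such a stand-in, not part of the original project: it assumes a Redis instance on localhost:6379 and uses the Jedis client and the channel name "foo" that the article uses further down. The helper name publishEvent is hypothetical.

```kotlin
import redis.clients.jedis.Jedis

// Channel name taken from the article; everything else here is illustrative.
const val CHANNEL = "foo"

// Hypothetical helper: publishes one message and returns how many
// subscribers Redis delivered it to.
fun publishEvent(jedis: Jedis, message: String): Long =
    jedis.publish(CHANNEL, message)

fun main() {
    // Assumption: Redis is reachable on localhost:6379.
    Jedis("localhost", 6379).use { jedis ->
        val receivers = publishEvent(jedis, "hello from the publisher")
        println("Delivered to $receivers subscriber(s)")
    }
}
```

Running `redis-cli PUBLISH foo hello` from a terminal would serve the same purpose.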
Even for such a simple scenario, some planning is required. There are (almost too many) options on how to go about this. For example, you could use Spring Data Redis to interface with Redis, rather than using the Jedis library directly as I did. You could also go fully reactive and use the Lettuce Redis driver instead, which supports reactive programming. See which works best for you and, most importantly, which is easier to maintain and understand.
As always, the Spring Framework makes it a breeze to get a functioning reactive web stack up and running.
DefaultRouter.kt defines a single route (GET /sse) which is handled by the sseEvents method defined in HomeHandler.kt.
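A router of this shape can be sketched with the WebFlux Kotlin router DSL. This is an illustration rather than the original DefaultRouter.kt: the HomeHandler below is a placeholder so that the snippet compiles on its own, and the sketch assumes the kotlin-spring compiler plugin is applied (as in projects generated by start.spring.io) so that the configuration class does not need to be declared open.

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.router
import reactor.core.publisher.Mono

// Placeholder for the article's HomeHandler; the real one streams SSE.
class HomeHandler {
    fun sseEvents(request: ServerRequest): Mono<ServerResponse> =
        ServerResponse.ok().build()
}

// Assumption: the kotlin-spring plugin opens this class for Spring's proxies.
@Configuration
class DefaultRouter {

    @Bean
    fun homeHandler() = HomeHandler()

    // Maps GET /sse to HomeHandler.sseEvents.
    @Bean
    fun routes(homeHandler: HomeHandler): RouterFunction<ServerResponse> = router {
        GET("/sse", homeHandler::sseEvents)
    }
}
```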
In HomeHandler.kt, note:
- Line 22: autowire a service (sseService) of class ServerSideEvents. This service is then used in line 25.
- Line 26: kept for demonstration purposes, to show how you can access information from the request, which can be used for operations such as filtering the SSE stream depending on the client.
- Line 27: the call to “bodyToServerSentEvents” is essential: it sets the response content type to text/event-stream and writes each element of the flux to the response as a server-sent event.
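The points above can be sketched as a functional handler. This is a hedged reconstruction, not the original HomeHandler.kt: it uses constructor injection instead of an autowired field, the ServerSideEvents class is a stand-in that emits two fixed strings, and the "prefix" query parameter is a hypothetical example of reading request data to filter events per client. The bodyToServerSentEvents extension is the one shipped with the Spring 5.0 line used by Spring Boot 2.0.x; it was deprecated in later Spring versions.

```kotlin
import org.springframework.stereotype.Component
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.bodyToServerSentEvents
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Stand-in for the article's service; the real one is fed from Redis.
@Service
class ServerSideEvents {
    fun getSSE(): Flux<String> = Flux.just("first", "second")
}

@Component
class HomeHandler(private val sseService: ServerSideEvents) {

    fun sseEvents(request: ServerRequest): Mono<ServerResponse> {
        // Hypothetical per-client filter: only keep events that start with
        // the optional "prefix" query parameter (empty prefix keeps all).
        val prefix = request.queryParam("prefix").orElse("")
        val events = sseService.getSSE().filter { it.startsWith(prefix) }

        // Sets Content-Type: text/event-stream and streams the flux as SSE.
        return ServerResponse.ok().bodyToServerSentEvents(events)
    }
}
```

With this sketch, a request such as `GET /sse?prefix=fir` would only receive events starting with "fir".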
Below is the definition of the ServerSideEvents class which is autowired as a service in our handler above.
- Lines 10-27: Following the Jedis documentation, the “MyListener” class is used to subscribe to a Redis PubSub channel. For the purposes of this article we only implement the “onMessage” function, which is triggered every time a message or event is received on the channel we subscribed to. In our example we subscribe to the channel “foo” on line 42.
Note how we pass a FluxSink into the MyListener class. Whenever a message is received, it is pushed into the flux in line 15.
- A Flux is a stream of zero or more events. We wanted a custom Flux that accepts messages received from the Redis channel, so this custom Flux was created in line 37. Note that our flux object is defined in a companion object (the Kotlin counterpart of a static field in Java), so that it can be shared across multiple clients. The custom flux is created by passing a FluxSink object into a lambda function, which in turn passes this FluxSink object into the MyListener instance whose “onMessage” implementation is noted above.
- Note the use of flux.share() in line 54. The share method does not copy any events; it returns a new flux that multicasts the original flux to all of its subscribers through a single upstream subscription. This “cloned” flux is stored in the static cloneFlux object. The getSSE() function used by our handler above returns this cloned flux rather than the original. This was done to avoid problems that arose when more than one client connected to the server at the same time.
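Put together, the bullets above describe a service along the following lines. This is a sketch under assumptions, not the original listing: the names MyListener, cloneFlux, getSSE and the channel "foo" come from the article, while the Redis address (localhost:6379), the dedicated subscriber thread and the cleanup on cancellation are my own additions for illustration.

```kotlin
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisPubSub
import kotlin.concurrent.thread

// Forwards every message received on the subscribed channel into the sink.
class MyListener(private val sink: FluxSink<String>) : JedisPubSub() {
    override fun onMessage(channel: String, message: String) {
        sink.next(message)
    }
}

@Service
class ServerSideEvents {

    companion object {
        // The lambda runs each time something subscribes to this flux.
        private val flux: Flux<String> = Flux.create<String> { sink ->
            val listener = MyListener(sink)

            // Jedis.subscribe blocks until the listener unsubscribes, so it
            // runs on its own thread (assumption: Redis on localhost:6379).
            thread(isDaemon = true, name = "redis-subscriber") {
                Jedis("localhost", 6379).use { jedis ->
                    jedis.subscribe(listener, "foo")
                }
            }

            // Illustrative cleanup: stop listening once nobody is subscribed.
            sink.onCancel {
                if (listener.isSubscribed) listener.unsubscribe()
            }
        }

        // One upstream subscription, multicast to every connected client.
        private val cloneFlux: Flux<String> = flux.share()
    }

    // Handed to the handler; every client receives the same shared flux.
    fun getSSE(): Flux<String> = cloneFlux
}
```

Nothing connects to Redis until the first client subscribes to the flux returned by getSSE(), because Flux.create only runs its lambda on subscription.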
Sharing the flux is an important point. Whenever a new client connects to Spring WebFlux, the server subscribes to the flux on that client's behalf, which for the original flux means setting it up again. Several problems can arise from this. For example, since the flux is a static object meant to be shared between clients, setting it up again every time a client connects impacts the other, already connected clients. In addition, when that client leaves, its flux is cancelled and discarded. This was causing problems for other clients that were still connected, since the flux that was subscribed to the PubSub channel was being cancelled and all clients' connections were being severed. To avoid this and other simultaneous-access problems, a cloned flux is passed to the client instead: the shared flux keeps a single subscription to the original and cancels it only once the last client has disconnected.
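The effect of share() can be checked with Reactor alone, without Redis or Spring. The sketch below is illustrative: the function and class names are hypothetical, and a manually driven FluxSink stands in for the Redis listener. It counts how often the creation lambda runs and records what two simulated clients receive when the first one disconnects early.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical holder for what the demo observed.
data class ShareDemoResult(
    val creations: Int,        // how many times the Flux.create lambda ran
    val first: List<String>,   // events seen by the first client
    val second: List<String>   // events seen by the second client
)

fun runShareDemo(): ShareDemoResult {
    val creations = AtomicInteger()
    var currentSink: FluxSink<String>? = null

    // Stand-in for the Redis-backed flux: we push events by hand.
    val source = Flux.create<String> { sink ->
        creations.incrementAndGet()
        currentSink = sink
    }
    val shared = source.share()

    val first = mutableListOf<String>()
    val second = mutableListOf<String>()

    // Two "clients" subscribe to the same shared flux.
    val firstClient = shared.subscribe { first.add(it) }
    shared.subscribe { second.add(it) }

    currentSink?.next("event-1")
    firstClient.dispose()          // the first client disconnects
    currentSink?.next("event-2")   // the second client is still connected

    return ShareDemoResult(creations.get(), first, second)
}

fun main() {
    println(runShareDemo())
}
```

The creation lambda runs once for both clients, and the second client keeps receiving events after the first has gone. Subscribing both clients to `source` directly instead would run the lambda once per client.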
- Redis PubSub using Jedis (Documentation)
- This article proved very useful in learning how to program a “flux sink”
- This article was useful to understand Spring + SSE in general – though it uses annotation-style programming while we used functional style here.
- Screencast on Spring + SSE