Server Sent Events with Spring Boot and ReactJS
Server-Sent Events (SSE) is an HTTP standard that lets servers push streaming data to clients. The flow is unidirectional, from server to client, and the client receives updates whenever the server pushes data.
On the client side, SSE exposes the EventSource interface, which has a straightforward API:
var source = new EventSource('sse-endpoint-address');
source.onmessage = function (event) {
    console.log(event.data);
};
The data sent is always decoded as UTF-8. The server sends events with the text/event-stream MIME type, and the default event type is message. The onmessage event handler captures these default messages.
The client API has three predefined event handlers: onopen, onmessage, and onerror.
The server can also send custom event types, in which case the client should register an event listener for each of them:
event: add
data: 73857293

event: remove
data: 2153

event: add
data: 113411

source.addEventListener('add', addHandler, false);
source.addEventListener('remove', removeHandler, false);
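To make the stream format above more concrete, here is a minimal Java sketch of how a client could split a text/event-stream body into typed events. It is a simplification, not the browser's implementation: the class name SseParserSketch is hypothetical, only the event and data fields are handled, and lines are assumed to end with "\n".

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified parser for the text/event-stream format (illustrative only). */
final class SseParserSketch {

    /** One dispatched event: its type and its data payload. */
    static final class Event {
        final String type;
        final String data;

        Event(String type, String data) {
            this.type = type;
            this.data = data;
        }
    }

    /** Parses a complete stream body into events; a blank line ends each event. */
    static List<Event> parse(String stream) {
        List<Event> events = new ArrayList<>();
        String type = "message";              // default event type
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: dispatch the buffered event, then reset the state.
                if (hasData) {
                    events.add(new Event(type, data.toString()));
                }
                type = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("event:")) {
                type = stripLeadingSpace(line.substring("event:".length()));
            } else if (line.startsWith("data:")) {
                if (hasData) {
                    data.append('\n');        // several data lines are joined with newlines
                }
                data.append(stripLeadingSpace(line.substring("data:".length())));
                hasData = true;
            }
            // Other fields (id, retry) and comment lines are ignored in this sketch.
        }
        return events;
    }

    private static String stripLeadingSpace(String value) {
        return value.startsWith(" ") ? value.substring(1) : value;
    }

    public static void main(String[] args) {
        String stream = "event: add\ndata: 73857293\n\n"
                + "event: remove\ndata: 2153\n\n"
                + "data: plain text\n\n";
        for (Event e : parse(stream)) {
            System.out.println(e.type + " -> " + e.data);
        }
    }
}
```

An event without an event field keeps the type message, which is why it reaches onmessage, while add and remove events only reach listeners registered with addEventListener.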
Spring MVC Server Sent Events
Spring Boot provides a way to implement SSE using Flux, a reactive representation of a stream of events. In this post, however, I use Spring MVC, which provides three important classes:
- ResponseBodyEmitter
- SseEmitter
- StreamingResponseBody
ResponseBodyEmitter is a parent class that handles async responses, and SseEmitter is a subclass of ResponseBodyEmitter that adds support for Server-Sent Events. Let us see some example implementations with SseEmitter in action.
Pushing Time As a Simple Message Event
We can create a controller on the backend as follows:
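import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class Controller {

    private static final Logger LOGGER = LoggerFactory.getLogger(Controller.class);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @PostConstruct
    public void init() {
        // Stop the worker thread when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executor.shutdown();
            try {
                executor.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOGGER.error(e.toString());
            }
        }));
    }

    @GetMapping("/time")
    @CrossOrigin
    public SseEmitter streamDateTime() {
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        sseEmitter.onCompletion(() -> LOGGER.info("SseEmitter is completed"));
        sseEmitter.onTimeout(() -> LOGGER.info("SseEmitter is timed out"));
        sseEmitter.onError((ex) -> LOGGER.info("SseEmitter got error:", ex));

        // Push the current time once per second, 15 times, from the worker thread.
        executor.execute(() -> {
            for (int i = 0; i < 15; i++) {
                try {
                    sseEmitter.send(LocalDateTime.now().format(DateTimeFormatter.ofPattern("dd-MM-yyyy hh:mm:ss")));
                    sleep(1, sseEmitter);
                } catch (IOException e) {
                    LOGGER.error("Failed to send event", e);
                    sseEmitter.completeWithError(e);
                    return; // stop pushing once the emitter has failed
                }
            }
            sseEmitter.complete();
        });

        LOGGER.info("Controller exits");
        return sseEmitter;
    }

    private void sleep(int seconds, SseEmitter sseEmitter) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            sseEmitter.completeWithError(e);
        }
    }
}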
Note that the SseEmitter instance is created and handed to the thread pool for use in the async task, and it is also returned as the response to the REST call. The async task uses the send method to push data; since only the data is passed to the method, it is pushed as a default message event. The REST call therefore returns the emitter immediately and logs "Controller exits", and whenever something is ready to push, the executor thread sends it.
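The hand-off described above can be tried without Spring. The following plain-Java sketch is only an analogy under stated assumptions: a BlockingQueue stands in for the SseEmitter, and the names AsyncHandOffSketch, stream, drain, and COMPLETE are hypothetical. It shows a method that returns its channel immediately while a single-thread executor keeps pushing into it afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class AsyncHandOffSketch {

    /** Marker placed on the queue when the producer is done (stand-in for complete()). */
    static final String COMPLETE = "<complete>";

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Returns immediately; the executor thread fills the queue afterwards. */
    BlockingQueue<String> stream(int count) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        executor.execute(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    channel.put("tick " + i);       // stand-in for sseEmitter.send(...)
                    TimeUnit.MILLISECONDS.sleep(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop producing
            } finally {
                channel.offer(COMPLETE);
            }
        });
        return channel;
    }

    /** Blocks until COMPLETE arrives and returns everything received before it. */
    static List<String> drain(BlockingQueue<String> channel) {
        List<String> received = new ArrayList<>();
        try {
            String item;
            while (!(item = channel.take()).equals(COMPLETE)) {
                received.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        AsyncHandOffSketch sketch = new AsyncHandOffSketch();
        BlockingQueue<String> channel = sketch.stream(3);
        System.out.println("Handler returned");   // printed before any tick is consumed
        for (String item : drain(channel)) {
            System.out.println(item);
        }
        sketch.shutdown();
    }
}
```

In the real controller the servlet container plays the role of the consumer: it keeps the HTTP response open and writes each value to the client as the worker thread sends it.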
On the client side, we can simply create a React project with create-react-app and use the EventSource interface to subscribe to an endpoint as follows:
import React, {useEffect, useState} from "react";

function App() {
    const [data, setData] = useState([]);

    useEffect(() => {
        // Create the connection inside the effect so the cleanup closes the same instance.
        const eventSource = new EventSource("http://localhost:8080/time");

        eventSource.onopen = () => {
            console.log("connection opened");
        };

        eventSource.onmessage = (event) => {
            console.log("result", event.data);
            setData(old => [...old, event.data]);
        };

        eventSource.onerror = (event) => {
            console.log(event.target.readyState);
            if (event.target.readyState === EventSource.CLOSED) {
                console.log('eventsource closed (' + event.target.readyState + ')');
            }
            // Close explicitly; otherwise the browser reconnects automatically.
            eventSource.close();
        };

        return () => {
            eventSource.close();
            console.log("eventsource closed");
        };
    }, []);

    return (
        <div className="App">
            <header className="App-header">
                Received Data
                {data.map((d, i) =>
                    <span key={i}>{d}</span>
                )}
            </header>
        </div>
    );
}

export default App;
Each received message is appended to the data array, and that array is displayed on the App page.
Pushing Custom Progress Event
This time we can use the SseEventBuilder to push custom JSON data and give it an event name:
sseEmitter.send(SseEmitter.event().name("Progress").data(progress, MediaType.APPLICATION_JSON));
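As a rough illustration of what a named event looks like on the wire, the sketch below builds an event frame by hand in plain Java. It does not reproduce Spring's exact output; the class SseFormatSketch and the JSON payload shape are assumptions made for the example.

```java
/** Builds text/event-stream frames by hand (illustrative only). */
final class SseFormatSketch {

    /** One frame: optional "event:" line, one "data:" line per payload line, then a blank line. */
    static String format(String eventName, String data) {
        StringBuilder frame = new StringBuilder();
        if (eventName != null && !eventName.isEmpty()) {
            frame.append("event: ").append(eventName).append('\n');
        }
        // A payload containing newlines must be split across several data lines.
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        return frame.append('\n').toString();
    }

    public static void main(String[] args) {
        // Assumed JSON shape for a progress object with target and value properties.
        System.out.print(format("Progress", "{\"target\":100,\"value\":30}"));
        // Without an event name the client treats the frame as a default message event.
        System.out.print(format(null, "plain message"));
    }
}
```

The event name is what the client passes to addEventListener, so it must match exactly, including case.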
We will push instances of the following class to represent the progress:
import com.fasterxml.jackson.annotation.JsonInclude;
import java.util.concurrent.atomic.AtomicInteger;

@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ObservableProgress {

    private final int target;
    private final AtomicInteger value = new AtomicInteger(0);

    public ObservableProgress(int target) {
        this.target = target;
    }

    // Adds v to the current value and returns this instance so calls can be chained.
    public ObservableProgress increment(int v) {
        value.getAndAdd(v);
        return this;
    }

    public int getTarget() {
        return target;
    }

    public int getValue() {
        return value.get();
    }

    @Override
    public String toString() {
        return "ObservableProgress{" +
                "target=" + target +
                ", value=" + value +
                '}';
    }
}
In the controller I want to simulate handling a job with multiple steps, where each step is a blocking I/O operation. Each time a step completes, we push some progress points back to the client. Its implementation is as follows:
import static java.util.concurrent.CompletableFuture.runAsync;

import java.io.IOException;
import java.util.concurrent.CompletionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
public class Controller {

    private static final Logger LOGGER = LoggerFactory.getLogger(Controller.class);

    @GetMapping("/run")
    @CrossOrigin
    public SseEmitter doTheJob() {
        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);
        sseEmitter.onCompletion(() -> LOGGER.info("SseEmitter is completed"));
        sseEmitter.onTimeout(() -> LOGGER.info("SseEmitter is timed out"));
        sseEmitter.onError((ex) -> LOGGER.info("SseEmitter got error:", ex));

        ObservableProgress progress = new ObservableProgress(100);

        // Each stage simulates one blocking step and then pushes the updated progress.
        runAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(10));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(10));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(sseEmitter::complete)
        .exceptionally(ex -> {
            sseEmitter.completeWithError(ex);
            // Rethrow without assuming the exception is already a CompletionException.
            throw ex instanceof CompletionException ? (CompletionException) ex : new CompletionException(ex);
        });

        LOGGER.info("Controller exits");
        return sseEmitter;
    }

    private void pushProgress(SseEmitter sseEmitter, ObservableProgress progress) {
        try {
            LOGGER.info("Pushing progress: {}", progress);
            sseEmitter.send(SseEmitter.event().name("Progress").data(progress, MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            LOGGER.error("An error occurred while emitting progress.", e);
        }
    }

    private void sleep(int seconds, SseEmitter sseEmitter) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
            sseEmitter.completeWithError(e);
        }
    }
}
Note that I use CompletableFuture to simulate handling the async tasks.
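The chaining behaviour that the controller relies on can be checked in isolation. The sketch below is a simplified stand-in, not the controller itself: the class ProgressChainSketch and its run method are hypothetical, the sleeps are dropped, and a list replaces the emitter. It shows that each thenRunAsync stage starts only after the previous one has finished, so the progress values are recorded in order.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

final class ProgressChainSketch {

    /** Runs the steps one after another and records the running total after each step. */
    static List<Integer> run(int... increments) {
        AtomicInteger value = new AtomicInteger(0);
        List<Integer> pushed = new CopyOnWriteArrayList<>();

        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int increment : increments) {
            // Each stage is scheduled only after the previous stage has completed.
            chain = chain.thenRunAsync(() -> pushed.add(value.addAndGet(increment)));
        }
        chain.join(); // wait for the whole chain; the controller in the post does not block like this
        return pushed;
    }

    public static void main(String[] args) {
        // Same increments as the controller: prints [10, 30, 40, 60, 80, 100]
        System.out.println(run(10, 20, 10, 20, 20, 20));
    }
}
```

Because the stages never overlap, the client sees the progress grow monotonically up to the target of 100.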
On the client side we need to register a listener for our new custom event type:
eventSource.addEventListener("Progress", (event) => {
    const result = JSON.parse(event.data);
    console.log("received:", result);
    setData(result);
});
So App.js becomes as follows:
import React, {useEffect, useState} from "react";
import {Card, Progress, Row} from "antd";

function App() {
    const [data, setData] = useState({value: 0, target: 100});

    useEffect(() => {
        // Create the connection inside the effect so the cleanup closes the same instance.
        const eventSource = new EventSource("http://localhost:8080/run");

        eventSource.addEventListener("Progress", (event) => {
            const result = JSON.parse(event.data);
            console.log("received:", result);
            setData(result);
        });

        eventSource.onerror = (event) => {
            console.log(event.target.readyState);
            if (event.target.readyState === EventSource.CLOSED) {
                console.log('SSE closed (' + event.target.readyState + ')');
            }
            // Close explicitly; otherwise the browser reconnects and the job restarts.
            eventSource.close();
        };

        eventSource.onopen = () => {
            console.log("connection opened");
        };

        return () => {
            eventSource.close();
            console.log("event closed");
        };
    }, []);

    return (
        <>
            <Card title="Progress Circle">
                <Row justify="center">
                    <Progress type="circle" percent={data.value / data.target * 100}/>
                </Row>
            </Card>
            <Card title="Progress Line">
                <Row justify="center">
                    <Progress percent={data.value / data.target * 100} />
                </Row>
            </Card>
        </>
    );
}

export default App;
Note that I use the Ant Design Progress component to visualise the progress.
Limitation to the maximum number of open connections
Note that SSE has a drawback when it is not used over HTTP/2: a browser is not allowed to open more than 6 SSE connections to the same address (www.ex-adress.com). In Chrome and Firefox this means that at most 6 tabs can hold an SSE connection to the same address at once. Over HTTP/2 there is no such low limit; the maximum number of simultaneous streams is negotiated between the server and the client, and the default is 100.
Server-Sent Events vs. WebSockets
WebSockets and SSE (Server-Sent Events) are both capable of pushing data to browsers.
WebSocket connections are bidirectional: they can send data to the browser and receive data from it. Games, messaging apps, and other cases where you need near real-time updates in both directions are good examples of WebSocket usage.
SSE connections are unidirectional and can only push data to the browser. Stock tick data, push notifications, and Twitter timeline updates are good examples of applications that could benefit from SSE.
In practice, everything that can be done with SSE can also be done with WebSockets, and WebSockets provide a richer protocol. That is why WebSockets get more attention and are used more widely. However, they can be overkill for some types of applications, and the backend can be easier to implement with a protocol such as SSE.