Server Sent Events with Spring Boot and ReactJS

Spring Jan 16, 2021

Server-Sent Events (SSE) is an HTTP standard that lets a server push streaming data to a client. The flow is unidirectional, from server to client: the client receives an update whenever the server pushes data.

SSE exposes an EventSource interface with a straightforward API on the client side:

var source = new EventSource('sse-endpoint-address');
source.onmessage = function (event) {
  console.log(event.data);
};

The data sent is always decoded as UTF-8. The server sends events with the text/event-stream MIME type, and the default event type is message. The onmessage event handler captures these default messages.

The client API has 3 predefined event handlers: onopen, onmessage, and onerror.

The server can also send custom event types. In that case the client should register an event listener for each type:

event: add
data: 73857293

event: remove
data: 2153

event: add
data: 113411

source.addEventListener('add', addHandler, false);
source.addEventListener('remove', removeHandler, false);
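
To make the dispatch by event type more concrete, here is a minimal sketch of how a client could split a text/event-stream into typed events. It is written in plain Java only for illustration; the class and method names (SseStreamParser, parse) are my own, and it is a simplification of what the browser does (it ignores id:, retry: and comment lines and trims whitespace more liberally than the standard).

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: a simplified parser for the text/event-stream format. */
class SseStreamParser {

    /** One parsed event: its type (defaults to "message") and its data payload. */
    static final class Event {
        final String type;
        final String data;

        Event(String type, String data) {
            this.type = type;
            this.data = data;
        }
    }

    /** Splits a raw stream into events; a blank line terminates each event. */
    static List<Event> parse(String stream) {
        List<Event> events = new ArrayList<>();
        String type = "message";              // default type when no "event:" field is sent
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: dispatch the buffered event, then reset for the next one
                if (hasData) {
                    events.add(new Event(type, data.toString()));
                }
                type = "message";
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("event:")) {
                type = line.substring("event:".length()).trim();
            } else if (line.startsWith("data:")) {
                if (hasData) {
                    data.append('\n');        // multiple data lines are joined with newlines
                }
                data.append(line.substring("data:".length()).trim());
                hasData = true;
            }
            // Other fields (id:, retry:) and comment lines are ignored in this sketch
        }
        return events;
    }

    public static void main(String[] args) {
        String stream = "event: add\ndata: 73857293\n\n"
                + "event: remove\ndata: 2153\n\n"
                + "data: plain message\n\n";
        for (Event e : parse(stream)) {
            System.out.println(e.type + " -> " + e.data);
        }
    }
}
```

An event without an event: field keeps the type message, which is why onmessage sees it, while add and remove events only reach listeners registered with addEventListener.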

Spring MVC Server Sent Events

Spring provides a way to implement SSE with Flux (from Spring WebFlux), a reactive representation of a stream of events. In this post, however, I use Spring MVC, which provides 3 important classes:

  • ResponseBodyEmitter
  • SseEmitter
  • StreamingResponseBody

ResponseBodyEmitter is the parent class that handles async responses; SseEmitter is a subclass of ResponseBodyEmitter that adds support for Server-Sent Events. Let us see some example implementations with SseEmitter in action.

Pushing Time As a Simple Message Event

We can create a controller on the backend as follows:

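import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Spring Boot 3+ uses jakarta.annotation.PostConstruct instead
import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController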
public class Controller {

    private static final Logger LOGGER = LoggerFactory.getLogger(Controller.class);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @PostConstruct
    public void init() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            executor.shutdown();
            try {
                executor.awaitTermination(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOGGER.error(e.toString());
            }
        }));
    }

    @GetMapping("/time")
    @CrossOrigin
    public SseEmitter streamDateTime() {

        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);

        sseEmitter.onCompletion(() -> LOGGER.info("SseEmitter is completed"));

        sseEmitter.onTimeout(() -> LOGGER.info("SseEmitter is timed out"));

        sseEmitter.onError((ex) -> LOGGER.info("SseEmitter got error:", ex));

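        executor.execute(() -> {
            try {
                for (int i = 0; i < 15; i++) {
                    // HH is the 24-hour clock; hh would be ambiguous without an AM/PM marker
                    sseEmitter.send(LocalDateTime.now().format(DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss")));
                    sleep(1, sseEmitter);
                }
                sseEmitter.complete();
            } catch (IOException e) {
                // Stop at the first failed send (e.g. the client disconnected) instead of
                // continuing to send on a broken emitter and then calling complete()
                LOGGER.warn("Sending failed, completing emitter with error", e);
                sseEmitter.completeWithError(e);
            }
        });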

        LOGGER.info("Controller exits");
        return sseEmitter;
    }

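    private void sleep(int seconds, SseEmitter sseEmitter) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the executor can see that the thread was interrupted
            Thread.currentThread().interrupt();
            sseEmitter.completeWithError(e);
        }
    }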
}

Note that the SseEmitter instance is handed to the thread pool for use in the async task and is also returned as the response of the REST call. The async task uses the send method to push data, and since only the data is passed to the method, it is pushed as a default message event. The REST call therefore returns the emitter immediately and logs "Controller exits"; whenever something is ready to push, the executor thread sends it.
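
The "return now, push later" pattern can be sketched without Spring. In the following plain-Java example a BlockingQueue stands in for the SseEmitter and a marker string stands in for complete(); all names here (DeferredPushSketch, startStream, drain, DONE) are hypothetical and only mimic the control flow, not the actual Spring implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative only: mimics "return the emitter now, push from another thread later". */
class DeferredPushSketch {

    static final String DONE = "__done__";   // hypothetical end-of-stream marker

    /** Plays the role of the controller method: hands work to the executor and returns at once. */
    static BlockingQueue<String> startStream(ExecutorService executor, int count) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        executor.execute(() -> {
            for (int i = 1; i <= count; i++) {
                channel.add("tick " + i);     // stands in for sseEmitter.send(...)
            }
            channel.add(DONE);                // stands in for sseEmitter.complete()
        });
        return channel;                       // returned before the worker has necessarily sent anything
    }

    /** Plays the role of the client: consumes events until the stream completes. */
    static List<String> drain(BlockingQueue<String> channel) {
        List<String> received = new ArrayList<>();
        try {
            for (String event = channel.take(); !event.equals(DONE); event = channel.take()) {
                received.add(event);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
        }
        return received;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            BlockingQueue<String> channel = startStream(executor, 3);
            System.out.println("handler returned");
            System.out.println(drain(channel));
        } finally {
            executor.shutdown();
        }
    }
}
```

The request thread is free as soon as startStream returns; only the worker thread stays busy for the lifetime of the stream, which is the same division of labour as in the controller above.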

On the client side, we can simply create a React project with create-react-app and use the EventSource interface to subscribe to the endpoint as follows:

import React, {useEffect, useState} from "react";

function App() {

  const [listening, setListening] = useState(false);
  const [data, setData] = useState([]);
  let eventSource = undefined;

  useEffect(() => {
    if (!listening) {
      eventSource = new EventSource("http://localhost:8080/time");

      eventSource.onopen = (event) => {
        console.log("connection opened")
      }

      eventSource.onmessage = (event) => {
        console.log("result", event.data);
        setData(old => [...old, event.data])
      }

      eventSource.onerror = (event) => {
        console.log(event.target.readyState)
        if (event.target.readyState === EventSource.CLOSED) {
          console.log('eventsource closed (' + event.target.readyState + ')')
        }
        eventSource.close();
      }

      setListening(true);
    }

    return () => {
      eventSource.close();
      console.log("eventsource closed")
    }

  }, [])

  return (
    <div className="App">
      <header className="App-header">
        Received Data
        {data.map(d =>
          <span key={d}>{d}</span>
        )}
      </header>
    </div>
  );
}

export default App;

Each received message is appended to the data array, and that array is displayed on the App page.

Pushing Custom Progress Event

This time we can use the SseEventBuilder to push custom JSON data and give it an event name:

sseEmitter.send(SseEmitter.event().name("Progress").data(progress, MediaType.APPLICATION_JSON));

We will push instances of the following class to represent the progress:

@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class ObservableProgress {
    private final int target;
    private final AtomicInteger value = new AtomicInteger(0);

    public ObservableProgress(int target) {
        this.target = target;
    }

    public ObservableProgress increment(int v){
        value.getAndAdd(v);
        return this;
    }

    public int getTarget() {
        return target;
    }

    public int getValue() {
        return value.get();
    }

    @Override
    public String toString() {
        return "ObservableProgress{" +
                "target=" + target +
                ", value=" + value +
                '}';
    }
}
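
To give an idea of what travels over the wire, the sketch below builds the text of one named event carrying such a progress object. It is a hand-rolled approximation in plain Java: the names SseWireFormat, format and progressJson are hypothetical, the JSON is written by hand from the two getters (target and value), and the exact spacing and property order produced by Spring and Jackson may differ.

```java
/** Illustrative only: builds the text a client would receive for one named event. */
class SseWireFormat {

    /** Formats a named event; each line of the payload gets its own "data:" field. */
    static String format(String eventName, String payload) {
        StringBuilder out = new StringBuilder();
        out.append("event: ").append(eventName).append('\n');
        for (String line : payload.split("\n", -1)) {
            out.append("data: ").append(line).append('\n');
        }
        return out.append('\n').toString();   // blank line terminates the event
    }

    /** Hand-written JSON for the two getters of ObservableProgress (target, value). */
    static String progressJson(int target, int value) {
        return "{\"target\":" + target + ",\"value\":" + value + "}";
    }

    public static void main(String[] args) {
        System.out.print(format("Progress", progressJson(100, 10)));
    }
}
```

The event name on the first line is what the client later matches with addEventListener, and the data line is what arrives as event.data.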

In the controller I want to simulate handling a job with multiple steps, where each step is a blocking I/O operation. Every time a step completes, we push the progress made so far back to the client. Its implementation is as follows:

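import static java.util.concurrent.CompletableFuture.runAsync;

import java.io.IOException;
import java.util.concurrent.CompletionException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController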
public class Controller {

    private static final Logger LOGGER = LoggerFactory.getLogger(Controller.class);

    @GetMapping("/run")
    @CrossOrigin
    public SseEmitter doTheJob() {

        SseEmitter sseEmitter = new SseEmitter(Long.MAX_VALUE);

        sseEmitter.onCompletion(() -> LOGGER.info("SseEmitter is completed"));

        sseEmitter.onTimeout(() -> LOGGER.info("SseEmitter is timed out"));

        sseEmitter.onError((ex) -> LOGGER.info("SseEmitter got error:", ex));

        ObservableProgress progress = new ObservableProgress(100);

        runAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(10));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(10));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(() -> {
            sleep(1, sseEmitter);
            pushProgress(sseEmitter, progress.increment(20));
        })
        .thenRunAsync(sseEmitter::complete)
        .exceptionally(ex -> {
            sseEmitter.completeWithError(ex);
            throw (CompletionException) ex;
        });

        LOGGER.info("Controller exits");
        return sseEmitter;
    }

    private void pushProgress(SseEmitter sseEmitter, ObservableProgress progress) {
        try {
            LOGGER.info("Pushing progress: {}", progress.toString());
            sseEmitter.send(SseEmitter.event().name("Progress").data(progress, MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            LOGGER.error("An error occurred while emitting progress.", e);
        }
    }

    private void sleep(int seconds, SseEmitter sseEmitter) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can see that the thread was interrupted
            Thread.currentThread().interrupt();
            sseEmitter.completeWithError(e);
        }
    }
}

Note that I use CompletableFuture to simulate handling the async tasks.
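
The six chained steps differ only in their increment, so the chain could also be built in a loop. The following plain-Java sketch shows the idea without Spring: a list stands in for the emitter, and the names ProgressChainSketch and run are hypothetical. It blocks with join() only so that the result can be printed; the controller itself would return without waiting.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: the same sequential step chain, built in a loop. */
class ProgressChainSketch {

    /** Runs one async step per increment, in order, and records the running total after each. */
    static List<Integer> run(int... increments) {
        AtomicInteger value = new AtomicInteger(0);
        List<Integer> pushed = Collections.synchronizedList(new ArrayList<>());

        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int increment : increments) {
            // Each step starts only after the previous one has finished
            chain = chain.thenRunAsync(() -> pushed.add(value.addAndGet(increment)));
        }
        chain.join();                         // the controller would not block; done here to collect results
        return new ArrayList<>(pushed);
    }

    public static void main(String[] args) {
        // prints [10, 30, 40, 60, 80, 100]
        System.out.println(run(10, 20, 10, 20, 20, 20));
    }
}
```

Because every thenRunAsync stage depends on the previous one, the steps run strictly one after another even though each is scheduled asynchronously.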

On the client side we need to register a listener for our new custom event type:

eventSource.addEventListener("Progress", (event) => {
  const result = JSON.parse(event.data);
  console.log("received:", result);
  setData(result)
});

So App.js becomes:

import React, {useEffect, useState} from "react";
import {Card, Progress, Row} from "antd";

function App() {

  const [listening, setListening] = useState(false);
  const [data, setData] = useState({value: 0, target: 100});
  let eventSource = undefined;

  useEffect(() => {
    if (!listening) {
      eventSource = new EventSource("http://localhost:8080/run");

      eventSource.addEventListener("Progress", (event) => {
        const result = JSON.parse(event.data);
        console.log("received:", result);
        setData(result)
      });

      eventSource.onerror = (event) => {
        console.log(event.target.readyState)
        if (event.target.readyState === EventSource.CLOSED) {
          console.log('SSE closed (' + event.target.readyState + ')')
        }
        eventSource.close();
      }

      eventSource.onopen = (event) => {
        console.log("connection opened")
      }
      setListening(true);
    }
    return () => {
      eventSource.close();
      console.log("event closed")
    }

  }, [])

  return (

    <>
      <Card title="Progress Circle">
        <Row justify="center">
          <Progress type="circle" percent={data.value / data.target * 100}/>
        </Row>
      </Card>
      <Card title="Progress Line">
        <Row justify="center">
          <Progress percent={data.value / data.target * 100} />
        </Row>
      </Card>
    </>
  );
}

export default App;

Note that I use the Ant Design Progress component to visualise the progress.

Limitation to the maximum number of open connections

Note that SSE has a drawback when not used over HTTP/2: a browser is not allowed to open more than 6 SSE connections to the same address (www.ex-adress.com). So in Chrome and Firefox we can open at most 6 tabs that each hold an SSE connection to the same address. Over HTTP/2 there is no such low limit; the maximum number of simultaneous streams is negotiated between server and client and defaults to 100.

Server-Sent Events vs. WebSockets

WebSockets and SSE (Server-Sent Events) are both capable of pushing data to browsers.

WebSocket connections are bidirectional: they can send data to the browser and receive data from it. Games, messaging apps, and other cases that need near real-time updates in both directions are good examples of WebSocket usage.

SSE connections are unidirectional and can only push data to the browser. Stock tick data, push notifications, and Twitter's updating timeline are good examples of applications that could benefit from SSE.

In practice, everything that can be done with SSE can also be done with WebSockets, and WebSockets provide a richer protocol. That is why WebSockets get more attention and are used more widely. However, they can be overkill for some types of applications, and the backend can be easier to implement with a protocol such as SSE.

