Quick Introduction
One of the new features in JAX-RS 2.1 is Server-Sent Events (SSE). This allows the server to “push” events to clients that have connected to it over HTTP(S).
Typically HTTP requests involve a single request, a single response, and then you’re done – for example:
$ curl -v http://localhost:8080/ping
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /ping HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.61.1
> Accept: */*
>
< HTTP/1.1 200
< Date: Mon, 14 Dec 2020 17:04:36 GMT
< Content-Type: application/octet-stream
< Content-Length: 12
< Server: Apache TomEE
<
* Connection #0 to host localhost left intact
Hello, world
You can see the example above returns a Content-Length header, telling the user-agent (curl, in this example) to expect a 12-byte response. Once those 12 bytes have been received, the response is complete.
By contrast, Server-Sent Events keeps the connection open, and new events are flushed to the stream as they occur:
$ curl -v http://localhost:8080/rest-sse-example/jms
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /rest-sse-example/jms HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.61.1
> Accept: */*
>
< HTTP/1.1 200
< Accept: */*
< host: localhost:8080
< user-agent: curl/7.61.1
< Content-Type: text/event-stream
< Transfer-Encoding: chunked
< Date: Mon, 14 Dec 2020 17:14:12 GMT
< Server: Apache TomEE
<
data: {"JMSDeliveryMode":2,"JMSDeliveryTime":0,"JMSDestination":{"properties":{"physicalName":"EVENT"},"reference":{"all":{},"className":"org.apache.activemq.command.ActiveMQTopic","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"DLQ":false,"composite":false,"destinationTypeAsString":"Topic","marshallAware":false,"pattern":false,"physicalName":"EVENT","qualifiedName":"topic://EVENT","queue":false,"temporary":false,"dataStructureType":101,"destinationType":2,"topic":true,"topicName":"EVENT"},"JMSExpiration":0,"JMSMessageID":"ID:a-31oi1kgkz4qt5-38337-1607966052153-1:1:1:1:1","JMSPriority":4,"JMSRedelivered":false,"JMSTimestamp":1607966052334,"propertyNames":{},"text":"test message: 0"}
Here, the response stream is kept open until the client closes it. Notice the Content-Type and Transfer-Encoding headers on the response. Together they tell the user-agent to expect an event stream, and that the data will continue to arrive in chunks.
The two events shown here were sent with a few seconds between them.
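To make the stream format concrete, here is a minimal plain-JDK sketch of how a single event is laid out as text/event-stream: optional "event:" and "id:" fields, one "data:" line per line of payload, and a blank line to end the event. The class and method names (SseWireFormat, format) are hypothetical and only for illustration; in a real service the JAX-RS runtime writes this framing for you.

```java
public class SseWireFormat {

    /**
     * Formats one event in text/event-stream framing.
     * name and id are optional; pass null to leave them out.
     */
    public static String format(String name, String id, String data) {
        StringBuilder sb = new StringBuilder();
        if (name != null) {
            sb.append("event: ").append(name).append('\n');
        }
        if (id != null) {
            sb.append("id: ").append(id).append('\n');
        }
        // Every line of the payload gets its own "data:" field.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        // A blank line marks the end of the event.
        sb.append('\n');
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format(null, null, "Test message 0"));
        System.out.print(format("tick", "1", "line one\nline two"));
    }
}
```

The curl output above shows the simplest case: a single "data:" line carrying a JSON document.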
Let’s take a look at the components that make up an SSE service on the server side and build a simple service.
SseEventSink
This interface provides the method to send events from the server to the client. An instance of this interface is acquired through a context method parameter:
@GET
@Path("api/events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void testMessages(final @Context SseEventSink sink) throws Exception {
    for (int i = 0; i < 10; i++) {
        sink.send(builder.data(String.class, "Test message " + i).build());
        Thread.sleep(5000);
    }
}
The SseEventSink.send() call sends an event from the server back to the client. A call to http://localhost:8080/api/events will send 10 simple plain-text events back to the client, each 5 seconds apart.
The send() method takes an OutboundSseEvent object, which we need to build. That brings us nicely on to…
OutboundSseEvent.Builder
An implementation of this interface is responsible for creating OutboundSseEvents for us. You can use serializable Java objects as the payload you wish to send. These payloads are serialized in the same way that they would be if you were returning the data from a regular JAX-RS call. If you have used JAX-RS’s ResponseBuilder before, the syntax will be quite familiar – essentially, you need to provide a Content-Type, a Java type, and a payload. For example:
public static class MyEvent {
    private long timestamp;
    private String message;

    public MyEvent() {
    }

    public MyEvent(long timestamp, String message) {
        this.timestamp = timestamp;
        this.message = message;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
@GET
@Path("api/events")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void testMessages(final @Context SseEventSink sink) throws Exception {
    final MyEvent data = new MyEvent(new Date().getTime(), "Test message");
    final OutboundSseEvent outboundSseEvent = builder
            .data(MyEvent.class, data)
            .mediaType(MediaType.APPLICATION_JSON_TYPE)
            .build();
    sink.send(outboundSseEvent);
}
Note the payload (the data() call) and the media type (the mediaType() call).
You might be wondering how to obtain a builder object. This can be acquired from the container, by injecting a context Sse object (below) and calling its newEventBuilder() method.
Sse
An instance of the Sse interface can be injected into your JAX-RS class by using a field or setter annotated with @Context.
@Context
public void setSse(final Sse sse) {
    this.broadcaster = sse.newBroadcaster();
    this.builder = sse.newEventBuilder();
}
This interface provides a simple-to-use entry point to obtain an event builder (above) or a broadcaster (more on that below). Once you have an Sse object, call its newEventBuilder() or newBroadcaster() method accordingly.
SseBroadcaster
So far, we’ve looked at sending events from the server for a single request. By calling SseEventSink.send(), we can send an event to the single connection that the sink is associated with.
So, how do we handle the case where we have an event occur in the server, and we want to notify all the current listening connections? Enter, SseBroadcaster!
The SseBroadcaster can be obtained by calling newBroadcaster() on the injected Sse object. You’ll want to save the broadcaster to a field, as Sse.newBroadcaster() will give you a new broadcaster object each time, which is unlikely to be what you want.
Once you have a broadcaster, you can “register” as many event sinks with it as you wish. Any event sent through the broadcaster will be pushed through each registered event sink.
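The register-then-fan-out idea can be modelled in a few lines of plain Java. This is only a sketch of the pattern, not how SseBroadcaster is implemented: MiniBroadcaster is a hypothetical class, sinks are stood in for by Consumer<String>, and dropping a sink that throws is an assumption about sensible behaviour rather than a statement about what TomEE does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class MiniBroadcaster {

    // CopyOnWriteArrayList allows register() and remove() while a broadcast is iterating.
    private final List<Consumer<String>> sinks = new CopyOnWriteArrayList<>();

    /** Adds a sink that will receive every subsequent broadcast. */
    public void register(Consumer<String> sink) {
        sinks.add(sink);
    }

    /** Sends the event to every registered sink and returns how many accepted it. */
    public int broadcast(String event) {
        int delivered = 0;
        for (Consumer<String> sink : sinks) {
            try {
                sink.accept(event);
                delivered++;
            } catch (RuntimeException e) {
                // A throwing sink stands in for a disconnected client: stop sending to it.
                sinks.remove(sink);
            }
        }
        return delivered;
    }

    /** Number of sinks currently registered. */
    public int size() {
        return sinks.size();
    }

    public static void main(String[] args) {
        MiniBroadcaster broadcaster = new MiniBroadcaster();
        List<String> received = new ArrayList<>();
        broadcaster.register(received::add);
        broadcaster.register(e -> System.out.println("second sink got: " + e));
        broadcaster.broadcast("test message: 0");
        System.out.println("first sink got: " + received);
    }
}
```

The important property is the same one the real broadcaster gives you: the code producing the event does not need to know how many clients are connected.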
Bringing it all together
The Server-Sent Events functionality is available in Apache TomEE 8.0.x snapshots now and will be available in the 8.0.6 release, coming very shortly. I have added an example: rest-sse-example. This simple, two-class, 40-line program demonstrates how to broadcast messages sent to a JMS topic to every connection to a JAX-RS SSE endpoint.
We have a simple message-driven bean listening to the JMS Topic, and sending a CDI event for every message received:
@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "EVENT"),
})
public class TopicListener implements MessageListener {

    // CDI event used to hand each JMS message over to any observers
    @Inject
    private Event<Message> messageReceivedEvent;

    @Override
    public void onMessage(final Message message) {
        messageReceivedEvent.fire(message);
    }
}
The JAX-RS endpoint is defined as a @Singleton (javax.inject.Singleton as opposed to an EJB Singleton), and uses @Observes to react to CDI events. The observer method pushes the JMS message to every active SseEventSink by calling send() on the SseBroadcaster.
@Path("jms")
@Singleton
public class JmsResource {

    private SseBroadcaster broadcaster;
    private OutboundSseEvent.Builder builder;

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void messages(final @Context SseEventSink sink) {
        broadcaster.register(sink);
    }

    @Context
    public void setSse(final Sse sse) {
        this.broadcaster = sse.newBroadcaster();
        this.builder = sse.newEventBuilder();
    }

    public void onMessage(final @Observes(notifyObserver = Reception.IF_EXISTS) Message message) {
        if (broadcaster == null) {
            return;
        }
        broadcaster.broadcast(builder
                .data(Message.class, message)
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .build());
    }
}
Consuming the event stream
There are two ways to consume the event stream that we’ll cover in this article: the JAX-RS SSE client, and EventSource in JavaScript.
EventSource
EventSource in JavaScript in the browser is straightforward to use. There are no additional scripts or libraries to include.
Simply create the event source object, passing in the path to the JAX-RS SSE endpoint. In my case, my page is http://localhost:8080/index.html and my SSE endpoint is http://localhost:8080/jms, so I simply use:
var es = new EventSource('jms');
EventSource provides a number of callbacks (see: https://developer.mozilla.org/en-US/docs/Web/API/EventSource):
- onmessage
- onerror
- onopen
Adding a callback to process each event as it is received can be done as shown below.
es.onmessage = function(e) {
    // process the event here; the payload is in e.data
};
For example, to append each event as text to a DOM element:
var es = new EventSource('jms');
var messages = document.getElementById('messages');
es.onmessage = function(e) {
    var newElement = document.createElement("p");
    newElement.textContent = e.data;
    messages.appendChild(newElement);
};
SseEventSource
JAX-RS SSE also provides a means for a JAX-RS client to consume server-sent events.
After constructing a client and target in the usual way with the JAX-RS Client API, build an SseEventSource.
The SseEventSource object provides a ‘register’ method which takes a Consumer to process InboundSseEvent objects. Using a lambda (as shown below) provides a simple way to process events.
final Client client = ClientBuilder.newClient();
final WebTarget target = client.target("http://localhost:" + port + "/jms");

final SseEventSource source = SseEventSource
        .target(target)
        .reconnectingEvery(500, TimeUnit.MILLISECONDS)
        .build();

source.register((inboundSseEvent) -> {
    final Message message = inboundSseEvent.readData(Message.class);
    messages.add(message);
});

source.open();
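Both EventSource and SseEventSource hide the parsing of the stream. As a rough illustration of what a client does with the bytes it receives, here is a minimal plain-JDK sketch that pulls the data payloads out of text/event-stream text. SseParser and parseData are hypothetical names, and the sketch deliberately ignores the "event:", "id:" and "retry:" fields that a full client would also handle.

```java
import java.util.ArrayList;
import java.util.List;

public class SseParser {

    /** Returns the data payload of each complete event found in the stream text. */
    public static List<String> parseData(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;

        for (String line : stream.split("\r?\n", -1)) {
            if (line.isEmpty()) {
                // A blank line ends the current event; dispatch it if it carried data.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                // Comment line, often used as a keep-alive: ignored.
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // a single leading space is not part of the value
                }
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) are skipped in this sketch.
        }
        // An event without its terminating blank line is incomplete and is not returned.
        return events;
    }

    public static void main(String[] args) {
        String stream = "data: test message: 0\n\ndata: test message: 1\n\n";
        System.out.println(parseData(stream));
    }
}
```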
One note on the observer method in JmsResource: I had to specify @Observes(notifyObserver = Reception.IF_EXISTS), because otherwise the JmsResource singleton bean could be initialized when a JMS message is consumed. No RequestContext exists at that point, but one is needed to correctly inject the Sse instance.