05 June 2023 ~ 15 min read

Building a robust WebSocket interface using RxJS



This post outlines my approach to building a robust WebSocket interface to allow real-time communication with my project's back-end.

Many Angular apps use the traditional HttpClient to talk to a back-end, typically using REST endpoints. However, there are times when you need to communicate with a back-end in real-time, and this is where WebSockets come in.

WebSockets allow for real-time updates, without using polling.

Most applications will use a combination of REST over HTTP and WebSockets, depending on the use case. WebSockets usually don't replace REST, but rather complement it.

Demo project

I've put together a demo project that replicates the use cases from my current project - converting a fairly major AngularJS application into modern Angular.

In this project, my components can subscribe to "notifications" generated by the back-end. These notifications are tied to the user's session.

My project makes use of a shared WebSocket which sends subscription requests to the server, and listens for notification events, which are piped back to the subscribing component.

My project also makes use of traditional REST. Some requests, or other user activity, will trigger notifications to be sent over the WebSocket to subscribing components.

NestJS has entered the room!

While my real current project doesn't use a Node-based back-end, for the demo project I chose NestJS as the back-end framework.

NestJS provides a fantastic framework for building back-ends, and easily supports both traditional HTTP endpoints and WebSockets. It's ideal for Angular developers as many of the concepts are similar to Angular - concepts such as services and modules, and the use of decorators.

Looking around for examples of using NestJS with WebSockets, I found many demo apps that implement a chat server - where users can exchange messages in real time. All of these examples are based on socket.io, which uses its own protocol on top of WebSockets.

As my real production project uses raw WebSockets, I wanted to replicate this, so chose raw WebSockets in NestJS as well. NestJS allows a choice of either raw WebSockets or socket.io, but I needed to delve into the WebSocketGateway source code in order to figure a couple of things out.

Angular front-end

Of course, I used Angular for the front-end. As with all of my Angular projects, I got a great start by using Nx to generate the project and build it out.

The demo front-end project showcases connecting to the back-end web socket, and implements a simple chat messaging service (just like everybody else does!)

Angular WebSocket choices

There are a few choices for implementing WebSockets in Angular. My old AngularJS project just used the native browser WebSocket, but for my Angular project, I really wanted to easily use RxJS, and luckily, RxJS comes to the rescue.

RxJS has a specialised Subject - the WebSocketSubject, created with the webSocket function from rxjs/webSocket.

This is a wrapper around the native WebSocket, and provides a stream of messages from the WebSocket.

Making it robust

While the webSocket Subject is great, it doesn't reconnect by itself. When the connection drops, the Subject completes (or errors, if the close wasn't clean) and a new connection has to be made.

In my project, I wanted the application to connect to the server WebSocket at startup, and remain connected - or at least, appear to, always re-connecting "under the hood" if the connection drops.

To simplify things for subscribing components, all of this is managed by an Angular service, which maintains a single shared WebSocket connection to the server, and keeps it connected.

The back-end

The core part of the back-end is a WebSocket gateway. This is a class, decorated with @WebSocketGateway, which then exposes methods which get called when certain events occur. I had to dig into the source code to figure it out, but the standard WsAdapter class, supplied by the NestJS framework, expects that the WebSocket will receive JSON objects with an "event" property, and another "data" property containing the body of the message.

The method receiving the WebSocket messages is then decorated with @SubscribeMessage('subscriptions') which then matches any messages with an "event" property of "subscriptions".
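
To make that message shape concrete, here's a minimal sketch of the envelope as it might travel over the wire. Only the "event" and "data" property names come from the adapter behaviour described above. The WsEnvelope type, the encodeEnvelope and decodeEnvelope helpers, and the SubscriptionRequest payload are names I've made up for illustration, and the demo's real payload may well look different.

```typescript
// Envelope shape described above: an "event" name plus a "data" body.
interface WsEnvelope<T> {
  event: string;
  data: T;
}

// Hypothetical payload for a subscription request (an assumption for this sketch).
interface SubscriptionRequest {
  eventType: string;
  isSubscribe: boolean;
}

// Serialise a message for sending over the socket.
function encodeEnvelope<T>(event: string, data: T): string {
  const envelope: WsEnvelope<T> = { event, data };
  return JSON.stringify(envelope);
}

// Parse an incoming frame, rejecting anything without a string "event" property.
function decodeEnvelope(raw: string): WsEnvelope<unknown> | undefined {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return undefined;
  }
  if (typeof parsed !== 'object' || parsed === null) {
    return undefined;
  }
  const candidate = parsed as { event?: unknown; data?: unknown };
  if (typeof candidate.event !== 'string') {
    return undefined;
  }
  return { event: candidate.event, data: candidate.data };
}

// A client asking to subscribe to the "message" event type.
const frame = encodeEnvelope<SubscriptionRequest>('subscriptions', {
  eventType: 'message',
  isSubscribe: true,
});
const decoded = decodeEnvelope(frame);
console.log(frame, decoded?.event);
```

A frame whose "event" is 'subscriptions' is the kind that would be routed to the method decorated with @SubscribeMessage('subscriptions').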

The demo back-end implements a controller class that exposes a single endpoint, which accepts a message and broadcasts it, via the WebSocket gateway, to any clients subscribed to the 'message' event type. I implemented a NestJS service which sits between the controller and the WebSocket gateway, so that messages can be pumped into the gateway from anywhere in the back-end application - in this case, from the REST controller.

Filtering event types

As I wanted my back-end to simulate my real project, the concept of different event types was important. Each event type would determine which client sockets would be sent those messages.

Once clients connect to the server WebSocket they can send "subscription" messages - which set up the event type or types they want to receive. Then, when those events occur, the server must find all the clients interested in those events and selectively send the messages to those clients.

This was a good opportunity to use a WeakSet. When the server receives a "subscription" message, it adds the socket that sent the message to a WeakSet of sockets, and those WeakSets are held in a Map keyed by the event type. This is a little hard to explain, but the code looks like this:

private subscriptions: Map<EventType, WeakSet<Socket>> = new Map();

When the subscription message is received, the socket is added to the Map as follows:

// Look up (or lazily create) the set of sockets for this event type
let sockets = this.subscriptions.get(eventType);
if (!sockets) {
	sockets = new WeakSet<Socket>();
	this.subscriptions.set(eventType, sockets);
}

sockets.add(client);

Similarly, on unsubscribe, the socket is removed from the Map:

const sockets = this.subscriptions.get(eventType);
if (sockets) {
	sockets.delete(client);
}

The WeakSet is useful because it weakly holds references to the socket. When the client disconnects, the socket will be disposed. The WeakSet ensures that it's not strongly holding a reference to the socket, allowing it to be garbage collected.

Connecting and disconnecting

A central part of the server is knowing what sockets are connected to it, so that these can be used to send messages.

By implementing the OnGatewayConnection and OnGatewayDisconnect interfaces, the server can keep track of connected sockets.

These interfaces have the handleConnection and handleDisconnect methods, which are called when a client connects or disconnects. The code can then maintain a list of connected client sockets, simply by adding or removing them from an array.

handleConnection(client: Socket) {
  this.wsClients.push(client);
}

handleDisconnect(client: Socket) {
  this.wsClients = this.wsClients.filter((c) => c !== client);
}

Sending messages to clients

The chat server in the demo requires that messages be broadcast to every client listening to the "message" event type.

So, the code gets the subscribers, then iterates through all connected clients. If the set of subscribers contains the client, the message is sent to that client.

const event: SubscriptionEvent = {
	eventType: 'message',
	body: msg,
};

// Find the subscribers to the message event
const subscribers = this.subscriptions.get('message');

this.wsClients.forEach((c) => {
	if (subscribers && subscribers.has(c)) {
		c.send(JSON.stringify(event));
	}
});
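
Pulling the pieces above together, here's a rough, self-contained sketch of the subscription bookkeeping and the broadcast. It isn't the gateway code from the demo: SocketLike and SubscriptionRegistry are stand-in names of my own, and the "sockets" are plain objects so the sketch runs without a server. It also shows why the broadcast walks the list of connected clients: a WeakSet can't be enumerated, so the only option is to test each known client for membership.

```typescript
type EventType = string;

// Minimal stand-in for a WebSocket client; only send() is needed here.
interface SocketLike {
  send(data: string): void;
}

class SubscriptionRegistry {
  private readonly subscriptions = new Map<EventType, WeakSet<SocketLike>>();
  private clients: SocketLike[] = [];

  connect(client: SocketLike): void {
    this.clients.push(client);
  }

  disconnect(client: SocketLike): void {
    this.clients = this.clients.filter((c) => c !== client);
  }

  subscribe(eventType: EventType, client: SocketLike): void {
    let sockets = this.subscriptions.get(eventType);
    if (!sockets) {
      sockets = new WeakSet<SocketLike>();
      this.subscriptions.set(eventType, sockets);
    }
    sockets.add(client);
  }

  unsubscribe(eventType: EventType, client: SocketLike): void {
    this.subscriptions.get(eventType)?.delete(client);
  }

  // Sends to every connected client subscribed to eventType.
  // Returns how many clients the message was sent to.
  broadcast(eventType: EventType, body: unknown): number {
    const subscribers = this.subscriptions.get(eventType);
    if (!subscribers) {
      return 0;
    }
    const payload = JSON.stringify({ eventType, body });
    let sent = 0;
    for (const client of this.clients) {
      if (subscribers.has(client)) {
        client.send(payload);
        sent++;
      }
    }
    return sent;
  }
}

// Two fake clients, only one of which subscribes to "message".
const received: string[] = [];
const alice: SocketLike = { send: (d) => { received.push('alice:' + d); } };
const bob: SocketLike = { send: (d) => { received.push('bob:' + d); } };

const registry = new SubscriptionRegistry();
registry.connect(alice);
registry.connect(bob);
registry.subscribe('message', alice);
registry.broadcast('message', 'hello');
console.log(received);
```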

The front-end

The front-end is a simple Angular application, broken down into a number of components for clarity.

The main aspects I wanted to demonstrate were components being able to subscribe to different event types, then showing the results of that.

In my demo app, there are two components which subscribe to events:

  • The Messenger component, which allows sending and receiving "chat" messages
  • The connection monitoring component, which shows notifications from the server whenever another client connects or disconnects

I also built a component which shows the state of the WebSocket connection, and various statistics about how many subscriptions are active, the number of messages received, the number of times it's connected, and the number of retries when re-connecting.

Demo app

Managing the WebSocket - the SocketService

The entire management of the WebSocket connection is done in an Angular service. This ensures that there's a single connection to the server, which is shared with all components that subscribe to server events. It also does the important work of re-connecting when the socket disconnects.

Component API

The API that the SocketService exposes to allow clients to listen to messages is pretty simple. It takes a single parameter which is the event type, or array of event types to subscribe to. It returns an Observable stream of those events.

I'm a big fan of NgRx's ComponentStore, so I've implemented a lot of my code using ComponentStore effects. I usually do the same in my components, which makes managing subscriptions foolproof.

Let's look at some code. Here's an effect which listens to the "message" event type, and when received, updates the component's state, containing a list of received messages, trimmed to a maximum length:

readonly listenForMessages = this.effect((trigger$) =>
  trigger$.pipe(
    switchMap(() =>
      this.socketService.listen<SubscriptionEvent<Message>>('message').pipe(
        withLatestFrom(this.messages$),
        tap(([message, messages]) => {
          this.patchState({
            messages: [...messages, message.body].slice(-MAX_MESSAGES)
          });
        }),
      ),
    ),
  ),
);

The API is pretty simple - listen to an event type and receive a stream of messages. The SocketService handles all the hard work.

The SocketService

Again, I've used the NgRx ComponentStore to implement the SocketService. The SocketService has to maintain state - using ComponentStore is a great way of doing this.

As I said earlier, I wanted to connect the WebSocket when the application starts, so this is done by setting up the configuration and connecting, right in the constructor:

constructor(
  @Inject(DOCUMENT) document: Document,
  private statsStore: SocketStatsStore
) {
  super({
    baseUri: document.baseURI,
    subMessages: [],
  });

  this.statsStore.setConnected(false);

  this.setUpWebSocketSubjectConfig();
  this.connect();
  this.watchQueue(this.toSend$);
}

The baseURI of the socket is the same as the baseURI of the application. This is configured in Angular's proxy.conf.json file, which is used when running the application in development mode.

{
	"/api": {
		"target": "http://localhost:3100",
		"secure": false
	},
	"/ws": {
		"target": "ws://localhost:3100",
		"secure": false,
		"ws": true,
		"pathRewrite": {
			"^/ws": ""
		},
		"logLevel": "debug"
	}
}
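
As a small illustration of how the socket URL can be derived from the base URI so that it lands on the /ws proxy entry, here's a sketch of the string handling involved. The toWebSocketUrl helper is a name I've invented for this example, and the trailing-slash normalisation is my own assumption rather than something the demo necessarily does.

```typescript
// Derive a WebSocket URL from the page's base URI.
// "http" becomes "ws" and "https" becomes "wss"; the "ws" path segment
// is appended so the request matches the "/ws" proxy entry.
function toWebSocketUrl(baseUri: string): string {
  // Assumption: make sure there is exactly one slash before the "ws" segment.
  const withSlash = baseUri.endsWith('/') ? baseUri : baseUri + '/';
  return withSlash.replace(/^http/, 'ws') + 'ws';
}

console.log(toWebSocketUrl('http://localhost:4200/'));  // ws://localhost:4200/ws
console.log(toWebSocketUrl('https://example.com/app')); // wss://example.com/app/ws
```

Because the regular expression only replaces the leading "http", a secure page automatically gets a "wss" socket URL.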

Setting up the configuration

The setUpWebSocketSubjectConfig method sets up the configuration for the WebSocketSubject.

/**
 * Constructs the WebSocketSubjectConfig object, with open and close observers
 * to handle connection status, and trying to re-connect when disconnected.
 */
private readonly setUpWebSocketSubjectConfig = this.effect((trigger$) =>
  trigger$.pipe(
    withLatestFrom(this.baseUri$),
    tap(([, baseUri]) => {
      const url = baseUri.replace(/^http/, 'ws') + 'ws';

      const config: WebSocketSubjectConfig<WsMessage> = {
        url,
        closeObserver: {
          next: (event) => {
            this.statsStore.setConnected(false);
            this.tryReconnect();
          },
        },
        openObserver: {
          next: (event) => {
            this.patchState({ connectError: undefined });

            this.statsStore.setConnected(true);
            this.statsStore.bumpConnections();

            // Notify connected
            this.connected.next();
          },
        },
      };

      this.patchState({ wsSubjectConfig: config });
    }),
  ),
);

The key parts of this code are the close and open observers. These are callback functions that run whenever the socket is closed or opened. As I want to keep my socket connected, when the socket is closed, the code tries to re-connect it.

Connecting

Connecting to the socket is pretty simple. The socket is created, using the created configuration, and it returns the stream of messages received. These are tapped, and pushed into a messages Subject.

/**
 * Attempts to connect to the websocket.
 */
private readonly connect = this.effect((trigger$) =>
  trigger$.pipe(
    withLatestFrom(this.wsSubjectConfig$),
    switchMap(([, config]) => {
      assertDefined(config);

      // Create a new socket and listen for messages,
      // pushing them into the messages Subject.
      const socket = new WebSocketSubject(config);
      this.patchState({ socket });
      return socket.pipe(
        tap((msg) => {
          this.messages.next(msg);
          this.statsStore.bumpMessagesReceived();
        }),
        catchError((err) => {
          this.patchState({ connectError: err });
          return EMPTY;
        }),
      );
    }),
  ),
);

The purpose of the messages Subject is to decouple the received messages from the socket connection and disconnection. No matter how many times the socket connects or disconnects, it just keeps pushing messages into the same messages Subject.

The code which handles clients subscribing sends messages received from the messages Subject back to the client components. This way, there's continuity of stream, even if the socket disconnects and reconnects. The client components simply receive the stream of messages, and don't care about the socket's connection status.
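
The idea is easier to see in isolation. Below is a dependency-free sketch of the same decoupling, using a hand-rolled listener list in place of an RxJS Subject so it can run on its own. MessageRelay and FakeConnection are hypothetical names for this illustration only, and they are not classes from the demo.

```typescript
type Listener<T> = (msg: T) => void;

// Stand-in for the long-lived messages Subject: listeners attach once and
// stay attached, whatever happens to the underlying connection.
class MessageRelay<T> {
  private readonly listeners = new Set<Listener<T>>();

  // Registers a listener and returns a function that removes it again.
  listen(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  next(msg: T): void {
    this.listeners.forEach((listener) => listener(msg));
  }
}

// Stand-in for a single socket connection. While open, it forwards
// everything it receives into the shared relay.
class FakeConnection<T> {
  private open = true;
  private readonly relay: MessageRelay<T>;

  constructor(relay: MessageRelay<T>) {
    this.relay = relay;
  }

  receive(msg: T): void {
    if (this.open) {
      this.relay.next(msg);
    }
  }

  close(): void {
    this.open = false;
  }
}

const relay = new MessageRelay<string>();
const seen: string[] = [];
relay.listen((msg) => {
  seen.push(msg);
});

const firstConnection = new FakeConnection(relay);
firstConnection.receive('a');
firstConnection.close(); // the connection drops

const secondConnection = new FakeConnection(relay); // a new connection is made
secondConnection.receive('b');

// The listener was registered once and saw messages from both connections.
console.log(seen);
```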

Re-connecting

The code handling the re-connection of the web socket is pretty simple too.

/**
 * Handles attempting to reconnect to the websocket until connected or
 * the max retries have been reached.
 */
private readonly tryReconnect = this.effect((trigger$) =>
  trigger$.pipe(
    exhaustMap(() => {
      return timer(RETRY_SECONDS * 1000).pipe(
        withLatestFrom(this.isConnected$),
        takeWhile(([, isConnected]) => {
          if (!isConnected) {
            this.statsStore.bumpConnectionRetries();
          }

          return !isConnected && this.statsStore.reconnectionTries < MAX_RETRIES;
        }),
        tap(() => {
          this.connect();
        }),
      );
    }),
  ),
);

The code just repeatedly tries to connect, until it gives up. This could be enhanced to notify the user if too many retries are attempted, but for demonstration purposes this is enough.
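
To show the retry bookkeeping on its own, and where a "too many retries" notification could hook in, here's a minimal sketch. ReconnectPolicy, its exhausted flag and its reset method are my own hypothetical additions; the shouldRetry logic is meant to mirror the takeWhile predicate above, with the retry limit passed in rather than read from a constant.

```typescript
class ReconnectPolicy {
  private retries = 0;
  private readonly maxRetries: number;

  constructor(maxRetries: number) {
    this.maxRetries = maxRetries;
  }

  // Mirrors the takeWhile predicate: count the attempt when disconnected,
  // then allow a reconnect only while under the limit.
  shouldRetry(isConnected: boolean): boolean {
    if (isConnected) {
      return false;
    }
    this.retries++;
    return this.retries < this.maxRetries;
  }

  // True once the limit is reached; a UI could use this to warn the user.
  get exhausted(): boolean {
    return this.retries >= this.maxRetries;
  }

  // Assumption: start counting afresh after a successful connection.
  reset(): void {
    this.retries = 0;
  }
}

const policy = new ReconnectPolicy(3);
const decisions = [
  policy.shouldRetry(false),
  policy.shouldRetry(false),
  policy.shouldRetry(false),
];
console.log(decisions, policy.exhausted);
```

Note that, as with the predicate it mirrors, the attempt that reaches the limit is counted but not acted on, so a limit of 3 allows two reconnection attempts.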

Implementing the listen method

As I mentioned earlier, listening to events involves sending the server a "subscription" request. When a component finishes listening (which will finalise the RxJS subscription), an "unsubscription" request is sent to the server.

However, there's a potential complication here. As multiple clients could listen to the same event type, we only want to send the subscription message to the server for the first client subscribing to that event type. Likewise, when the last client unsubscribes, we want to send the unsubscription message to the server.

Let's look at the listen code:

/**
 * Begins listening to an event type or types.
 *
 * Sets up the subscription with the server, sending a subscribe message,
 * and returning a stream of filtered messages.
 *
 * When the client closes the stream, sends an unsubscribe message to the server.
 *
 * @param eventType
 * @returns A stream of messages of the specified type.
 */
listen<T extends SubscriptionEvent>(eventType: EventType | EventType[])
  : Observable<T> {
  // Send a message to the server to subscribe to each of the event types
  // we're first to subscribe to.
  this.subscribeIfFirst(eventType);

  return this.messages$.pipe(
    map((msg) => msg as SubscriptionEvent),
    filter((msg) => {
      if (typeof eventType === 'string') {
        return msg.eventType === eventType;
      } else {
        return eventType.includes(msg.eventType);
      }
    }),
    map((msg) => msg as T),
    finalize(() => {
      // Caller has unsubscribed from the stream.
      // Send the message to the server to unsubscribe from each eventType we're
      // last to unsubscribe from.
      this.unsubscribeIfLast(eventType);
    }),
  );
}

So, the filtering of the subscription and unsubscription messages is done in the subscribeIfFirst and unsubscribeIfLast methods.
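
I haven't shown those two methods here, but the underlying idea is a simple reference count per event type. The following is a sketch of that idea only, not the demo's implementation: ListenerCounts and toArray are hypothetical names, and the real methods would go on to queue the subscribe or unsubscribe messages for the event types returned.

```typescript
type EventType = string;

function toArray(value: EventType | EventType[]): EventType[] {
  return Array.isArray(value) ? value : [value];
}

class ListenerCounts {
  private readonly counts = new Map<EventType, number>();

  // Registers one listener and returns the event types that just gained
  // their FIRST listener, i.e. the ones needing a subscribe message.
  add(eventType: EventType | EventType[]): EventType[] {
    const firsts: EventType[] = [];
    for (const name of toArray(eventType)) {
      const current = this.counts.get(name) ?? 0;
      if (current === 0) {
        firsts.push(name);
      }
      this.counts.set(name, current + 1);
    }
    return firsts;
  }

  // Removes one listener and returns the event types that just lost
  // their LAST listener, i.e. the ones needing an unsubscribe message.
  remove(eventType: EventType | EventType[]): EventType[] {
    const lasts: EventType[] = [];
    for (const name of toArray(eventType)) {
      const current = this.counts.get(name) ?? 0;
      if (current <= 1) {
        if (current === 1) {
          lasts.push(name);
        }
        this.counts.delete(name);
      } else {
        this.counts.set(name, current - 1);
      }
    }
    return lasts;
  }
}

const counts = new ListenerCounts();
console.log(counts.add('message'));                  // first listener for "message"
console.log(counts.add(['message', 'connection']));  // only "connection" is new
console.log(counts.remove('message'));               // "message" still has a listener
console.log(counts.remove(['message', 'connection'])); // both are now unused
```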

The heart of this method, though, is piping the messages Subject through a filter so that it only returns messages that match the event type. Remember, since we're sharing a single WebSocket connection, every message received on the WebSocket will be pushed into the messages Subject. So, we need to filter out the messages that this particular call to the listen method isn't interested in - i.e. where the eventType doesn't match.

Watching the queue

As we can only send messages when the web socket is connected, we may need to queue messages. This is implemented with another piece of state - a simple array of messages to send to the server. This is monitored by an effect, which takes the toSend$ stream as a parameter.

The toSend$ stream is set up as follows:

/**
 * A stream of messages to send, combined with whether the websocket is connected.
 * This will emit when the websocket is connected, and there are messages to send.
 */
private readonly toSend$ = combineLatest([this.isConnected$, this.subMessages$])
  .pipe(
    filter(([isConnected, queue]) => !!(isConnected && queue.length)),
    map(([, queue]) => queue),
  );

So, this is using combineLatest to combine the latest values from the isConnected$ and subMessages$ streams. The filter then ensures that the stream only emits when the web socket is connected and there are messages to send.

Whenever a message is added to the queue, or the web socket connects, the toSend$ stream will emit, causing that message to be sent to the server, and then removed from the queue.
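
The same behaviour can be sketched without RxJS, which may make the logic easier to follow. This is an illustration under my own assumptions, not the watchQueue effect itself: SendQueue and its members are hypothetical names, and the send function is injected so the sketch runs without a real socket.

```typescript
class SendQueue<T> {
  private queue: T[] = [];
  private connected = false;
  private readonly sendFn: (msg: T) => void;

  constructor(sendFn: (msg: T) => void) {
    this.sendFn = sendFn;
  }

  // Adding a message may trigger a flush, like subMessages$ emitting.
  enqueue(msg: T): void {
    this.queue.push(msg);
    this.flush();
  }

  // A connection change may trigger a flush, like isConnected$ emitting.
  setConnected(connected: boolean): void {
    this.connected = connected;
    this.flush();
  }

  get pending(): number {
    return this.queue.length;
  }

  // Equivalent of the filter in toSend$: only act when the socket is
  // connected and there is something to send. Sent messages are removed.
  private flush(): void {
    if (!this.connected || this.queue.length === 0) {
      return;
    }
    const toSend = this.queue;
    this.queue = [];
    toSend.forEach((msg) => this.sendFn(msg));
  }
}

const sent: string[] = [];
const queue = new SendQueue<string>((msg) => {
  sent.push(msg);
});

queue.enqueue('subscribe:message');    // held, not connected yet
queue.enqueue('subscribe:connection'); // held as well
queue.setConnected(true);              // both queued messages go out
queue.enqueue('unsubscribe:message');  // sent straight away
console.log(sent, queue.pending);
```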

Gotcha!

When developing the demo application I wanted to test out the logic of the web socket disconnecting, and be able to watch it re-connecting.

Chrome's dev tools has a useful feature to simulate a network disconnect - on the Network tab, you can set the throttling to simulate Offline.

Chrome dev tools network tab

Unfortunately, this doesn't work, which is a little puzzling. It seems that WebSocket connections are immune to this simulation, and that it only applies to HTTP requests.

Anyway, I implemented a Disconnect button in my demo app, which simply closes the WebSocket connection. The re-connection logic then kicks in and re-connects the socket.

Conclusion

I hope I've shown how easy it is to implement an elegant and robust WebSocket interface using RxJS.

There are a few other things that need to be considered for a full-blown production solution, such as re-sending subscription messages when a client is reconnected, and, of course, authentication.

But the bare bones of the solution are here, and I hope it's a good starting point for others needing to implement a similar solution.

Source code

You can find the source code for the complete demo solution here


Headshot of Craig Shearer

Hi, I'm Craig. I'm a full-stack software architect and developer based in Auckland, Aotearoa/New Zealand. You can follow me on Twitter