Scaling Websockets With Pubsub
Scalable websockets powered by pubsub
Preface
So here’s a little refresher on stateless vs stateful systems
As long as you are not worrying about distributed systems, stateless vs stateful doesn’t really mean anything, since all requests are served by the same process, which can keep in-memory state without any worries. But as soon as you think about availability, or scale out to a distributed setup with multiple pods running, usually across nodes in different availability zones, things change. If my application keeps in-memory state, functionality breaks, since requests can be routed to any of the available nodes/replicas, which may not have that state.
Now, certain problems are stateful by definition, and websockets are one of them. The client establishes a TCP connection with the server that stays alive for quite a long time, and messages are exchanged back and forth over it. This may not work properly across network breaks or pod downtime, since the client’s next connection may land on an entirely different server.
So what to do?
There are two main approaches widely used to solve this problem:
- The websocket server state, including client information, is stored in a centralized (ideally distributed) KV store like Redis. Example implementations include Django’s channels library and some other frameworks. The name is quite apt; you’ll see why soon.
- A common proxy server forwards the requests, say through RPCs, to the respective services. Pushpin is one such example. This works, but has its own problems, like being a central point of failure and depending on limited tools without widespread support.
Pubsub is also commonly used to avoid having every service in a microservice system deal with websockets, limiting the “statefulness” to one or two services that can be given special treatment in terms of availability guarantees, say through node affinity. Wouldn’t it be great if this dedicated service were fully distributed, ideally backed by a Raft-based message broker like NATS? Another advantage is that it’s simpler to work with, since websockets are abstracted as simple fire-and-forget pubsub as far as services are concerned.
Approach
Here’s an Excalidraw screenshot illustrating the approach:
It’s a simple idea: pass websocket messages to and from our services through pubsub streams. There are five main players here:
- client: sends and receives messages over ws to and from our websocket service
- recv stream: a goroutine spawned for each ws connection, which starts a consumer listening at ws.recv.<svc>.<user>. The svc and user values are obtained from the request, say from cookies
- send stream: another goroutine per connection, which listens for client messages and publishes them to ws.send.<svc>.<user>, that’s it
- service consumer: instead of reading messages from a websocket client, our services consume them from ws.send.<svc>.<user>
- service producer: whenever a service needs to send a message to the client, it simply produces to ws.recv.<svc>.<user>; this gets picked up by the recv stream and sent to the websocket client
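To make the subject scheme concrete, here’s a minimal sketch of how the two per-client subjects could be built. The subjects and validToken helpers are hypothetical and not part of the repo. The check assumes svc and user come straight from the request, so it rejects values containing '.', the '*' or '>' wildcards, or whitespace. Any of those would change which subjects a client maps to.

```go
package main

import (
	"fmt"
	"strings"
)

// validToken reports whether s can be used as a single NATS subject token:
// '.' separates tokens, '*' and '>' are wildcards, and whitespace isn't allowed.
func validToken(s string) bool {
	return s != "" && !strings.ContainsAny(s, ".*> \t\r\n")
}

// subjects returns the per-client subjects for a service and user (hypothetical helper):
// recv is where services publish (read by the recv stream),
// send is where the send stream publishes client messages.
func subjects(svc, user string) (recv, send string, err error) {
	if !validToken(svc) || !validToken(user) {
		return "", "", fmt.Errorf("invalid service/user %q/%q", svc, user)
	}
	recv = fmt.Sprintf("ws.recv.%s.%s", svc, user)
	send = fmt.Sprintf("ws.send.%s.%s", svc, user)
	return recv, send, nil
}

func main() {
	recv, send, err := subjects("service1", "user001")
	fmt.Println(recv, send, err) // ws.recv.service1.user001 ws.send.service1.user001 <nil>

	// a dotted or wildcard user would otherwise leak into other subjects
	_, _, err = subjects("service1", "user.*")
	fmt.Println(err != nil) // true
}
```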
Neat, right? As long as our broker is something like NATS (JetStream) or Kafka, which replicates writes across nodes, we’ll have persistent, distributed websockets that can be scaled independently to any number of replicas, with the added advantage of a dedicated queue for every websocket client. And since these queues are persistent, you can send to them even when the client is not connected; the recv stream will pick the messages up on the next connection :)
Let’s code it up
Cool, let’s start our Go project.
I’m gonna add the typical cmd, pkg structure to it, and we’re gonna have three packages: brokers, server and stream.
(base) websocketstream git:main ❯ tree ✖
.
├── README.md
├── cmd
│   └── main.go
├── go.mod
├── go.sum
└── pkg
    ├── brokers
    ├── server
    └── stream
6 directories, 4 files
Server
Let’s start with the server. I’m using gorilla/websocket here; add it like so:
go get github.com/gorilla/websocket
Now go ahead and define the upgrader and the handler function in a file called handler.go:
package server

import (
	"log"
	"net/http"

	"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

func HandleWs(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied with an HTTP error; log and drop this request
		// instead of log.Fatalf, which would take the whole server down
		log.Printf("error upgrading websocket connection: %s\n", err)
		return
	}
	defer conn.Close()
	log.Println("Client connected")
}
Here’s my main server code, under serve.go
package server

import (
	"fmt"
	"log"
	"net/http"
)

type Server struct {
	Port int
}

func NewServer(port int) *Server {
	return &Server{port}
}

func (s *Server) Start() {
	http.HandleFunc("/ws/", HandleWs)
	log.Printf("listening on port: %d", s.Port)
	if err := http.ListenAndServe(fmt.Sprintf(":%d", s.Port), nil); err != nil {
		log.Fatalf("Error starting server at port %d: %s\n", s.Port, err)
	}
}
Go ahead and add this to cmd/main.go
package main

import "github.com/ashupednekar/websocketstream/pkg/server"

func main() {
	s := server.NewServer(8000)
	s.Start()
}
Broker
Now that we have our server, let’s set up pubsub.
I’m gonna create a base.go file in the brokers package with two things. The idea here is to shield the rest of our packages from broker-specific stuff; instead, they’ll deal with a common Broker interface we’ll define soon.
- A Message struct: the common message type, agnostic to which broker we’re working with
type Message struct {
	Subject string
	Data    []byte
}
- We need an interface to define what our brokers should do, namely pub and sub :)
type Broker interface {
	Produce(subject string, data []byte)
	Consume(subject string, ch chan Message)
}
The Produce function takes a subject and byte data. I’ve chosen “subject” as the terminology because I like NATS xP. The same thing would be a routing key and a topic, respectively, when you add AMQP and Kafka support. Consume takes a subject and a chan Message, into which it pushes everything it receives.
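Because everything goes through this interface, any type with these two methods can stand in for NATS. As a sketch, here’s a hypothetical in-memory MemoryBroker (not part of the repo) that could be handy for unit tests. It keeps one buffered queue per exact subject, with no wildcard matching. Messages produced before anyone consumes are therefore held, loosely mimicking the persistent queues. Produce blocks once 64 messages are waiting. Consume blocks like the NATS version does, so it’s meant to be started with go.

```go
package main

import (
	"fmt"
	"sync"
)

type Message struct {
	Subject string
	Data    []byte
}

type Broker interface {
	Produce(subject string, data []byte)
	Consume(subject string, ch chan Message)
}

// MemoryBroker is a hypothetical in-process Broker for tests. It keeps one buffered
// queue per exact subject (no wildcard matching), so messages produced before anyone
// consumes are held until a consumer shows up.
type MemoryBroker struct {
	mu     sync.Mutex
	queues map[string]chan Message
}

// compile-time check that MemoryBroker satisfies Broker
var _ Broker = (*MemoryBroker)(nil)

func NewMemoryBroker() *MemoryBroker {
	return &MemoryBroker{queues: make(map[string]chan Message)}
}

// queue returns the queue for subject, creating it on first use.
func (b *MemoryBroker) queue(subject string) chan Message {
	b.mu.Lock()
	defer b.mu.Unlock()
	q, ok := b.queues[subject]
	if !ok {
		q = make(chan Message, 64) // Produce blocks once 64 messages are waiting
		b.queues[subject] = q
	}
	return q
}

func (b *MemoryBroker) Produce(subject string, data []byte) {
	b.queue(subject) <- Message{Subject: subject, Data: data}
}

// Consume blocks forever, forwarding messages for subject to ch, so call it with go.
func (b *MemoryBroker) Consume(subject string, ch chan Message) {
	for msg := range b.queue(subject) {
		ch <- msg
	}
}

func main() {
	var broker Broker = NewMemoryBroker()
	// produce before anyone is consuming, like sending to an offline client
	broker.Produce("ws.recv.service1.user001", []byte("serversendingmessage1"))

	ch := make(chan Message)
	go broker.Consume("ws.recv.service1.user001", ch)
	msg := <-ch
	fmt.Printf("%s: %s\n", msg.Subject, msg.Data) // ws.recv.service1.user001: serversendingmessage1
}
```

The stream functions only depend on brokers.Broker, so a fake like this could be passed to them without a running NATS server.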
Let’s have a NewBroker function that’ll instantiate the right broker based on env; only NATS for now:
func NewBroker() Broker {
	// only NATS is supported for now; other brokers could be picked here
	// based on an env var, e.g. os.Getenv("PUBSUB_BROKER")
	return NewNatsBroker("websockets")
}
Note that this returns the interface, i.e. Broker, and not the NatsBroker type. This keeps the rest of the packages clean and serves as a decent abstraction.
Let’s quickly add the NATS implementation:
package brokers

import (
	"context"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

type NatsBroker struct {
	StreamName string
	Stream     jetstream.JetStream
}

func NewNatsBroker(stream string) *NatsBroker {
	nc, err := nats.Connect(os.Getenv("NATS_BROKER_URL"))
	if err != nil {
		log.Fatalf("couldn't connect to nats: %s", err)
	}
	js, err := jetstream.New(nc)
	if err != nil {
		log.Fatalf("couldn't create jetstream context: %s", err)
	}
	// one stream capturing every ws.* subject; WorkQueuePolicy removes messages once acked
	_, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
		Name:      stream,
		Subjects:  []string{"ws.>"},
		Retention: jetstream.WorkQueuePolicy,
	})
	if err != nil {
		log.Fatalf("error creating/updating stream: %s", err)
	}
	return &NatsBroker{Stream: js, StreamName: stream}
}

func (b *NatsBroker) Produce(subject string, data []byte) {
	log.Printf("producing to: %s", subject)
	if _, err := b.Stream.Publish(context.Background(), subject, data); err != nil {
		log.Printf("error publishing to %s: %s", subject, err)
	}
}

// Consume blocks, forwarding messages on subject to ch until the 300s timeout expires
func (b *NatsBroker) Consume(subject string, ch chan Message) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*300)
	defer cancel()
	c, err := b.Stream.CreateOrUpdateConsumer(ctx, b.StreamName, jetstream.ConsumerConfig{
		// consumer names can't contain '.', hence the replace
		Durable:       strings.ReplaceAll(fmt.Sprintf("%s-consumer", subject), ".", "-"),
		FilterSubject: subject,
	})
	if err != nil {
		// log instead of log.Fatalf so one bad consumer doesn't kill the server
		log.Printf("error creating consumer: %s", err)
		return
	}
	consumer, err := c.Consume(func(msg jetstream.Msg) {
		log.Printf("Received message: %v", msg.Data())
		ch <- Message{Subject: subject, Data: msg.Data()}
		// ack only after handing the message off, so it isn't dropped if that never happens
		msg.Ack()
	})
	if err != nil {
		log.Printf("error starting consumer: %s", err)
		return
	}
	defer consumer.Stop()
	<-ctx.Done()
}
Just add this and run go mod tidy; it’ll take care of the dependencies.
Here’s what’s being done here:
- Connecting to NATS, picking the URL from the NATS_BROKER_URL env var
- Creating the websockets stream with the wildcard subject pattern ws.>, which covers subjects like ws.recv.service1.user001 (refer to the NATS docs for more)
- Defining Produce and Consume methods that conform to the previously defined interface
Note that the consumer accepts a channel, chan Message, and sends into it from the callback. Essentially, every consumed message is passed through this channel in the common Message format we talked about earlier, with the subject and the byte data.
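For intuition on what ws.> captures, here’s a simplified sketch of NATS subject matching. Tokens are split on '.', '*' matches exactly one token, and '>' matches one or more trailing tokens. subjectMatches is a hypothetical helper written for illustration; the real matching happens inside the NATS server.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches is a simplified model of NATS subject matching:
// '*' matches exactly one token, '>' matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// '>' is only valid as the last token and needs at least one token to match
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) || (tok != "*" && tok != s[i]) {
			return false
		}
	}
	return len(p) == len(s)
}

func main() {
	fmt.Println(subjectMatches("ws.>", "ws.recv.service1.user001"))               // true
	fmt.Println(subjectMatches("ws.>", "ws"))                                     // false
	fmt.Println(subjectMatches("ws.send.service1.>", "ws.send.service1.user001")) // true
	fmt.Println(subjectMatches("ws.recv.*", "ws.recv.service1.user001"))          // false
}
```

The third case is the same pattern the nats sub command uses in the demo further down to watch everything a service’s clients send.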
Stream
Now let’s get to the meat: the send and recv streams we mentioned in our diagram…
Since we wrote decent abstractions around the pubsub and server, the main business logic is going to be short and clean.
Let’s start with the stream responsible for receiving client messages
package stream

import (
	"log"

	"github.com/ashupednekar/websocketstream/pkg/brokers"
	"github.com/gorilla/websocket"
)

func RecvClientMessages(conn *websocket.Conn, broker brokers.Broker) {
	go func() {
		for {
			_, message, err := conn.ReadMessage()
			if err != nil {
				log.Printf("error reading client message: %s\n", err)
				break
			}
			log.Printf("received message from client: %s\n", message)
			// placeholder subject for now; svc and user get filled in later
			broker.Produce("ws.send.svc.user", message)
		}
	}()
}
The naming here can get tricky, since something that’s “sending” messages from one perspective is actually “receiving” from another. That’s why I chose to name these after the messages they work with.
This one receives client messages from the websocket client and produces them to pubsub.
Note that the whole thing is wrapped in a go func(){}() because we want it to run concurrently in a separate goroutine.
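One thing this loop doesn’t do is tell anyone that the client went away. A common pattern is to close a done channel when reading fails, so the other side of the connection can stop too. Here’s a minimal, dependency-free sketch of that idea. readPump, its read/publish parameters, and errClosed are hypothetical stand-ins for conn.ReadMessage, broker.Produce, and a websocket close error, not the repo’s API.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed stands in for the error a websocket read returns once the client disconnects.
var errClosed = errors.New("connection closed")

// readPump reads client messages and publishes each one, mirroring RecvClientMessages.
// read stands in for conn.ReadMessage and publish for broker.Produce. The returned
// channel is closed when a read fails, so other goroutines can stop as well.
func readPump(read func() ([]byte, error), publish func([]byte)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			msg, err := read()
			if err != nil {
				fmt.Printf("stopping read loop: %v\n", err)
				return
			}
			publish(msg)
		}
	}()
	return done
}

func main() {
	inbox := []string{"clientsendingmessage1", "clientsendingmessage2"}
	read := func() ([]byte, error) {
		if len(inbox) == 0 {
			return nil, errClosed // simulate the client hanging up
		}
		msg := inbox[0]
		inbox = inbox[1:]
		return []byte(msg), nil
	}
	var published []string
	done := readPump(read, func(b []byte) { published = append(published, string(b)) })
	<-done // done is closed after the last publish, so reading published here is safe
	fmt.Println(published)
}
```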
Cool, let’s proceed to handling the messages from our services:
package stream

import (
	"log"

	"github.com/ashupednekar/websocketstream/pkg/brokers"
	"github.com/gorilla/websocket"
)

func RecvServiceMessages(conn *websocket.Conn, broker brokers.Broker) {
	ch := make(chan brokers.Message)
	// the consumer runs in its own goroutine and feeds ch
	go broker.Consume("ws.recv.svc.user", ch)
	for msg := range ch {
		if err := conn.WriteMessage(websocket.BinaryMessage, msg.Data); err != nil {
			log.Printf("error writing to client: %s\n", err)
			return
		}
	}
}
Here’s the bottom line
This one consumes the messages from pubsub, and writes them to the websocket client
Note that the consumer here runs in a separate goroutine for concurrency; we range over the messages it writes to the channel and write each one to the websocket client.
Wrapping up
Let’s update our websocket handler
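func HandleWs(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("error upgrading websocket connection: %s\n", err)
		return
	}
	defer conn.Close()
	log.Println("Client connected")
	// note: NewBroker opens a new NATS connection for every websocket client
	broker := brokers.NewBroker()
	stream.RecvClientMessages(conn, broker)  // returns immediately, reads in a goroutine
	stream.RecvServiceMessages(conn, broker) // blocks, keeping the connection open
}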
I’ve just created the broker and started both streams (handler.go now also imports the brokers and stream packages). Note that even the goroutine creation is abstracted away from the handler; that’s a personal preference, and you could go either way.
One last thing
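We need to identify the target service and user from the request, say through cookies. I’m gonna update my handler to read these two values from the service and user cookies. They could also come in as path params on a route like the one below (read with r.PathValue on Go 1.22+), but the demo later sticks to cookies on /ws/.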
http.HandleFunc("/ws/{svc}/{user}/", HandleWs)
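service, errSvc := r.Cookie("service")
user, errUser := r.Cookie("user")
if errSvc != nil || errUser != nil {
	// r.Cookie returns a nil cookie when it's missing, so bail out before upgrading
	http.Error(w, "missing service/user cookie", http.StatusBadRequest)
	return
}
log.Printf("service: %s, user: %s", service.Value, user.Value)
...
stream.RecvClientMessages(conn, broker, service.Value, user.Value)
stream.RecvServiceMessages(conn, broker, service.Value, user.Value)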
Now let’s update our stream functions to accept these and use them in their corresponding pubsub subjects (both stream files now also need to import fmt):
func RecvServiceMessages(conn *websocket.Conn, broker brokers.Broker, service string, user string) {
	...
	go broker.Consume(fmt.Sprintf("ws.recv.%s.%s", service, user), ch)
	...
}
func RecvClientMessages(conn *websocket.Conn, broker brokers.Broker, service string, user string) {
	...
	broker.Produce(fmt.Sprintf("ws.send.%s.%s", service, user), message)
}
stream.RecvClientMessages(conn, broker, service.Value, user.Value)
stream.RecvServiceMessages(conn, broker, service.Value, user.Value)
Let’s see it in action
Server logs
(base) websocketstream git:main ❯ go run cmd/main.go ⏎ ✹
2025/01/06 01:58:02 listening on port: 8000
2025/01/06 01:58:14 Client connected
2025/01/06 01:58:14 service: service1, user: user001
2025/01/06 01:58:17 received message from client: clientsendingmessage1
2025/01/06 01:58:17 producing to: ws.send.service1.user001
2025/01/06 01:58:24 received message from client: clientsendingmessage2
2025/01/06 01:58:24 producing to: ws.send.service1.user001
2025/01/06 01:58:42 Received message: [115 101 114 118 101 114 115 101 110 100 105 110 103 109 101 115 115 97 103 101 49]
2025/01/06 01:58:44 Received message: [115 101 114 118 101 114 115 101 110 100 105 110 103 109 101 115 115 97 103 101 50]
2025/01/06 01:58:53 Received message: [121 97 121]
2025/01/06 01:59:01 received message from client: yay
2025/01/06 01:59:01 producing to: ws.send.service1.user001
Websocket client
(base) ~ ❯ websocat ws://localhost:8000/ws/ -H 'Cookie: user=user001;service=service1'
clientsendingmessage1
clientsendingmessage2
serversendingmessage1
serversendingmessage2
yay
yay
Service sending messages
(base) ~ ❯ nats pub ws.recv.service1.user001 serversendingmessage1
01:58:42 Published 21 bytes to "ws.recv.service1.user001"
(base) ~ ❯ nats pub ws.recv.service1.user001 serversendingmessage2
01:58:44 Published 21 bytes to "ws.recv.service1.user001"
(base) ~ ❯ nats pub ws.recv.service1.user001 yay
01:58:53 Published 3 bytes to "ws.recv.service1.user001"
Service receiving messages
(base) ~ ❯ nats sub "ws.send.service1.>" ⏎
01:58:10 Subscribing on ws.send.service1.>
[#1] Received on "ws.send.service1.user001" with reply "_INBOX.K35i42q5yDFnMzZcaoO6aO.X9GVnakO"
clientsendingmessage1
[#2] Received on "ws.send.service1.user001" with reply "_INBOX.K35i42q5yDFnMzZcaoO6aO.oV4fBGgX"
clientsendingmessage2
[#3] Received on "ws.send.service1.user001" with reply "_INBOX.K35i42q5yDFnMzZcaoO6aO.io2js6cd"
yay
That’s it!
You now have a scalable websocket broker. You can try it out by installing it like so:
(base) ~ ❯ go install github.com/ashupednekar/websocketstream/cmd@latest ⏎
(base) ~ ❯ which cmd
/Users/ashutoshpednekar/go/bin/cmd
(base) ~ ❯ mv /Users/ashutoshpednekar/go/bin/cmd /Users/ashutoshpednekar/go/bin/websocketstream
(base) ~ ❯ websocketstream
2025/01/06 02:06:26 listening on port: 8000
Thank you :) PRs adding support for other brokers are welcome. Please star the repo on GitHub
😄