Axum WebSocket patterns

Upgrade handshake

WebSocket starts as a normal HTTP request. The client sends Connection: Upgrade and Upgrade: websocket headers. The server responds with HTTP 101 Switching Protocols — meaning “I’m no longer speaking HTTP on this connection, we’re switching to the WebSocket frame protocol.” After the 101, both sides send binary WebSocket frames instead of HTTP.

In Axum, WebSocketUpgrade is an extractor that checks these upgrade headers. Call .on_upgrade(callback) — Axum sends the 101 response and then hands your callback a WebSocket object:

async fn ws_handler(ws: WebSocketUpgrade) -> Response {
    ws.on_upgrade(handle_socket)
}
 
async fn handle_socket(mut socket: WebSocket) {
    // The HTTP handshake is done. This is a live bidirectional connection.
    // socket.recv() reads the next frame from the client.
    // socket.send() writes a frame to the client.
}

Message enum

enum Message {
    Text(Utf8Bytes),  // NOT String — see conversion below
    Binary(Bytes),
    Ping(Bytes),
    Pong(Bytes),
    Close(Option<CloseFrame>),
}

Text holds Utf8Bytes, not String:

  • StringUtf8Bytes: .into()
  • Utf8BytesString: .to_string() (or .as_str() for &str)

Basic echo loop

async fn handle_socket(mut socket: WebSocket) {
    while let Some(Ok(msg)) = socket.recv().await {
        match msg {
            Message::Text(t) => {
                socket.send(Message::Text(t)).await.ok();
            }
            Message::Close(_) => break,
            _ => {}
        }
    }
}

Splitting for concurrent access

The echo loop above reads and writes on the same socket. This works when you process one message at a time, but if you need two concurrent loops (e.g. one reading from the client, one writing from a broadcast channel), you can’t hold &mut socket in two places.

socket.split() consumes the socket and returns two independent halves:

  • SplitSink (sender) — you call sender.send(msg).await
  • SplitStream (receiver) — you call receiver.next().await

Each half can be moved into a separate tokio task.

use futures_util::{SinkExt, StreamExt};
 
let (mut sender, mut receiver) = socket.split();
  • .send() on SplitSink requires SinkExt from futures_util
  • .next() on SplitStream requires StreamExt from futures_util
  • See Rust Streams and StreamExt for why futures_util specifically

Broadcast channel pattern

tokio::sync::broadcast is a multi-producer, multi-consumer channel. One sender, many receivers. When you call tx.send(msg), every active receiver gets a copy.

This is perfect for chat: every connected client subscribes, and any message sent by one client reaches all others.

Shared state holds the sender:

use tokio::sync::broadcast;
 
struct AppState {
    tx: broadcast::Sender<String>,
}
 
// At startup — create the channel with buffer capacity 100
let (tx, _rx) = broadcast::channel(100);
// _rx is dropped immediately; each client creates its own receiver via tx.subscribe()
let state = Arc::new(AppState { tx });

Each client subscribes when it connects:

async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
    let (mut sender, mut receiver) = socket.split();
    let mut rx = state.tx.subscribe();
    // rx is this client's personal receiver — it will get every message
    // sent to state.tx by any other client

Concurrent send/receive with tokio::select!

A chat client needs two concurrent loops:

  • Task A: read from broadcast → write to this client’s WebSocket
  • Task B: read from this client’s WebSocket → write to broadcast

Each task is spawned with tokio::spawn. The reason for async move (not a regular closure): tokio::spawn expects a Future, and the spawned task needs to own the captured variables (sender, receiver, rx) because the task runs independently — possibly outliving the current function. move transfers ownership into the async block.

// Task A: broadcast → this client
let send_task = tokio::spawn(async move {
    // `rx` and `sender` are moved into this task — it owns them
    while let Ok(text) = rx.recv().await {
        if sender.send(Message::Text(text.into())).await.is_err() {
            break; // client disconnected, stop
        }
    }
});
 
// Task B: this client → broadcast
let recv_task = tokio::spawn(async move {
    // `receiver` and `state` are moved into this task
    while let Some(Ok(msg)) = receiver.next().await {
        if let Message::Text(t) = msg {
            let _ = state.tx.send(t.to_string());
        }
    }
});

Now both tasks run concurrently. But when one finishes (client disconnected, broadcast closed), we need to stop the other. That’s what tokio::select! does:

tokio::select! {
    _ = &mut send_task => recv_task.abort(),
    _ = &mut recv_task => send_task.abort(),
}

Reading the syntax:

  • tokio::select! polls all branches simultaneously, waiting for the first one to complete
  • _ = &mut send_task means: await send_task (a JoinHandle, which is a future), and discard its return value (_). The &mut is needed because select! borrows the future — it doesn’t consume it, so the other branch can still call .abort() on the surviving task
  • => recv_task.abort() is the body that runs when this branch wins
  • Whichever task finishes first triggers its body, which aborts the other task

Without this, a disconnected client would leave a zombie task running forever, leaking memory.

See also