Axum WebSocket patterns
Upgrade handshake
WebSocket starts as a normal HTTP request. The client sends Connection: Upgrade and Upgrade: websocket headers. The server responds with HTTP 101 Switching Protocols — meaning “I’m no longer speaking HTTP on this connection, we’re switching to the WebSocket frame protocol.” After the 101, both sides send binary WebSocket frames instead of HTTP.
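As a rough illustration of the header check described above, here is a std-only sketch. The names `header` and `is_websocket_upgrade` and the `(name, value)` slice representation are invented for this example. It is not Axum's code, and a real server also validates fields such as Sec-WebSocket-Key and Sec-WebSocket-Version.

```rust
/// Hypothetical helper: look up a header value by case-insensitive name.
fn header<'a>(headers: &[(&str, &'a str)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find(|(k, _)| k.eq_ignore_ascii_case(name))
        .map(|(_, v)| *v)
}

/// Hypothetical check: do these request headers ask for a WebSocket upgrade?
/// Simplified on purpose; it only looks at `Connection` and `Upgrade`.
fn is_websocket_upgrade(headers: &[(&str, &str)]) -> bool {
    // `Connection` may carry several comma-separated tokens,
    // e.g. "keep-alive, Upgrade", so test each token.
    let connection_ok = header(headers, "connection")
        .map(|v| v.split(',').any(|t| t.trim().eq_ignore_ascii_case("upgrade")))
        .unwrap_or(false);

    // `Upgrade` must name the websocket protocol (case-insensitive).
    let upgrade_ok = header(headers, "upgrade")
        .map(|v| v.trim().eq_ignore_ascii_case("websocket"))
        .unwrap_or(false);

    connection_ok && upgrade_ok
}
```

A request carrying `Connection: keep-alive, Upgrade` and `Upgrade: websocket` passes this check; a plain keep-alive request does not, and the server would answer it as ordinary HTTP.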
In Axum, WebSocketUpgrade is an extractor that checks these upgrade headers. Call .on_upgrade(callback) — Axum sends the 101 response and then hands your callback a WebSocket object:
use axum::{
    extract::ws::{WebSocket, WebSocketUpgrade},
    response::Response,
};

async fn ws_handler(ws: WebSocketUpgrade) -> Response {
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(mut socket: WebSocket) {
    // The HTTP handshake is done. This is a live bidirectional connection.
    // socket.recv() reads the next message from the client.
    // socket.send() writes a message to the client.
}

Message enum
enum Message {
    Text(Utf8Bytes), // NOT String; see conversion below
    Binary(Bytes),
    Ping(Bytes),
    Pong(Bytes),
    Close(Option<CloseFrame>),
}

Text holds Utf8Bytes, not String:
- String → Utf8Bytes: .into()
- Utf8Bytes → String: .to_string() (or .as_str() for &str)
Basic echo loop
async fn handle_socket(mut socket: WebSocket) {
    while let Some(Ok(msg)) = socket.recv().await {
        match msg {
            Message::Text(t) => {
                // Echo the text back; stop if the client has gone away.
                if socket.send(Message::Text(t)).await.is_err() {
                    break;
                }
            }
            Message::Close(_) => break,
            // Binary, Ping and Pong are ignored here.
            _ => {}
        }
    }
}

Splitting for concurrent access
The echo loop above reads and writes on the same socket. This works when you process one message at a time, but if you need two concurrent loops (e.g. one reading from the client, one writing from a broadcast channel), you can’t hold &mut socket in two places.
socket.split() consumes the socket and returns two independent halves:
- SplitSink (sender): you call sender.send(msg).await
- SplitStream (receiver): you call receiver.next().await
Each half can be moved into a separate tokio task.
use futures_util::{SinkExt, StreamExt};
let (mut sender, mut receiver) = socket.split();
- .send() on SplitSink requires SinkExt from futures_util
- .next() on SplitStream (and .split() itself) requires StreamExt from futures_util
- See Rust Streams and StreamExt for why futures_util specifically
Broadcast channel pattern
tokio::sync::broadcast is a multi-producer, multi-consumer channel: any number of senders (Sender is Clone) and any number of receivers. When you call tx.send(msg), every active receiver gets its own clone of the message.
This suits chat well: every connected client subscribes, and a message sent by one client reaches every subscriber, including the client that sent it.
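To make the fan-out behaviour concrete without pulling in tokio, here is a std-only analogue built on std::sync::mpsc. The `Hub` type and `demo_fan_out` function are hypothetical and exist only for this sketch. It is not how tokio implements broadcast: the real channel is async, bounded, and its send takes &self.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for `broadcast::Sender<String>`:
/// it keeps one private inbox per subscriber.
struct Hub {
    subscribers: Vec<Sender<String>>,
}

impl Hub {
    fn new() -> Self {
        Hub { subscribers: Vec::new() }
    }

    /// Plays the role of `tx.subscribe()`: each caller gets its own receiver.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Plays the role of `tx.send(msg)`: every live subscriber gets a copy.
    /// Subscribers whose receiver was dropped are removed.
    /// Returns how many subscribers received the message.
    fn send(&mut self, msg: &str) -> usize {
        self.subscribers
            .retain(|sub| sub.send(msg.to_string()).is_ok());
        self.subscribers.len()
    }
}

/// Two clients subscribe, one message is sent, and both inboxes are drained.
fn demo_fan_out() -> (Vec<String>, Vec<String>) {
    let mut hub = Hub::new();
    let alice = hub.subscribe();
    let bob = hub.subscribe();
    hub.send("hello");
    (alice.try_iter().collect(), bob.try_iter().collect())
}
```

Both inboxes returned by `demo_fan_out` contain the single message, which is the property the chat pattern relies on: one send, one copy per subscriber.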
Shared state holds the sender:
use std::sync::Arc;
use tokio::sync::broadcast;

struct AppState {
    tx: broadcast::Sender<String>,
}

// At startup: create the channel with buffer capacity 100
let (tx, _rx) = broadcast::channel(100);
// _rx is unused; each client creates its own receiver via tx.subscribe()
let state = Arc::new(AppState { tx });

Each client subscribes when it connects:
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
    let (mut sender, mut receiver) = socket.split();
    let mut rx = state.tx.subscribe();
    // rx is this client's personal receiver: it gets every message
    // sent to state.tx by any client, including this one
    // (the function body continues in the next section)

Concurrent send/receive with tokio::select!
A chat client needs two concurrent loops:
- Task A: read from broadcast → write to this client’s WebSocket
- Task B: read from this client’s WebSocket → write to broadcast
Each task is spawned with tokio::spawn, which takes a future rather than a closure, so the body is an async block. The block must be async move because a spawned task runs independently and may outlive the current function, so it has to own the variables it captures (sender, receiver, rx, state); move transfers ownership into the block.
    // Task A: broadcast → this client
    // (`mut` is required because select! below takes `&mut send_task`)
    let mut send_task = tokio::spawn(async move {
        // `rx` and `sender` are moved into this task, which now owns them.
        // recv() returns Err when the channel closes or this receiver lags.
        while let Ok(text) = rx.recv().await {
            if sender.send(Message::Text(text.into())).await.is_err() {
                break; // client disconnected, stop
            }
        }
    });
    // Task B: this client → broadcast
    let mut recv_task = tokio::spawn(async move {
        // `receiver` and `state` are moved into this task
        while let Some(Ok(msg)) = receiver.next().await {
            if let Message::Text(t) = msg {
                // send() only fails when there are no receivers; ignore that
                let _ = state.tx.send(t.to_string());
            }
        }
    });

Now both tasks run concurrently. But when one finishes (client disconnected, broadcast closed), we need to stop the other. That’s what this tokio::select! is for:
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
}

Reading the syntax:
- tokio::select! polls all branches concurrently and completes as soon as the first one finishes
- _ = &mut send_task means: await send_task (a JoinHandle, which is a future) and discard its result (_). The &mut is needed so that select! borrows the handle instead of consuming it; both handles therefore stay usable in the branch bodies, where the winner calls .abort() on the other
- => recv_task.abort() is the body that runs when this branch wins
- Whichever task finishes first triggers its body, which aborts the other task
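Without this, a disconnected client could leave its send task alive until the next broadcast message makes sender.send fail. On a quiet channel that task, along with the sink and broadcast receiver it owns, would linger indefinitely.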
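The "first to finish stops the other" shape can also be seen in a std-only analogue that uses threads, which compiles without tokio. This is a swapped-in technique, not what select! does internally: threads cannot be aborted, so the sketch uses a cooperative stop flag where the async version calls .abort(). The function name and the "reader"/"writer" labels are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Runs two workers and, as soon as one finishes, tells the other to stop.
/// Returns the name of the worker that finished first and how many
/// iterations the other completed before it saw the stop flag.
fn first_finisher_stops_the_other() -> (&'static str, u32) {
    let stop = Arc::new(AtomicBool::new(false));
    let (done_tx, done_rx) = mpsc::channel::<&'static str>();

    // Stand-in for the receive task: the client disconnects right away.
    let reader_done = done_tx.clone();
    let reader = thread::spawn(move || {
        let _ = reader_done.send("reader");
    });

    // Stand-in for the send task: keeps working until asked to stop.
    let writer_stop = Arc::clone(&stop);
    let writer = thread::spawn(move || {
        let mut iterations: u32 = 0;
        while !writer_stop.load(Ordering::SeqCst) {
            iterations += 1;
            thread::sleep(Duration::from_millis(1));
        }
        let _ = done_tx.send("writer");
        iterations
    });

    // Like select!: block until the first completion notice arrives...
    let first = done_rx.recv().expect("a worker reports completion");
    // ...then, in place of .abort(), signal the survivor to stop.
    stop.store(true, Ordering::SeqCst);

    reader.join().unwrap();
    let iterations = writer.join().unwrap();
    (first, iterations)
}
```

The writer can only finish after the flag is set, so the first completion notice always comes from the reader. Without the `stop.store` line the writer would loop forever, which is the thread-world version of the lingering task described above.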
See also
- Rust Streams and StreamExt — StreamExt and SinkExt trait details
- Axum Dependency Injection — how State and extractors work
- Rust testing patterns for async web services — testing WebSocket handlers
- Async — async/await fundamentals