Release Notes: WebSocket Implementation Improvements
The previous WS implementation had several limitations that affected the reliability and maintainability of the Pusher client:
- Didn't handle reconnections and timeouts as robustly as needed.
- Made it difficult to manage the WebSocket connection lifecycle independently of the main client.
What Changed?
1. Redesigned WS Client Architecture
- We've introduced a dedicated WebSocketClient struct that holds all WS-related functionality.
- The new design uses a channel-based approach for sending commands to the WebSocket client.
pub struct WebSocketClient {
    url: Url,
    socket: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    state: Arc<RwLock<ConnectionState>>,
    event_tx: mpsc::Sender<Event>,
    command_rx: mpsc::Receiver<WebSocketCommand>,
}

pub enum WebSocketCommand {
    Send(String),
    Close,
}
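To illustrate the channel-based command flow, here is a minimal sketch. It uses std::sync::mpsc and a worker thread as a dependency-free stand-in for the tokio mpsc channel and async task the real client presumably uses. The functions drain_commands and send_via_channel are hypothetical names invented for this example, and the loop only records payloads instead of writing to a socket.

```rust
use std::sync::mpsc;
use std::thread;

/// Commands accepted by the WebSocket task (mirrors the enum in these notes).
#[derive(Debug, PartialEq)]
pub enum WebSocketCommand {
    Send(String),
    Close,
}

/// Hypothetical stand-in for the client's command loop: it records every
/// `Send` payload and stops on `Close` or once all senders are dropped.
pub fn drain_commands(command_rx: mpsc::Receiver<WebSocketCommand>) -> Vec<String> {
    let mut sent = Vec::new();
    for command in command_rx {
        match command {
            // A real client would write `text` to the socket here.
            WebSocketCommand::Send(text) => sent.push(text),
            WebSocketCommand::Close => break,
        }
    }
    sent
}

/// Spawns the loop on a worker thread and drives it purely through the channel.
pub fn send_via_channel(messages: &[&str]) -> Vec<String> {
    let (command_tx, command_rx) = mpsc::channel();
    let worker = thread::spawn(move || drain_commands(command_rx));
    for message in messages {
        command_tx
            .send(WebSocketCommand::Send(message.to_string()))
            .expect("worker is still receiving");
    }
    command_tx
        .send(WebSocketCommand::Close)
        .expect("worker is still receiving");
    worker.join().expect("worker did not panic")
}
```

The main client only ever holds the sending half of the channel, so the socket itself stays owned by a single task. That ownership split is what lets the connection lifecycle be managed independently of the main client.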
2. Improved Connection Handling
- Automatic Reconnection: The client now attempts to reconnect automatically when the connection is lost, using an exponential backoff strategy.
- Connection State Management: We've implemented a more robust connection state management system, allowing users to accurately track the current state of the WebSocket connection.
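The reconnection and state-tracking behaviour described above could look roughly like the following sketch. The ConnectionState variants, the 1-second base delay, the 30-second cap, and the backoff_delay and reconnect helpers are all assumptions made for illustration; the actual values and names in the client may differ. The sleeps are omitted so the example runs instantly, and std::sync::RwLock stands in for whatever lock type the client uses.

```rust
use std::sync::{Arc, RwLock};
use std::time::Duration;

/// Hypothetical connection states; the real `ConnectionState` variants may differ.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConnectionState {
    Disconnected,
    Connecting,
    Connected,
}

/// Delay before reconnect attempt `attempt` (0-based): `base * 2^attempt`,
/// capped at `max`. Overflow is treated as "use the cap".
pub fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

/// Calls `connect` up to `max_attempts` times, publishing each state change
/// through the shared lock. Returns the delays it would have waited between
/// failed attempts (the sleeps themselves are left out of this sketch).
pub fn reconnect<F>(
    state: &Arc<RwLock<ConnectionState>>,
    max_attempts: u32,
    mut connect: F,
) -> Vec<Duration>
where
    F: FnMut(u32) -> bool,
{
    let mut waits = Vec::new();
    for attempt in 0..max_attempts {
        *state.write().unwrap() = ConnectionState::Connecting;
        if connect(attempt) {
            *state.write().unwrap() = ConnectionState::Connected;
            return waits;
        }
        *state.write().unwrap() = ConnectionState::Disconnected;
        // Assumed parameters: 1 s base delay, 30 s cap.
        waits.push(backoff_delay(
            attempt,
            Duration::from_secs(1),
            Duration::from_secs(30),
        ));
    }
    waits
}
```

Any other part of the application holding a clone of the Arc can read the current state at any time without touching the socket.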
3. Ping/Pong Mechanism
- Reliable Keepalive: We've implemented a proper ping/pong mechanism to keep the connection alive and detect disconnections more quickly.
- Configurable Timeouts: Users can now configure ping interval and pong timeout durations to suit their specific needs.
const PING_INTERVAL: Duration = Duration::from_secs(30);
const PONG_TIMEOUT: Duration = Duration::from_secs(10);
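As a rough illustration of how the ping interval and pong timeout interact, the sketch below models the keepalive bookkeeping as a small state machine driven by elapsed time. The Keepalive struct, the KeepaliveAction enum, and the poll and on_pong methods are hypothetical and not part of the client's API. Timestamps are passed in as durations since connection start so the logic can be exercised without real timers.

```rust
use std::time::Duration;

pub const PING_INTERVAL: Duration = Duration::from_secs(30);
pub const PONG_TIMEOUT: Duration = Duration::from_secs(10);

/// What the event loop should do after checking the keepalive timers.
#[derive(Debug, PartialEq)]
pub enum KeepaliveAction {
    Idle,
    SendPing,
    Disconnect,
}

/// Hypothetical keepalive bookkeeping; all times are measured from connection start.
pub struct Keepalive {
    ping_interval: Duration,
    pong_timeout: Duration,
    last_ping: Duration,
    waiting_for_pong: bool,
}

impl Keepalive {
    pub fn new(ping_interval: Duration, pong_timeout: Duration) -> Self {
        Keepalive {
            ping_interval,
            pong_timeout,
            last_ping: Duration::ZERO,
            waiting_for_pong: false,
        }
    }

    /// Call when a pong frame arrives from the server.
    pub fn on_pong(&mut self) {
        self.waiting_for_pong = false;
    }

    /// Decide what to do at time `now`.
    pub fn poll(&mut self, now: Duration) -> KeepaliveAction {
        let since_ping = now.saturating_sub(self.last_ping);
        if self.waiting_for_pong {
            // A ping is outstanding: give up once the pong timeout has elapsed.
            if since_ping >= self.pong_timeout {
                return KeepaliveAction::Disconnect;
            }
            return KeepaliveAction::Idle;
        }
        if since_ping >= self.ping_interval {
            self.last_ping = now;
            self.waiting_for_pong = true;
            return KeepaliveAction::SendPing;
        }
        KeepaliveAction::Idle
    }
}
```

With the values shown, a dead connection would be detected at most about 40 seconds after the last successful exchange: up to 30 seconds until the next ping, plus 10 seconds waiting for the pong.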
4. Async Improvements
- Efficient Event Loop: The new run method in WebSocketClient uses tokio::select! for efficient handling of multiple asynchronous operations.
- Non-blocking Ops: All WebSocket operations are now fully asynchronous, preventing any potential blocking in the event loop.
pub async fn run(&mut self) {
    let mut ping_interval = interval(PING_INTERVAL);
    let mut pong_timeout = Box::pin(sleep(Duration::from_secs(0)));
    let mut waiting_for_pong = false;

    while let Some(socket) = &mut self.socket {
        tokio::select! {
            // ... handle various async events ...
        }
    }
    // ... handle disconnection ...
}
Breaking Changes
- The internal WebSocketClient API has changed significantly. If you were directly interacting with WebSocket internals, you'll need to update your code.
- The PusherClient method signatures related to WebSocket operations have been updated to use the new format.