Heartbeat, reconnection, and reliability strategies
class ManagedWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.ws = null;
this.pingInterval = null;
this.pingTimeout = null;
this.heartbeatInterval = 30000; // 30 seconds
this.heartbeatTimeout = 10000; // 10 seconds
}
connect() {
this.ws = new WebSocket(this.url, [], this.options);
this.ws.onopen = () => {
console.log('✅ WebSocket connected');
this.startHeartbeat();
this.onConnected?.();
};
this.ws.onmessage = (event) => {
this.handleMessage(event);
};
this.ws.onclose = (event) => {
console.log(`❌ Connection closed: ${event.code} ${event.reason}`);
this.stopHeartbeat();
this.handleReconnect(event);
};
this.ws.onerror = (error) => {
console.error('🚨 WebSocket error:', error);
};
this.ws.onpong = () => {
console.log('📡 Pong received');
this.clearPingTimeout();
};
}
startHeartbeat() {
this.pingInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
console.log('📡 Sending ping');
this.ws.ping();
// Set timeout for pong response
this.pingTimeout = setTimeout(() => {
console.log('⏰ Ping timeout - closing connection');
this.ws.close();
}, this.heartbeatTimeout);
}
}, this.heartbeatInterval);
}
stopHeartbeat() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
this.clearPingTimeout();
}
clearPingTimeout() {
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
this.pingTimeout = null;
}
}
handleMessage(event) {
const data = JSON.parse(event.data);
console.log('📨 Received:', data);
this.onMessage?.(data);
}
handleReconnect(closeEvent) {
// Check if close was intentional
if (closeEvent.code === 1000) {
console.log('🏁 Connection closed normally');
return;
}
console.log('🔄 Scheduling reconnection...');
this.reconnect();
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
} else {
console.warn('⚠️ Cannot send - WebSocket not connected');
}
}
disconnect() {
this.stopHeartbeat();
if (this.ws) {
this.ws.close(1000, 'Client disconnect');
}
}
}
import websocket
import json
import time
import threading
class ManagedWebSocket:
def __init__(self, url, headers=None):
self.url = url
self.headers = headers or {}
self.ws = None
self.ping_timer = None
self.heartbeat_interval = 30 # seconds
self.is_connected = False
def connect(self):
self.ws = websocket.WebSocketApp(
self.url,
header=self.headers,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close,
on_pong=self.on_pong
)
# Enable automatic ping/pong with 30 second interval
self.ws.run_forever(
ping_interval=self.heartbeat_interval,
ping_timeout=10
)
def on_open(self, ws):
print("✅ WebSocket connected")
self.is_connected = True
self.on_connected()
def on_message(self, ws, message):
try:
data = json.loads(message)
print(f"📨 Received: {data}")
self.handle_message(data)
except json.JSONDecodeError as e:
print(f"❌ JSON decode error: {e}")
def on_error(self, ws, error):
print(f"🚨 WebSocket error: {error}")
def on_close(self, ws, close_status_code, close_msg):
print(f"❌ Connection closed: {close_status_code} {close_msg}")
self.is_connected = False
# Reconnect if not a normal closure
if close_status_code != 1000:
self.reconnect()
def on_pong(self, ws, data):
print("📡 Pong received")
def on_connected(self):
# Override in subclasses
pass
def handle_message(self, data):
# Override in subclasses
pass
def send(self, data):
if self.ws and self.is_connected:
self.ws.send(json.dumps(data))
else:
print("⚠️ Cannot send - WebSocket not connected")
def disconnect(self):
if self.ws:
self.ws.close()
def reconnect(self):
print("🔄 Reconnecting...")
time.sleep(5) # Wait before reconnecting
self.connect()
class ReconnectingWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.ws = null;
// Reconnection settings
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
this.initialDelay = 1000; // 1 second
this.maxDelay = 30000; // 30 seconds
this.backoffFactor = 2;
this.jitter = true;
// Connection state
this.isReconnecting = false;
this.shouldReconnect = true;
}
connect() {
if (this.isReconnecting) {
return;
}
try {
this.ws = new WebSocket(this.url, [], this.options);
this.setupEventHandlers();
} catch (error) {
console.error('Connection error:', error);
this.scheduleReconnect();
}
}
setupEventHandlers() {
this.ws.onopen = () => {
console.log(`✅ Connected (attempt ${this.reconnectAttempts + 1})`);
this.reconnectAttempts = 0;
this.isReconnecting = false;
this.onConnected?.();
};
this.ws.onmessage = (event) => {
this.handleMessage(event);
};
this.ws.onclose = (event) => {
console.log(`❌ Connection lost: ${event.code} ${event.reason}`);
if (this.shouldReconnect && event.code !== 1000) {
this.scheduleReconnect();
}
};
this.ws.onerror = (error) => {
console.error('🚨 WebSocket error:', error);
};
}
scheduleReconnect() {
if (!this.shouldReconnect || this.isReconnecting) {
return;
}
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('❌ Max reconnection attempts reached');
this.onReconnectFailed?.();
return;
}
this.isReconnecting = true;
this.reconnectAttempts++;
const delay = this.calculateDelay();
console.log(`🔄 Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
if (this.shouldReconnect) {
this.connect();
}
}, delay);
}
calculateDelay() {
// Exponential backoff with jitter
let delay = Math.min(
this.initialDelay * Math.pow(this.backoffFactor, this.reconnectAttempts - 1),
this.maxDelay
);
if (this.jitter) {
// Add randomization to prevent thundering herd
delay = delay * (0.5 + Math.random() * 0.5);
}
return Math.floor(delay);
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
return true;
}
console.warn('⚠️ Cannot send - WebSocket not connected');
return false;
}
disconnect() {
this.shouldReconnect = false;
this.isReconnecting = false;
if (this.ws) {
this.ws.close(1000, 'Client disconnect');
}
}
getConnectionState() {
if (!this.ws) return 'DISCONNECTED';
switch (this.ws.readyState) {
case WebSocket.CONNECTING:
return 'CONNECTING';
case WebSocket.OPEN:
return 'CONNECTED';
case WebSocket.CLOSING:
return 'CLOSING';
case WebSocket.CLOSED:
return 'DISCONNECTED';
default:
return 'UNKNOWN';
}
}
}
class MonitoredWebSocket extends ReconnectingWebSocket {
constructor(url, options = {}) {
super(url, options);
this.metrics = {
totalConnections: 0,
totalDisconnections: 0,
totalErrors: 0,
messagesReceived: 0,
messagesSent: 0,
lastConnectedAt: null,
totalUptime: 0,
reconnectionCount: 0
};
this.startMetricsCollection();
}
connect() {
this.metrics.totalConnections++;
super.connect();
}
onConnected() {
this.metrics.lastConnectedAt = Date.now();
this.logMetrics();
super.onConnected?.();
}
setupEventHandlers() {
super.setupEventHandlers();
const originalOnClose = this.ws.onclose;
this.ws.onclose = (event) => {
this.metrics.totalDisconnections++;
if (this.metrics.lastConnectedAt) {
this.metrics.totalUptime += Date.now() - this.metrics.lastConnectedAt;
}
originalOnClose(event);
};
const originalOnError = this.ws.onerror;
this.ws.onerror = (error) => {
this.metrics.totalErrors++;
originalOnError(error);
};
const originalOnMessage = this.ws.onmessage;
this.ws.onmessage = (event) => {
this.metrics.messagesReceived++;
originalOnMessage(event);
};
}
send(data) {
const sent = super.send(data);
if (sent) {
this.metrics.messagesSent++;
}
return sent;
}
scheduleReconnect() {
this.metrics.reconnectionCount++;
super.scheduleReconnect();
}
startMetricsCollection() {
setInterval(() => {
this.logMetrics();
}, 60000); // Log every minute
}
logMetrics() {
const uptime = this.calculateUptime();
const state = this.getConnectionState();
console.log('📊 WebSocket Metrics:', {
state,
connections: this.metrics.totalConnections,
disconnections: this.metrics.totalDisconnections,
errors: this.metrics.totalErrors,
reconnections: this.metrics.reconnectionCount,
messagesReceived: this.metrics.messagesReceived,
messagesSent: this.metrics.messagesSent,
uptime: `${Math.floor(uptime / 1000)}s`,
uptimePercent: this.calculateUptimePercent()
});
}
calculateUptime() {
let totalUptime = this.metrics.totalUptime;
if (this.metrics.lastConnectedAt && this.getConnectionState() === 'CONNECTED') {
totalUptime += Date.now() - this.metrics.lastConnectedAt;
}
return totalUptime;
}
calculateUptimePercent() {
const totalTime = Date.now() - (this.metrics.firstConnectionAt || Date.now());
if (totalTime === 0) return 100;
return Math.min(100, (this.calculateUptime() / totalTime) * 100).toFixed(2);
}
getMetrics() {
return {
...this.metrics,
currentState: this.getConnectionState(),
uptime: this.calculateUptime(),
uptimePercent: this.calculateUptimePercent()
};
}
}
const ConnectionState = {
CONNECTING: 'CONNECTING',
CONNECTED: 'CONNECTED',
RECONNECTING: 'RECONNECTING',
DISCONNECTED: 'DISCONNECTED',
FAILED: 'FAILED'
};
class StatefulWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = options;
this.state = ConnectionState.DISCONNECTED;
this.stateListeners = [];
}
setState(newState) {
const oldState = this.state;
this.state = newState;
console.log(`🔄 State change: ${oldState} → ${newState}`);
// Notify listeners
this.stateListeners.forEach(listener => {
try {
listener(newState, oldState);
} catch (error) {
console.error('State listener error:', error);
}
});
}
onStateChange(callback) {
this.stateListeners.push(callback);
// Return unsubscribe function
return () => {
const index = this.stateListeners.indexOf(callback);
if (index > -1) {
this.stateListeners.splice(index, 1);
}
};
}
connect() {
if (this.state === ConnectionState.CONNECTING) {
return;
}
this.setState(ConnectionState.CONNECTING);
try {
this.ws = new WebSocket(this.url, [], this.options);
this.setupEventHandlers();
} catch (error) {
this.setState(ConnectionState.FAILED);
throw error;
}
}
setupEventHandlers() {
this.ws.onopen = () => {
this.setState(ConnectionState.CONNECTED);
};
this.ws.onclose = (event) => {
if (event.code === 1000) {
this.setState(ConnectionState.DISCONNECTED);
} else {
this.setState(ConnectionState.RECONNECTING);
this.scheduleReconnect();
}
};
this.ws.onerror = () => {
if (this.state === ConnectionState.CONNECTING) {
this.setState(ConnectionState.FAILED);
}
};
}
isConnected() {
return this.state === ConnectionState.CONNECTED;
}
isConnecting() {
return this.state === ConnectionState.CONNECTING;
}
canSend() {
return this.isConnected() && this.ws.readyState === WebSocket.OPEN;
}
}
Connection Lifecycle
Reconnection Strategy
Error Handling
Performance Optimization
class ProductionWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
maxReconnectAttempts: 10,
heartbeatInterval: 30000,
...options
};
this.ws = null;
this.state = 'DISCONNECTED';
this.messageQueue = [];
this.subscriptions = new Map();
// Bind methods to preserve context
this.connect = this.connect.bind(this);
this.disconnect = this.disconnect.bind(this);
this.send = this.send.bind(this);
}
async connect() {
try {
this.setState('CONNECTING');
this.ws = new WebSocket(this.url, [], this.options.headers ? { headers: this.options.headers } : {});
this.ws.onopen = this.handleOpen.bind(this);
this.ws.onmessage = this.handleMessage.bind(this);
this.ws.onclose = this.handleClose.bind(this);
this.ws.onerror = this.handleError.bind(this);
// Set connection timeout
setTimeout(() => {
if (this.state === 'CONNECTING') {
this.ws.close();
this.handleConnectionTimeout();
}
}, 10000);
} catch (error) {
this.setState('FAILED');
this.handleError(error);
}
}
handleOpen() {
this.setState('CONNECTED');
this.startHeartbeat();
this.flushMessageQueue();
this.resubscribe();
this.onConnected?.();
}
handleMessage(event) {
try {
const data = JSON.parse(event.data);
this.onMessage?.(data);
} catch (error) {
console.error('Message parsing error:', error);
}
}
handleClose(event) {
this.stopHeartbeat();
if (event.code === 1000) {
this.setState('DISCONNECTED');
} else {
this.setState('RECONNECTING');
this.scheduleReconnect();
}
this.onDisconnected?.(event);
}
handleError(error) {
console.error('WebSocket error:', error);
this.onError?.(error);
}
send(data) {
if (this.state === 'CONNECTED') {
this.ws.send(JSON.stringify(data));
return true;
} else {
// Queue message for when connection is restored
this.messageQueue.push(data);
return false;
}
}
subscribe(channel, symbol) {
const subscription = { channel, symbol };
this.subscriptions.set(`${channel}.${symbol}`, subscription);
this.send({
op: 'subscribe',
args: [subscription]
});
}
unsubscribe(channel, symbol) {
const key = `${channel}.${symbol}`;
const subscription = this.subscriptions.get(key);
if (subscription) {
this.subscriptions.delete(key);
this.send({
op: 'unsubscribe',
args: [subscription]
});
}
}
flushMessageQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
this.send(message);
}
}
resubscribe() {
if (this.subscriptions.size > 0) {
this.send({
op: 'subscribe',
args: Array.from(this.subscriptions.values())
});
}
}
setState(newState) {
const oldState = this.state;
this.state = newState;
this.onStateChange?.(newState, oldState);
}
disconnect() {
this.shouldReconnect = false;
this.stopHeartbeat();
if (this.ws) {
this.ws.close(1000, 'Client disconnect');
}
}
// Event handlers (set these as needed)
onConnected() {}
onDisconnected(event) {}
onMessage(data) {}
onError(error) {}
onStateChange(newState, oldState) {}
}
// Usage
const client = new ProductionWebSocket('wss://ws.roxom.com/ws', {
maxReconnectAttempts: 5,
heartbeatInterval: 30000
});
client.onConnected = () => {
console.log('🚀 Ready for trading');
client.subscribe('level1', 'US500-BTC');
};
client.onMessage = (data) => {
console.log('📨 Market data:', data);
};
client.connect();
Was this page helpful?