Real-Time WebSocket Streaming with Smart Money API
Master real-time data streaming with WebSockets. This tutorial covers subscription channels, event handling, connection management, error recovery, and building live dashboards with sub-100ms latency updates from whale wallets and market signals.
WebSocket Overview
WebSockets provide persistent, bidirectional communication between your application and Smart Money API servers. Unlike REST APIs that require polling, WebSockets push data to your application in real-time with latencies under 100 milliseconds.
The Smart Money API WebSocket endpoint is available at:
wss://api.smartmoneyapi.com/ws
WebSocket connections require authentication via your API key and maintain subscriptions to data channels. When subscribed to a channel, you receive events in real-time as data updates occur on the exchanges.
Available Channels
Smart Money API offers multiple WebSocket channels for different data types. Choose channels based on your trading strategy and monitoring needs.
Available Subscription Channels
- whales:BTC, whales:ETH - Whale position changes for specific symbols
- derivatives:funding - Real-time funding rate changes across exchanges
- derivatives:liquidations - Liquidation events as they occur
- derivatives:openinterest - Open interest updates
- onchain:flows - Exchange deposit/withdrawal flows
- signals:confirmation - Confirmation score updates
- market:alerts - Custom alert notifications
- system:status - Connection status and system messages
Connection Setup
Establish a WebSocket connection by connecting to the WebSocket endpoint and sending an authentication message.
// Basic WebSocket connection setup
const API_KEY = process.env.SMARTMONEY_PUBLIC_KEY;
const WS_URL = 'wss://api.smartmoneyapi.com/ws';
const ws = new WebSocket(WS_URL);
ws.addEventListener('open', () => {
console.log('WebSocket connected');
// Send authentication
ws.send(JSON.stringify({
type: 'auth',
api_key: API_KEY
}));
});
ws.addEventListener('message', (event) => {
const data = JSON.parse(event.data);
console.log('Received:', data);
});
ws.addEventListener('error', (error) => {
console.error('WebSocket error:', error);
});
ws.addEventListener('close', () => {
console.log('WebSocket disconnected');
});
Subscription Management
Subscribe to channels after authentication to receive real-time updates. You can subscribe to multiple channels simultaneously.
import websocket
import json
import threading
class WebSocketClient:
def __init__(self, api_key):
self.api_key = api_key
self.ws = None
self.subscriptions = set()
def connect(self):
"""Establish WebSocket connection"""
self.ws = websocket.WebSocketApp(
'wss://api.smartmoneyapi.com/ws',
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
# Run in separate thread
wst = threading.Thread(target=self.ws.run_forever)
wst.daemon = True
wst.start()
def on_open(self, ws):
"""Called when connection opens"""
print("WebSocket opened")
# Authenticate
ws.send(json.dumps({
'type': 'auth',
'api_key': self.api_key
}))
# Subscribe to channels
self.subscribe('whales:BTC')
self.subscribe('derivatives:funding')
self.subscribe('signals:confirmation')
def subscribe(self, channel):
"""Subscribe to a channel"""
if self.ws:
self.ws.send(json.dumps({
'type': 'subscribe',
'channel': channel
}))
self.subscriptions.add(channel)
print(f"Subscribed to {channel}")
def unsubscribe(self, channel):
"""Unsubscribe from a channel"""
if self.ws:
self.ws.send(json.dumps({
'type': 'unsubscribe',
'channel': channel
}))
self.subscriptions.discard(channel)
def on_message(self, ws, message):
"""Called when message is received"""
data = json.loads(message)
self.handle_event(data)
def handle_event(self, data):
"""Handle incoming events"""
event_type = data.get('type')
if event_type == 'whale_update':
print(f"Whale update: {data['wallet']} - {data['asset']}")
elif event_type == 'funding_rate':
print(f"Funding: {data['symbol']} - {data['rate']}")
elif event_type == 'signal':
print(f"Signal: {data['symbol']} - {data['signal_type']}")
def on_error(self, ws, error):
print(f"WebSocket error: {error}")
def on_close(self, ws, close_status_code, close_msg):
print("WebSocket closed")
# Usage
client = WebSocketClient(api_key='your_api_key')
client.connect()
# Keep running
import time
while True:
time.sleep(1)
Event Handling and Parsing
Parse and handle different event types from WebSocket messages. Each event type carries specific data relevant to its channel.
class SmartMoneyWebSocket {
constructor(apiKey) {
this.apiKey = apiKey;
this.ws = null;
this.handlers = new Map();
}
connect() {
this.ws = new WebSocket('wss://api.smartmoneyapi.com/ws');
this.ws.onopen = () => this.onOpen();
this.ws.onmessage = (event) => this.onMessage(event);
this.ws.onerror = (error) => this.onError(error);
this.ws.onclose = () => this.onClose();
}
onOpen() {
console.log('WebSocket connected');
// Authenticate
this.send({
type: 'auth',
api_key: this.apiKey
});
// Subscribe to channels
this.subscribe('whales:BTC');
this.subscribe('derivatives:liquidations');
}
send(message) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
}
}
subscribe(channel) {
this.send({
type: 'subscribe',
channel: channel
});
}
onMessage(event) {
const data = JSON.parse(event.data);
// Route to appropriate handler
const handler = this.handlers.get(data.type);
if (handler) {
handler(data);
} else {
this.handleEvent(data);
}
}
handleEvent(data) {
const { type, channel, payload } = data;
console.log(`Event: ${type} on ${channel}`, payload);
if (type === 'whale_update') {
this.handleWhaleUpdate(payload);
} else if (type === 'liquidation') {
this.handleLiquidation(payload);
} else if (type === 'funding_rate') {
this.handleFundingRate(payload);
}
}
handleWhaleUpdate(data) {
console.log(
`Whale ${data.wallet.substring(0, 10)}... ` +
`moved ${data.size_usd} USD in ${data.asset}`
);
// Update UI
}
handleLiquidation(data) {
console.log(
`Liquidation: ${data.symbol} at ${data.price} - ` +
`${data.size_usd} USD`
);
// Send alert
}
handleFundingRate(data) {
console.log(
`Funding ${data.symbol}: ${(data.rate * 100).toFixed(4)}% ` +
`(${data.exchange})`
);
}
on(eventType, handler) {
this.handlers.set(eventType, handler);
}
onError(error) {
console.error('WebSocket error:', error);
}
onClose() {
console.log('WebSocket closed');
// Reconnect after delay
setTimeout(() => this.connect(), 5000);
}
close() {
if (this.ws) {
this.ws.close();
}
}
}
// Usage
const smartMoneyWS = new SmartMoneyWebSocket('your_api_key');
smartMoneyWS.on('whale_update', (data) => {
// Custom handler for whale updates
console.log('Custom whale handler:', data);
});
smartMoneyWS.connect();
Connection Management and Reconnection
Implement robust connection management with automatic reconnection, heartbeat monitoring, and graceful error recovery.
import websocket
import json
import threading
import time
from typing import Callable, Optional
class RobustWebSocketClient:
def __init__(
self,
api_key: str,
max_reconnect_attempts: int = 10,
reconnect_delay: int = 5
):
self.api_key = api_key
self.ws = None
self.connected = False
self.subscriptions = set()
self.max_reconnect_attempts = max_reconnect_attempts
self.reconnect_delay = reconnect_delay
self.reconnect_attempts = 0
self.heartbeat_thread = None
self.message_handlers = {}
def connect(self):
"""Connect with automatic reconnection logic"""
while self.reconnect_attempts < self.max_reconnect_attempts:
try:
print(f"Attempting connection (attempt {self.reconnect_attempts + 1})")
self.ws = websocket.WebSocketApp(
'wss://api.smartmoneyapi.com/ws',
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
wst = threading.Thread(target=self.ws.run_forever)
wst.daemon = True
wst.start()
# Start heartbeat monitor
self.start_heartbeat()
return
except Exception as e:
print(f"Connection failed: {e}")
self.reconnect_attempts += 1
time.sleep(self.reconnect_delay * self.reconnect_attempts)
def on_open(self, ws):
"""Handle connection open"""
print("WebSocket connected")
self.connected = True
self.reconnect_attempts = 0
# Authenticate
ws.send(json.dumps({
'type': 'auth',
'api_key': self.api_key
}))
# Resubscribe to previous channels
for channel in self.subscriptions:
ws.send(json.dumps({
'type': 'subscribe',
'channel': channel
}))
def on_message(self, ws, message):
"""Handle incoming message"""
try:
data = json.loads(message)
# Handle system messages
if data.get('type') == 'pong':
print("Heartbeat pong received")
return
# Route to handlers
for pattern, handler in self.message_handlers.items():
if pattern in data.get('channel', ''):
handler(data)
except Exception as e:
print(f"Error processing message: {e}")
def on_error(self, ws, error):
"""Handle error"""
print(f"WebSocket error: {error}")
self.connected = False
def on_close(self, ws, close_status_code, close_msg):
"""Handle close"""
print("WebSocket closed")
self.connected = False
# Attempt reconnection
if self.reconnect_attempts < self.max_reconnect_attempts:
print(f"Reconnecting in {self.reconnect_delay} seconds...")
time.sleep(self.reconnect_delay)
self.connect()
def start_heartbeat(self):
"""Start heartbeat monitoring"""
def heartbeat():
while self.connected:
try:
if self.ws:
self.ws.send(json.stringify({
'type': 'ping'
}))
time.sleep(30) # Ping every 30 seconds
except:
pass
self.heartbeat_thread = threading.Thread(target=heartbeat)
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()
def subscribe(self, channel: str):
"""Subscribe to channel"""
if self.ws:
self.ws.send(json.dumps({
'type': 'subscribe',
'channel': channel
}))
self.subscriptions.add(channel)
def on_message_pattern(self, pattern: str, handler: Callable):
"""Register handler for message pattern"""
self.message_handlers[pattern] = handler
def close(self):
"""Close connection gracefully"""
self.connected = False
if self.ws:
self.ws.close()
Python WebSocket Implementation
Complete Python example showing a production-ready WebSocket client with connection pooling and message queuing.
# websocket_client.py - Production-ready implementation
import asyncio
import aiohttp
import json
from typing import Callable, Dict, Any
class AsyncSmartMoneyWebSocket:
def __init__(self, api_key: str):
self.api_key = api_key
self.session = None
self.ws = None
self.message_queue = asyncio.Queue()
self.subscriptions = set()
async def connect(self):
"""Connect to WebSocket"""
self.session = aiohttp.ClientSession()
async with self.session.ws_connect(
'wss://api.smartmoneyapi.com/ws'
) as ws:
self.ws = ws
# Authenticate
await ws.send_json({
'type': 'auth',
'api_key': self.api_key
})
# Listen for messages
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
await self.message_queue.put(data)
async def subscribe(self, channel: str):
"""Subscribe to channel"""
if self.ws:
await self.ws.send_json({
'type': 'subscribe',
'channel': channel
})
self.subscriptions.add(channel)
async def process_messages(self, handler: Callable):
"""Process messages from queue"""
while True:
message = await self.message_queue.get()
await handler(message)
async def close(self):
"""Close connection"""
if self.ws:
await self.ws.close()
if self.session:
await self.session.close()
# Usage
async def main():
ws = AsyncSmartMoneyWebSocket('your_api_key')
# Message handler
async def handle_message(data):
print(f"Received: {data}")
# Connect and subscribe
connect_task = asyncio.create_task(ws.connect())
process_task = asyncio.create_task(ws.process_messages(handle_message))
await ws.subscribe('whales:BTC')
await ws.subscribe('derivatives:funding')
await asyncio.gather(connect_task, process_task)
if __name__ == '__main__':
asyncio.run(main())
JavaScript WebSocket Implementation
Full-featured JavaScript implementation for browser and Node.js environments with event emitters and state management.
// WebSocket client with state management
class SmartMoneyWSClient extends EventTarget {
constructor(apiKey, options = {}) {
super();
this.apiKey = apiKey;
this.url = 'wss://api.smartmoneyapi.com/ws';
this.ws = null;
this.isConnected = false;
this.subscriptions = new Set();
this.messageBuffer = [];
this.reconnectAttempts = 0;
this.maxReconnectAttempts = options.maxReconnectAttempts || 10;
this.reconnectDelay = options.reconnectDelay || 5000;
}
connect() {
try {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => this.handleOpen();
this.ws.onmessage = (event) => this.handleMessage(event);
this.ws.onerror = (error) => this.handleError(error);
this.ws.onclose = () => this.handleClose();
} catch (error) {
console.error('Connection error:', error);
this.scheduleReconnect();
}
}
handleOpen() {
console.log('WebSocket connected');
this.isConnected = true;
this.reconnectAttempts = 0;
// Authenticate
this.send({
type: 'auth',
api_key: this.apiKey
});
// Process buffered messages
while (this.messageBuffer.length > 0) {
const msg = this.messageBuffer.shift();
this.send(msg);
}
// Emit connected event
this.dispatchEvent(new CustomEvent('connected'));
}
handleMessage(event) {
try {
const data = JSON.parse(event.data);
// Dispatch typed events
const eventType = `message:${data.type}`;
this.dispatchEvent(new CustomEvent(eventType, { detail: data }));
// Emit generic message event
this.dispatchEvent(new CustomEvent('message', { detail: data }));
} catch (error) {
console.error('Message parse error:', error);
}
}
handleError(error) {
console.error('WebSocket error:', error);
this.dispatchEvent(new CustomEvent('error', { detail: error }));
}
handleClose() {
console.log('WebSocket closed');
this.isConnected = false;
this.dispatchEvent(new CustomEvent('disconnected'));
this.scheduleReconnect();
}
scheduleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => this.connect(), delay);
} else {
console.error('Max reconnection attempts reached');
this.dispatchEvent(new CustomEvent('max_reconnect_failed'));
}
}
send(message) {
if (this.isConnected && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
} else {
// Buffer message for later
this.messageBuffer.push(message);
}
}
subscribe(channel) {
this.send({
type: 'subscribe',
channel: channel
});
this.subscriptions.add(channel);
}
unsubscribe(channel) {
this.send({
type: 'unsubscribe',
channel: channel
});
this.subscriptions.delete(channel);
}
close() {
this.isConnected = false;
if (this.ws) {
this.ws.close();
}
}
}
// Usage with event listeners
const ws = new SmartMoneyWSClient('your_api_key');
ws.addEventListener('connected', () => {
console.log('Connected!');
ws.subscribe('whales:BTC');
ws.subscribe('derivatives:liquidations');
});
ws.addEventListener('message:whale_update', (event) => {
console.log('Whale update:', event.detail);
});
ws.addEventListener('message:liquidation', (event) => {
console.log('Liquidation:', event.detail);
});
ws.addEventListener('error', (event) => {
console.error('Error:', event.detail);
});
ws.connect();
Production Considerations
Follow these best practices when deploying WebSocket clients to production:
Production Checklist
- Connection Pooling: Use multiple connections for high-frequency data
- Message Buffering: Queue messages during disconnections
- Heartbeat Monitoring: Implement ping/pong to detect stale connections
- Exponential Backoff: Gradually increase reconnection delays
- Resource Cleanup: Properly close connections and clean up resources
- Error Logging: Log all connection errors for debugging
- Rate Limiting: Respect server rate limits on subscriptions
- Monitoring: Track connection uptime and message latency metrics
Pro Tip: Use dedicated WebSocket connections for different data types (whales, derivatives, signals) to prevent one connection's issues from affecting others.
For more advanced topics, check our guides on building trading bots, Telegram alerts, and data visualization.
Build Real-Time Applications
Stream real-time whale movements and market signals. Start with WebSocket integration today for sub-100ms latency updates.
Get WebSocket Access