Binance Futures WebSocket Stream Data Processing: Storing Rows That Meet a Condition and Subscribing to All Trading-Pair Streams
Hey there! Let's break down your two core problems and walk through practical, actionable solutions for each.
1. Create Separate DataFrames for Closed Candles per Trading Pair
First, let's fix a critical mistake in your existing code: the line pd.DataFrame=pd.json_normalize(json_message) rebinds the name pd.DataFrame to a single DataFrame instance, so every later call to pd.DataFrame(...) in the same process fails. Assign the result to an ordinary variable instead. With that corrected, we can build the logic to store closed candles for each pair in its own DataFrame.
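To see why that assignment is harmful, here is a minimal sketch. The payload is a made-up stand-in for a real stream message, and the original class is restored at the end so the rest of the session keeps working:

```python
import pandas as pd

original_cls = pd.DataFrame  # keep a reference so the class can be restored

try:
    # The mistake: pd.DataFrame now names one DataFrame instance, not the class
    pd.DataFrame = pd.json_normalize({"data": {"k": {"x": True}}})
    try:
        pd.DataFrame({"a": [1]})  # attempts to call an instance
        error_name = None
    except TypeError as exc:
        error_name = type(exc).__name__
finally:
    pd.DataFrame = original_cls  # undo the damage

print(error_name)
```

Using a separate name such as newdata for the normalized message avoids the problem entirely.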
Step-by-Step Implementation:
- Initialize a dictionary to hold a DataFrame for each pair (keys = pair names, values = DataFrames).
- In your on_message handler, check whether the candle is closed (data.k.x == True).
- For closed candles, extract the pair name, then either create a new DataFrame for the pair or append the row to the existing one (using pd.concat instead of the append method, which was deprecated and then removed in pandas 2.0).
Here's the revised code:
    import json

    import pandas as pd
    import websocket

    # Dictionary holding one DataFrame of closed candles per trading pair
    pair_dfs = {}

    SOCKET = (
        "wss://fstream.binance.com/stream?streams="
        "bnbusdt_perpetual@continuousKline_1m/"
        "btcusdt_perpetual@continuousKline_1m/"
        "ethusdt_perpetual@continuousKline_1m/"
        "dashusdt_perpetual@continuousKline_1m"
    )


    def on_open(ws):
        print('opened connection')


    # Recent websocket-client releases also pass the close status code and
    # message; the defaults keep this handler compatible with older releases
    def on_close(ws, close_status_code=None, close_msg=None):
        print('connection closed')


    def on_message(ws, message):
        json_message = json.loads(message)
        # Normalize the JSON into a one-row DataFrame (don't overwrite pd.DataFrame!)
        newdata = pd.json_normalize(json_message)
        # Drop unwanted columns by position (adjust the indices to your needs)
        df = newdata.drop(newdata.columns[[0, 1, 4, 5, 6, 7, 8, 9, 14, 15, 17, 18, 19, 20]], axis=1)
        # Check whether the candle is closed and get the pair name
        is_closed = df['data.k.x'].iloc[0]
        pair_name = df['data.ps'].iloc[0]
        if is_closed:
            if pair_name not in pair_dfs:
                # First closed candle for this pair: start a new DataFrame
                pair_dfs[pair_name] = df.copy()
            else:
                # Append the row to the pair's existing DataFrame
                pair_dfs[pair_name] = pd.concat([pair_dfs[pair_name], df], ignore_index=True)
            # Optional: print confirmation when a closed candle is saved
            print(f"Saved closed candle for {pair_name}")


    # Start the WebSocket connection
    ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message)
    ws.run_forever()
How to Access the DataFrames:
After running the code, you can pull up individual pair DataFrames like this:
    # Get all closed candles for BNBUSDT
    bnb_closed_candles = pair_dfs['BNBUSDT']

    # Get all closed candles for BTCUSDT
    btc_closed_candles = pair_dfs['BTCUSDT']
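If you want to try the storage logic without a live connection, a sketch along these lines may help. The helper names store_if_closed and sample are hypothetical, and the messages are hand-made with only the fields used here (real messages carry more), so treat it as an illustration rather than a replay of actual Binance data:

```python
import json

import pandas as pd

# Columns kept per candle: event time, pair, open, close, high, low, closed flag
KEEP = ["data.E", "data.ps", "data.k.o", "data.k.c", "data.k.h", "data.k.l", "data.k.x"]


def store_if_closed(message: str, store: dict) -> bool:
    """Add the candle to store[pair] if it is closed; return True when stored."""
    row = pd.json_normalize(json.loads(message))[KEEP]
    if not bool(row["data.k.x"].iloc[0]):
        return False
    pair = row["data.ps"].iloc[0]
    if pair in store:
        store[pair] = pd.concat([store[pair], row], ignore_index=True)
    else:
        store[pair] = row.copy()
    return True


def sample(pair: str, close: str, closed: bool) -> str:
    """Build a made-up combined-stream message (illustrative values only)."""
    return json.dumps({
        "stream": f"{pair.lower()}_perpetual@continuousKline_1m",
        "data": {
            "e": "continuous_kline", "E": 1, "ps": pair, "ct": "PERPETUAL",
            "k": {"o": "1.0", "c": close, "h": "2.0", "l": "0.5", "x": closed},
        },
    })


store = {}
results = [
    store_if_closed(sample("BTCUSDT", "1.5", False), store),  # still open
    store_if_closed(sample("BTCUSDT", "1.6", True), store),
    store_if_closed(sample("BTCUSDT", "1.7", True), store),
    store_if_closed(sample("ETHUSDT", "1.2", True), store),
]
print(results, {pair: len(df) for pair, df in store.items()})
```

Selecting columns by name rather than by position keeps the function working even if the message layout gains or loses a field.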
2. Subscribe to All Binance Futures Perpetual WebSocket Streams
You can't reliably subscribe to every stream through a single URL: with hundreds of pairs the query string becomes very long and may be rejected. Instead, use Binance's live subscription method: connect to the base WebSocket URL, then, once the connection is open, send a SUBSCRIBE message listing all the stream names.
Step-by-Step Implementation:
- Use Binance's REST API to fetch all active perpetual contract pairs.
- Construct stream names for each pair (format: {pair.lower()}_perpetual@continuousKline_1m).
- Send a bulk subscribe message via the WebSocket once connected.
Here's the code:
    import json

    import pandas as pd
    import requests
    import websocket

    # Dictionary holding one DataFrame of closed candles per trading pair
    pair_dfs = {}


    # Step 1: Fetch all perpetual contract pairs from Binance's REST API
    def get_all_perpetual_pairs():
        url = "https://fapi.binance.com/fapi/v1/exchangeInfo"
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        # Keep only perpetual contracts that are currently trading
        return [
            symbol['symbol']
            for symbol in response.json()['symbols']
            if symbol['contractType'] == 'PERPETUAL' and symbol['status'] == 'TRADING'
        ]


    # Step 2: Build stream names for all pairs
    perpetual_pairs = get_all_perpetual_pairs()
    streams = [f"{pair.lower()}_perpetual@continuousKline_1m" for pair in perpetual_pairs]

    # Use the base WebSocket URL (no streams appended)
    SOCKET = "wss://fstream.binance.com/ws"


    def on_open(ws):
        print('opened connection')
        # Send the bulk subscription message
        subscribe_msg = {"method": "SUBSCRIBE", "params": streams, "id": 1}
        ws.send(json.dumps(subscribe_msg))
        print(f"Subscribed to {len(streams)} perpetual pairs")


    # Recent websocket-client releases also pass the close status code and message
    def on_close(ws, close_status_code=None, close_msg=None):
        print('connection closed')


    def on_message(ws, message):
        json_message = json.loads(message)
        # The base /ws endpoint is expected to send the bare event, while
        # combined streams wrap it under "data"; accept either shape
        payload = json_message.get('data', json_message)
        # Skip anything that is not a kline event (e.g. the subscription reply)
        if 'k' not in payload:
            return
        newdata = pd.json_normalize(payload)
        # Keep event time, pair, OHLC and the closed flag; note that the column
        # names have no "data." prefix here, unlike in the first example
        df = newdata[['E', 'ps', 'k.o', 'k.c', 'k.h', 'k.l', 'k.x']]
        is_closed = df['k.x'].iloc[0]
        pair_name = df['ps'].iloc[0]
        if is_closed:
            if pair_name not in pair_dfs:
                pair_dfs[pair_name] = df.copy()
            else:
                pair_dfs[pair_name] = pd.concat([pair_dfs[pair_name], df], ignore_index=True)
            print(f"Saved closed candle for {pair_name}")


    # Start the WebSocket connection
    ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message)
    ws.run_forever()
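To check the filtering and stream-name construction without hitting the network, you could run the same logic against a hand-written fragment. The function name build_streams is hypothetical and the sample entries below are invented for illustration; the sketch assumes each entry carries symbol, contractType and status fields:

```python
def build_streams(exchange_info: dict, interval: str = "1m") -> list:
    """Return continuous-kline stream names for perpetuals that are trading."""
    return [
        f"{entry['symbol'].lower()}_perpetual@continuousKline_{interval}"
        for entry in exchange_info["symbols"]
        if entry.get("contractType") == "PERPETUAL" and entry.get("status") == "TRADING"
    ]


# Invented sample data shaped like an exchangeInfo response
sample_info = {
    "symbols": [
        {"symbol": "BTCUSDT", "contractType": "PERPETUAL", "status": "TRADING"},
        {"symbol": "ETHUSDT", "contractType": "PERPETUAL", "status": "TRADING"},
        {"symbol": "BTCUSDT_QUARTERLY", "contractType": "CURRENT_QUARTER", "status": "TRADING"},
        {"symbol": "OLDUSDT", "contractType": "PERPETUAL", "status": "SETTLING"},
    ]
}

print(build_streams(sample_info))
```

Only the two trading perpetuals survive the filter; the dated contract and the non-trading pair are dropped.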
Notes:
- Binance limits each WebSocket connection to around 1000 streams. If you hit this limit, split the pairs into multiple connections.
- The REST API call automatically fetches all active perpetual pairs, so you don't have to hardcode any pair names.
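If the stream list does outgrow one connection, splitting it could look roughly like the sketch below. The helper names chunk_streams and subscribe_message are hypothetical, the default of 1000 simply mirrors the approximate limit mentioned above and should be checked against the current documentation, and the pair names are placeholders:

```python
import json


def chunk_streams(streams: list, max_per_connection: int = 1000) -> list:
    """Split the stream names into batches, one batch per WebSocket connection."""
    if max_per_connection < 1:
        raise ValueError("max_per_connection must be at least 1")
    return [
        streams[i:i + max_per_connection]
        for i in range(0, len(streams), max_per_connection)
    ]


def subscribe_message(batch: list, request_id: int) -> str:
    """Build the SUBSCRIBE payload to send on the connection that owns this batch."""
    return json.dumps({"method": "SUBSCRIBE", "params": batch, "id": request_id})


# Placeholder names standing in for real stream names
names = [f"pair{i}usdt_perpetual@continuousKline_1m" for i in range(2500)]
batches = chunk_streams(names)
messages = [subscribe_message(batch, i) for i, batch in enumerate(batches, start=1)]
print([len(batch) for batch in batches])
```

Each batch would then get its own WebSocketApp, with its message sent from that connection's on_open handler.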
The question originally comes from Stack Exchange; it was asked by abu bakar.




