
Binance Futures WebSocket Stream Processing: Storing Rows That Meet a Condition and Subscribing to All Trading Pairs

Solutions for Your Binance WebSocket & DataFrame Organization Questions

Hey there! Let's break down your two core problems and walk through practical, actionable solutions for each.


1. Create Separate DataFrames for Closed Candles per Trading Pair

First, let's fix a critical mistake in your existing code: the line pd.DataFrame=pd.json_normalize(json_message) overwrites the pd.DataFrame class itself, so every later call to pd.DataFrame(...) fails. Assign the result to an ordinary variable instead, then build the logic to store closed candles for each pair in its own DataFrame.

Step-by-Step Implementation:

  • Initialize a dictionary to hold DataFrames for each pair (keys = pair names, values = DataFrames).
  • In your on_message handler, check if the candle is closed (data.k.x == True).
  • For closed candles, extract the pair name, then either create a new DataFrame for the pair or append the row to the existing one (using pd.concat, because DataFrame.append was deprecated and then removed in pandas 2.0).

Here's the revised code:

import json
import pandas as pd
import websocket

# Initialize a dictionary to store DataFrames for each trading pair
pair_dfs = {}

SOCKET = "wss://fstream.binance.com/stream?streams=bnbusdt_perpetual@continuousKline_1m/btcusdt_perpetual@continuousKline_1m/ethusdt_perpetual@continuousKline_1m/dashusdt_perpetual@continuousKline_1m"

def on_open(ws):
    print('opened connection')

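# Recent websocket-client versions also pass the close status code and message
def on_close(ws, close_status_code=None, close_msg=None):
    print('connection closed')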

def on_message(ws, message):
    json_message = json.loads(message)
    # Correctly normalize JSON to DataFrame (don't overwrite pd.DataFrame!)
    newdata = pd.json_normalize(json_message)
    # Drop unwanted columns by position (adjust indices as per your needs).
    # For the combined-stream payload this should leave data.E, data.ps,
    # data.k.o, data.k.c, data.k.h, data.k.l and data.k.x.
    df = newdata.drop(newdata.columns[[0,1,4,5,6,7,8,9,14,15,17,18,19,20]], axis=1)
    
    # Check if the candle is closed and get the pair name
    is_closed = bool(df['data.k.x'].iloc[0])
    pair_name = df['data.ps'].iloc[0]
    
    if is_closed:
        # Create new DataFrame if pair doesn't exist in our dict
        if pair_name not in pair_dfs:
            pair_dfs[pair_name] = df.copy()
        # Append the row to the existing DataFrame
        else:
            pair_dfs[pair_name] = pd.concat([pair_dfs[pair_name], df], ignore_index=True)
        
        # Optional: Print confirmation when a closed candle is saved
        print(f"Saved closed candle for {pair_name}")

# Start the WebSocket connection
ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message)
ws.run_forever()
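
The positional indices passed to drop depend on the exact order of fields in the payload, so they stop working if a field is added or reordered. As a hedged alternative, the sketch below reads the fields by name and can be tried offline. The helper names extract_closed_candle and store_candle, the output column names, and the sample message values are illustrative and not part of the original code; the field names (ps, E, k.x, k.o and so on) are the ones the handler above relies on.

```python
import json

import pandas as pd


def extract_closed_candle(message):
    """Return one row (a dict) for a closed candle, or None otherwise.

    Hypothetical helper: reads fields by name instead of column position.
    """
    payload = json.loads(message)
    # Combined streams wrap each event as {"stream": ..., "data": ...}
    data = payload.get("data")
    if not data or "k" not in data:
        return None  # e.g. a subscription confirmation
    kline = data["k"]
    if not kline["x"]:
        return None  # candle is still open
    return {
        "event_time": data["E"],
        "pair": data["ps"],
        "open": kline["o"],
        "close": kline["c"],
        "high": kline["h"],
        "low": kline["l"],
    }


def store_candle(pair_dfs, row):
    """Append the row to its pair's DataFrame, creating it on first use."""
    new_row = pd.DataFrame([row])
    pair = row["pair"]
    if pair in pair_dfs:
        pair_dfs[pair] = pd.concat([pair_dfs[pair], new_row], ignore_index=True)
    else:
        pair_dfs[pair] = new_row


# Hand-written sample message (made-up values) to exercise the helpers offline
sample = json.dumps({
    "stream": "btcusdt_perpetual@continuousKline_1m",
    "data": {
        "e": "continuous_kline", "E": 1607443058651,
        "ps": "BTCUSDT", "ct": "PERPETUAL",
        "k": {"t": 1607443020000, "T": 1607443079999, "i": "1m",
              "o": "18787.00", "c": "18804.04", "h": "18804.04",
              "l": "18786.54", "v": "197.664", "x": True},
    },
})

demo_dfs = {}
row = extract_closed_candle(sample)
if row is not None:
    store_candle(demo_dfs, row)
print(demo_dfs["BTCUSDT"])
```

Inside on_message, the same two calls would replace the json_normalize and drop steps.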

How to Access the DataFrames:

After running the code, you can pull up individual pair DataFrames like this:

# Get all closed candles for BNBUSDT
bnb_closed_candles = pair_dfs['BNBUSDT']
# Get all closed candles for BTCUSDT
btc_closed_candles = pair_dfs['BTCUSDT']
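
Binance sends prices as strings and timestamps as millisecond epochs, so the stored columns usually need converting before any analysis. Here is a minimal sketch, assuming the columns left after the drop above are data.E, data.ps, data.k.o, data.k.c, data.k.h, data.k.l and data.k.x. The function name tidy_candles and the sample row are illustrative only.

```python
import pandas as pd


def tidy_candles(df):
    """Return a copy with numeric prices and a datetime event time."""
    out = df.copy()
    price_cols = ["data.k.o", "data.k.c", "data.k.h", "data.k.l"]
    # Convert each price column from string to a numeric dtype
    out[price_cols] = out[price_cols].apply(pd.to_numeric)
    # Event time is given in milliseconds since the Unix epoch
    out["data.E"] = pd.to_datetime(out["data.E"], unit="ms")
    return out


# Made-up row shaped like one stored closed candle
sample = pd.DataFrame([{
    "data.E": 1607443058651, "data.ps": "BTCUSDT",
    "data.k.o": "18787.00", "data.k.c": "18804.04",
    "data.k.h": "18804.04", "data.k.l": "18786.54",
    "data.k.x": True,
}])
tidy = tidy_candles(sample)
print(tidy.dtypes)
```

You could then call tidy_candles(pair_dfs['BTCUSDT']) whenever you need a typed copy, leaving the stored DataFrame untouched.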

2. Subscribe to All Binance Futures Perpetual WebSocket Streams

Listing every stream in a single URL is not practical: with hundreds of pairs the URL becomes very long and may be rejected. Instead, connect to a base WebSocket URL and, once the connection is open, send a subscription message that lists all the stream names.

Step-by-Step Implementation:

  1. Use Binance's REST API to fetch all active perpetual contract pairs.
  2. Construct stream names for each pair (format: {pair.lower()}_perpetual@continuousKline_1m).
  3. Send a bulk subscribe message via the WebSocket once connected.

Here's the code:

import json
import pandas as pd
import websocket
import requests

# Initialize dictionary for pair DataFrames
pair_dfs = {}

# Step 1: Fetch all perpetual contract pairs from Binance's REST API
def get_all_perpetual_pairs():
    url = "https://fapi.binance.com/fapi/v1/exchangeInfo"
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # fail early on an HTTP error
    symbols = response.json()['symbols']
    # Keep only perpetual contracts that are currently trading
    perpetual_pairs = [
        s['symbol'] for s in symbols
        if s['contractType'] == 'PERPETUAL' and s['status'] == 'TRADING'
    ]
    return perpetual_pairs

# Step 2: Build stream names for all pairs
perpetual_pairs = get_all_perpetual_pairs()
streams = [f"{pair.lower()}_perpetual@continuousKline_1m" for pair in perpetual_pairs]

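# Use the combined-stream base URL (no streams appended) so that each message
# arrives wrapped as {"stream": ..., "data": ...}, which the 'data.*' column
# names in on_message rely on. The raw /ws endpoint sends unwrapped payloads.
SOCKET = "wss://fstream.binance.com/stream"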

def on_open(ws):
    print('opened connection')
    # Send bulk subscription message
    subscribe_msg = {
        "method": "SUBSCRIBE",
        "params": streams,
        "id": 1
    }
    ws.send(json.dumps(subscribe_msg))
    print(f"Subscribed to {len(streams)} perpetual pairs")

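# Recent websocket-client versions also pass the close status code and message
def on_close(ws, close_status_code=None, close_msg=None):
    print('connection closed')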

def on_message(ws, message):
    global pair_dfs
    json_message = json.loads(message)
    # Skip subscription confirmations such as {"result": null, "id": 1},
    # which carry no candle data
    if 'result' in json_message:
        return
    # Process candle data
    newdata = pd.json_normalize(json_message)
    df = newdata.drop(newdata.columns[[0,1,4,5,6,7,8,9,14,15,17,18,19,20]], axis=1)
    
    is_closed = df['data.k.x'].iloc[0]
    pair_name = df['data.ps'].iloc[0]
    
    if is_closed:
        if pair_name not in pair_dfs:
            pair_dfs[pair_name] = df.copy()
        else:
            pair_dfs[pair_name] = pd.concat([pair_dfs[pair_name], df], ignore_index=True)
        print(f"Saved closed candle for {pair_name}")

# Start WebSocket connection
ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message)
ws.run_forever()

Notes:

  • Binance limits each WebSocket connection to around 1000 streams. If you hit this limit, split the pairs into multiple connections.
  • The REST API call automatically fetches all active perpetual pairs, so you don't have to hardcode any pair names.
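
If the number of streams exceeds the per-connection limit, the list can be split into chunks, with one connection per chunk. Here is a minimal sketch, using 1000 as a placeholder limit (check Binance's documentation for the current value). chunk_streams and build_subscribe_message are hypothetical helper names, the pair names are made up, and the SUBSCRIBE payload has the same shape as in the code above.

```python
import json


def chunk_streams(streams, max_per_connection=1000):
    """Split the stream list into chunks of at most max_per_connection."""
    if max_per_connection < 1:
        raise ValueError("max_per_connection must be at least 1")
    return [
        streams[i:i + max_per_connection]
        for i in range(0, len(streams), max_per_connection)
    ]


def build_subscribe_message(chunk, request_id):
    """Build the JSON text of one SUBSCRIBE request for one connection."""
    return json.dumps({"method": "SUBSCRIBE", "params": chunk, "id": request_id})


# Made-up pair names, only to show the chunk sizes
demo_pairs = [f"PAIR{i}USDT" for i in range(2500)]
demo_streams = [f"{p.lower()}_perpetual@continuousKline_1m" for p in demo_pairs]

chunks = chunk_streams(demo_streams)
messages = [build_subscribe_message(c, i + 1) for i, c in enumerate(chunks)]
print([len(c) for c in chunks])  # [1000, 1000, 500]
```

Each message would then be sent from the on_open handler of its own WebSocketApp, for example with one thread per connection.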

The question in this post comes from Stack Exchange; it was asked by abu bakar.
