Skip to content

第五章:实时数据流 (CCXT Pro)

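CCXT Pro 提供了基于 WebSocket 的实时数据流功能。与传统的 REST API 轮询方式相比,WebSocket 延迟更低、效率更高。本章将详细介绍如何使用 CCXT Pro 获取各种实时数据。注意:较新版本的 CCXT 已将 CCXT Pro 并入主包(JavaScript 中通过 ccxt.pro 访问,Python 中通过 import ccxt.pro 导入);下文沿用独立包 ccxt.pro / ccxtpro 的写法,使用新版本时请相应调整导入语句。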

🚀 CCXT Pro 简介

CCXT Pro 特性

  • ⚡ 低延迟:WebSocket 连接提供毫秒级数据更新
  • 🔄 实时推送:数据变化时主动推送,无需轮询
  • 📊 多种数据:实时行情、订单簿、成交记录、K线等
  • 🛠️ 统一接口:与 CCXT 基础版保持一致的 API 设计
  • 🔌 自动重连:内置断线重连机制
  • 💾 状态管理:自动维护订单簿等数据结构

安装和配置

bash
# Install CCXT Pro (JavaScript / Node.js)
npm install ccxt.pro

# Install CCXT Pro (Python)
pip install ccxtpro
javascript
// JavaScript/Node.js
const ccxtpro = require('ccxt.pro');

// Create the Pro exchange instance.
const exchange = new ccxtpro.binance({
    enableRateLimit: true,
    // API credentials (only some features need them); read from the
    // environment so secrets are never hard-coded in source.
    apiKey: process.env.BINANCE_API_KEY,
    secret: process.env.BINANCE_SECRET,

    // WebSocket-specific settings
    options: {
        watchOrderBookLimit: 100, // order book depth limit
        tradesLimit: 1000,        // number of trades kept in the cache
        OHLCVLimit: 1000,         // number of candles kept in the cache
    },
});

// The JavaScript module exposes its version as `version`; `__version__` is the
// Python attribute name and would print `undefined` here.
console.log('CCXT Pro 版本:', ccxtpro.version);
console.log('支持的交易所:', ccxtpro.exchanges);
python
# Python
import asyncio
import os  # required by the os.getenv() calls below

import ccxtpro

# Create the Pro exchange instance.
exchange = ccxtpro.binance({
    'enableRateLimit': True,
    # Credentials are read from the environment so they are never hard-coded.
    'apiKey': os.getenv('BINANCE_API_KEY'),
    'secret': os.getenv('BINANCE_SECRET'),
    # WebSocket-specific settings
    'options': {
        'watchOrderBookLimit': 100,
        'tradesLimit': 1000,
        'OHLCVLimit': 1000,
    },
})

print('CCXT Pro 版本:', ccxtpro.__version__)
print('支持的交易所:', ccxtpro.exchanges)

📈 实时行情数据

监听单个行情

javascript
/**
 * Streams ticker updates for a single symbol and prints a one-line summary
 * for every update, plus any configured price alerts.
 */
class TickerWatcher {
    /**
     * @param {object} exchange - Exchange instance providing an async `watchTicker(symbol)`.
     * @param {object} [options]
     * @param {Array<{symbol: string, price: number, condition: string}>} [options.alerts]
     *     Price thresholds evaluated on every update. `condition: 'above'` fires when
     *     `last >= price`; any other value fires when `last <= price`.
     *     Defaults to the two built-in BTC/USDT and ETH/USDT alerts.
     * @param {number} [options.retryDelayMs=5000] - Pause after a failed `watchTicker` call.
     */
    constructor(exchange, { alerts, retryDelayMs = 5000 } = {}) {
        this.exchange = exchange;
        this.isRunning = false;
        this.retryDelayMs = retryDelayMs;
        // Built once here instead of being re-created on every tick.
        this.alerts = alerts ?? [
            { symbol: 'BTC/USDT', price: 50000, condition: 'above' },
            { symbol: 'ETH/USDT', price: 3000, condition: 'below' },
        ];
    }

    /**
     * Watch one symbol until `stop()` is called. Errors are logged and the
     * call is retried after `retryDelayMs`.
     * @param {string} symbol - Unified market symbol, e.g. 'BTC/USDT'.
     * @returns {Promise<void>}
     */
    async watchSingleTicker(symbol) {
        console.log(`📈 开始监听 ${symbol} 实时行情...\n`);

        this.isRunning = true;

        while (this.isRunning) {
            try {
                const ticker = await this.exchange.watchTicker(symbol);
                this.processTicker(ticker);
            } catch (error) {
                console.error(`❌ 监听 ${symbol} 行情错误:`, error.message);

                // Back off briefly before retrying.
                await new Promise(resolve => setTimeout(resolve, this.retryDelayMs));
            }
        }
    }

    /**
     * Print a one-line summary of a ticker and run the alert checks.
     * @param {object} ticker - Object with `symbol`, `last`, and optionally
     *     `timestamp`, `percentage`, `baseVolume`.
     */
    processTicker(ticker) {
        // Fall back to "now" when no timestamp is supplied, rather than
        // printing "Invalid Date".
        const timestamp = new Date(ticker.timestamp ?? Date.now()).toLocaleTimeString();
        const symbol = ticker.symbol;
        const price = ticker.last;
        const change = ticker.percentage || 0;
        const volume = ticker.baseVolume || 0;

        const priceEmoji = this.getPriceChangeEmoji(change);

        console.log(
            `${priceEmoji} ${timestamp} | ${symbol} | ` +
            `$${price} | ${change.toFixed(2)}% | ` +
            `Vol: ${volume.toFixed(2)}`
        );

        this.checkPriceAlerts(ticker);
    }

    /**
     * Map a percentage change to an emoji bucket.
     * @param {number} change - Percentage change.
     * @returns {string}
     */
    getPriceChangeEmoji(change) {
        if (change >= 5) return '🚀';
        if (change >= 2) return '📈';
        if (change >= 0) return '🟢';
        if (change >= -2) return '🔴';
        if (change >= -5) return '📉';
        return '💥';
    }

    /**
     * Log an alert for every configured threshold the ticker has crossed.
     * @param {object} ticker - Object with `symbol` and `last`.
     */
    checkPriceAlerts(ticker) {
        for (const alert of this.alerts) {
            if (ticker.symbol !== alert.symbol) continue;

            const shouldAlert = alert.condition === 'above'
                ? ticker.last >= alert.price
                : ticker.last <= alert.price;

            if (shouldAlert) {
                console.log(`🚨 价格警报: ${ticker.symbol} ${ticker.last} (阈值: ${alert.price})`);
            }
        }
    }

    /**
     * Ask the watch loop to exit; it stops after the pending update or retry
     * delay completes.
     */
    stop() {
        this.isRunning = false;
        console.log('⏹️ 停止行情监听');
    }
}

// 使用示例
/**
 * Demo: stream BTC/USDT ticker updates from Binance through a TickerWatcher.
 * Resolves only once the watcher's loop exits.
 */
async function watchTickerExample() {
    const binance = new ccxtpro.binance({ enableRateLimit: true });
    const tickerWatcher = new TickerWatcher(binance);

    await tickerWatcher.watchSingleTicker('BTC/USDT');
}

// watchTickerExample();

监听多个行情

javascript
/**
 * Watches tickers for a dynamic list of symbols, renders a console dashboard
 * and keeps the most recent ticker per symbol in memory.
 */
class MultiTickerWatcher {
    /**
     * @param {object} exchange - Exchange instance providing `has`, `watchTicker`
     *     and (optionally) `watchTickers`.
     * @param {object} [options]
     * @param {number} [options.reconnectDelayMs=5000] - Pause before retrying after an error.
     */
    constructor(exchange, { reconnectDelayMs = 5000 } = {}) {
        this.exchange = exchange;
        this.symbols = [];
        this.isRunning = false;
        this.tickerData = new Map();
        this.reconnectDelayMs = reconnectDelayMs;
    }

    /**
     * Add a symbol to the watch list (no-op when it is already present).
     * @param {string} symbol
     */
    addSymbol(symbol) {
        if (!this.symbols.includes(symbol)) {
            this.symbols.push(symbol);
            console.log(`➕ 添加监听: ${symbol}`);
        }
    }

    /**
     * Remove a symbol from the watch list and drop its cached ticker.
     * @param {string} symbol
     */
    removeSymbol(symbol) {
        const index = this.symbols.indexOf(symbol);
        if (index > -1) {
            this.symbols.splice(index, 1);
            this.tickerData.delete(symbol);
            console.log(`➖ 移除监听: ${symbol}`);
        }
    }

    /**
     * Run the watch loop until `stop()` is called.
     *
     * Errors are handled inside the loop: the failure is logged, the loop
     * sleeps for `reconnectDelayMs` and then continues. Retrying iteratively
     * (rather than by calling this method recursively) keeps the pending
     * promise chain from growing with every reconnect.
     * @returns {Promise<void>}
     */
    async startWatching() {
        if (this.symbols.length === 0) {
            console.log('❌ 没有要监听的交易对');
            return;
        }

        console.log(`📊 开始监听 ${this.symbols.length} 个交易对的行情...\n`);

        this.isRunning = true;

        while (this.isRunning) {
            try {
                // Prefer the batched stream when the exchange advertises it.
                if (this.exchange.has['watchTickers']) {
                    const tickers = await this.exchange.watchTickers(this.symbols);
                    this.processMultipleTickers(tickers);
                } else {
                    // Fallback: one watchTicker call per symbol.
                    await this.watchTickersSequentially();
                }
            } catch (error) {
                console.error('❌ 监听多个行情错误:', error.message);

                if (!this.isRunning) break;

                console.log(`🔄 ${this.reconnectDelayMs / 1000}秒后重新连接...`);
                await new Promise(resolve => setTimeout(resolve, this.reconnectDelayMs));

                // Every symbol may have been removed while we were waiting.
                if (this.symbols.length === 0) {
                    console.log('❌ 没有要监听的交易对');
                    return;
                }
            }
        }
    }

    /**
     * Fallback for exchanges without `watchTickers`: issue one `watchTicker`
     * per symbol and resume as soon as the first of them settles.
     * @returns {Promise<void>}
     */
    async watchTickersSequentially() {
        const promises = this.symbols.map(symbol =>
            this.watchSingleTickerAsync(symbol)
        );

        await Promise.race(promises);
    }

    /**
     * Await one ticker update, cache it and print it. Errors are logged and
     * not rethrown, so this promise never rejects.
     * @param {string} symbol
     * @returns {Promise<void>}
     */
    async watchSingleTickerAsync(symbol) {
        try {
            const ticker = await this.exchange.watchTicker(symbol);
            this.tickerData.set(symbol, ticker);
            this.displayTickerUpdate(ticker);
        } catch (error) {
            console.error(`❌ ${symbol} 行情错误: ${error.message}`);
        }
    }

    /**
     * Render the dashboard for a batch of tickers and refresh the cache.
     * @param {Object<string, object>} tickers - Map of symbol to ticker.
     */
    processMultipleTickers(tickers) {
        const timestamp = new Date().toLocaleTimeString();

        console.clear(); // redraw the dashboard from a blank screen
        console.log(`⏰ ${timestamp} - 实时行情监控面板`);
        console.log('='.repeat(80));

        // Biggest gainers first.
        const sortedTickers = Object.entries(tickers)
            .sort(([, a], [, b]) => (b.percentage || 0) - (a.percentage || 0));

        console.log('交易对'.padEnd(12) + '价格'.padEnd(12) + '涨跌幅'.padEnd(10) +
                   '24h成交量'.padEnd(15) + '更新时间');
        console.log('-'.repeat(80));

        sortedTickers.forEach(([symbol, ticker]) => {
            const price = `$${ticker.last?.toFixed(2) || 'N/A'}`.padEnd(12);
            const change = `${(ticker.percentage || 0).toFixed(2)}%`.padEnd(10);
            const volume = `${(ticker.baseVolume || 0).toFixed(2)}`.padEnd(15);
            const time = new Date(ticker.timestamp).toLocaleTimeString();
            const emoji = (ticker.percentage || 0) >= 0 ? '🟢' : '🔴';

            console.log(`${emoji} ${symbol.padEnd(10)}${price}${change}${volume}${time}`);

            this.tickerData.set(symbol, ticker);
        });

        console.log('='.repeat(80));

        this.displayMarketStats(tickers);
    }

    /**
     * Print a one-line summary for a single ticker update.
     * @param {object} ticker
     */
    displayTickerUpdate(ticker) {
        const timestamp = new Date(ticker.timestamp).toLocaleTimeString();
        const change = ticker.percentage || 0;
        const emoji = change >= 0 ? '🟢' : '🔴';

        console.log(
            `${emoji} ${timestamp} | ${ticker.symbol} | ` +
            `$${ticker.last} | ${change.toFixed(2)}%`
        );
    }

    /**
     * Print up/down/unchanged counts, the average change and a sentiment label.
     * @param {Object<string, object>} tickers - Map of symbol to ticker.
     */
    displayMarketStats(tickers) {
        const symbols = Object.keys(tickers);
        const upCount = symbols.filter(s => (tickers[s].percentage || 0) > 0).length;
        const downCount = symbols.filter(s => (tickers[s].percentage || 0) < 0).length;
        const unchangedCount = symbols.length - upCount - downCount;

        const totalChange = symbols.reduce((sum, s) =>
            sum + (tickers[s].percentage || 0), 0);
        // An empty batch would otherwise divide by zero and print "NaN%".
        const avgChange = symbols.length === 0 ? 0 : totalChange / symbols.length;

        console.log(`📊 市场概况: 上涨 ${upCount} | 下跌 ${downCount} | 平盘 ${unchangedCount}`);
        console.log(`📈 平均涨跌幅: ${avgChange.toFixed(2)}%`);
        console.log(`🌡️ 市场情绪: ${avgChange > 0 ? '乐观 😊' : avgChange < 0 ? '悲观 😰' : '中性 😐'}\n`);
    }

    /**
     * @param {string} symbol
     * @returns {object|undefined} Latest cached ticker for the symbol, if any.
     */
    getLatestTicker(symbol) {
        return this.tickerData.get(symbol);
    }

    /**
     * @returns {Object<string, object>} Plain-object snapshot of the ticker cache.
     */
    getAllLatestTickers() {
        return Object.fromEntries(this.tickerData);
    }

    /**
     * Ask the watch loop to exit after the pending update or delay completes.
     */
    stop() {
        this.isRunning = false;
        console.log('⏹️ 停止多行情监听');
    }
}

// 使用示例
/**
 * Demo: watch five USDT pairs on Binance through a MultiTickerWatcher.
 */
async function multiTickerExample() {
    const multiWatcher = new MultiTickerWatcher(
        new ccxtpro.binance({ enableRateLimit: true })
    );

    // Register the pairs in the same order they should be watched.
    const pairs = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'ADA/USDT', 'DOT/USDT'];
    for (const pair of pairs) {
        multiWatcher.addSymbol(pair);
    }

    await multiWatcher.startWatching();
}

// multiTickerExample();

📖 实时订单簿

订单簿监听器

javascript
/**
 * Streams an order book, prints top-of-book metrics and a small depth table,
 * and reports level changes between consecutive snapshots.
 */
class OrderBookWatcher {
    /**
     * @param {object} exchange - Exchange instance providing an async
     *     `watchOrderBook(symbol, limit)`.
     */
    constructor(exchange) {
        this.exchange = exchange;
        this.isRunning = false;
        this.previousOrderBook = null;
        this.priceAlerts = [];
    }

    /**
     * Watch loop: process every snapshot, diff it against the previous one,
     * and retry five seconds after any error, until `stop()` is called.
     * @param {string} symbol
     * @param {number} [limit=10] - Depth passed through to the exchange.
     * @returns {Promise<void>}
     */
    async watchOrderBook(symbol, limit = 10) {
        console.log(`📖 开始监听 ${symbol} 实时订单簿 (深度: ${limit})...\n`);

        this.isRunning = true;

        while (this.isRunning) {
            try {
                const snapshot = await this.exchange.watchOrderBook(symbol, limit);

                this.processOrderBook(snapshot);

                const earlier = this.previousOrderBook;
                if (earlier) {
                    this.detectOrderBookChanges(earlier, snapshot);
                }

                // Keep a detached copy for the next comparison.
                this.previousOrderBook = this.cloneOrderBook(snapshot);
            } catch (error) {
                console.error(`❌ 监听订单簿错误: ${error.message}`);
                await new Promise(wake => setTimeout(wake, 5000));
            }
        }
    }

    /**
     * Print spread, mid price and depth ratio, then the depth table and alerts.
     * Does nothing when either side of the book is empty.
     * @param {object} orderbook - Object with `symbol`, `timestamp`, `bids`, `asks`;
     *     each level is read as `[price, amount]`.
     */
    processOrderBook(orderbook) {
        const topBid = orderbook.bids[0];
        const topAsk = orderbook.asks[0];
        if (!topBid || !topAsk) {
            return;
        }

        const [bidPrice, bidSize] = topBid;
        const [askPrice, askSize] = topAsk;
        const clock = new Date(orderbook.timestamp).toLocaleTimeString();

        const gap = askPrice - bidPrice;
        const gapPct = (gap / bidPrice) * 100;
        const mid = (bidPrice + askPrice) / 2;

        const totalAmount = levels => levels.reduce((acc, level) => acc + level[1], 0);
        const ratio = totalAmount(orderbook.bids) / totalAmount(orderbook.asks);

        console.log(`⏰ ${clock} | ${orderbook.symbol}`);
        console.log(`💰 最优买价: $${bidPrice.toFixed(2)} (${bidSize.toFixed(4)})`);
        console.log(`💰 最优卖价: $${askPrice.toFixed(2)} (${askSize.toFixed(4)})`);
        console.log(`📏 价差: $${gap.toFixed(2)} (${gapPct.toFixed(4)}%)`);
        console.log(`📊 中间价: $${mid.toFixed(2)}`);
        console.log(`⚖️ 深度比: ${ratio.toFixed(2)} (买/卖)`);

        this.displayOrderBook(orderbook, 5);
        this.checkOrderBookAlerts(orderbook);

        console.log('-'.repeat(60));
    }

    /**
     * Print the first `displayLimit` levels of each side as a four-column table.
     * The ask slice is reversed before being printed.
     * @param {object} orderbook
     * @param {number} displayLimit
     */
    displayOrderBook(orderbook, displayLimit) {
        const bidRows = orderbook.bids.slice(0, displayLimit);
        const askRows = orderbook.asks.slice(0, displayLimit).reverse();

        console.log('\n📈 订单簿:');
        console.log('       卖盘 (ASK)       |       买盘 (BID)       ');
        console.log('   价格    |    数量     |   价格    |    数量   ');
        console.log('-'.repeat(50));

        // Falsy cells (missing level, or a zero value) render as blanks.
        const priceCell = value => (value ? `$${value.toFixed(2)}` : '').padStart(9);
        const amountCell = value => (value ? value.toFixed(4) : '').padStart(9);
        const blank = ['', ''];

        const rowCount = Math.max(askRows.length, bidRows.length);
        let row = 0;
        while (row < rowCount) {
            const [askPrice, askAmount] = askRows[row] || blank;
            const [bidPrice, bidAmount] = bidRows[row] || blank;

            console.log(
                `${priceCell(askPrice)} | ${amountCell(askAmount)} | ` +
                `${priceCell(bidPrice)} | ${amountCell(bidAmount)}`
            );
            row += 1;
        }
    }

    /**
     * Log how many levels changed between two snapshots and list the first five.
     * @param {object} previous
     * @param {object} current
     */
    detectOrderBookChanges(previous, current) {
        const diff = this.getOrderBookChanges(previous, current);
        if (diff.length === 0) {
            return;
        }

        console.log(`🔄 检测到 ${diff.length} 个变化:`);

        // Only the first five entries are printed.
        for (const entry of diff.slice(0, 5)) {
            const marker = entry.side === 'bid' ? '🟢' : '🔴';

            let verb;
            switch (entry.type) {
                case 'add':
                    verb = '新增';
                    break;
                case 'remove':
                    verb = '移除';
                    break;
                default:
                    verb = '修改';
            }

            console.log(`  ${marker} ${verb} ${entry.side}: $${entry.price} × ${entry.amount}`);
        }
    }

    /**
     * Collect level changes for both sides: bids first, then asks.
     * @param {object} previous
     * @param {object} current
     * @returns {Array<object>}
     */
    getOrderBookChanges(previous, current) {
        return [
            ...this.getSideChanges(previous.bids, current.bids, 'bid'),
            ...this.getSideChanges(previous.asks, current.asks, 'ask'),
        ];
    }

    /**
     * Diff one side of the book by price level.
     * @param {Array<Array<number>>} previousSide
     * @param {Array<Array<number>>} currentSide
     * @param {string} side - Label copied onto every change record.
     * @returns {Array<object>} Adds and modifications (in current-side order)
     *     followed by removals (in previous-side order).
     */
    getSideChanges(previousSide, currentSide, side) {
        // Index each side by the price's string form.
        const indexByPrice = levels =>
            new Map(levels.map(level => [level[0].toString(), level[1]]));
        const before = indexByPrice(previousSide);
        const after = indexByPrice(currentSide);

        const found = [];

        after.forEach((amount, key) => {
            const price = parseFloat(key);
            const oldAmount = before.get(key);

            if (oldAmount === undefined) {
                found.push({ type: 'add', side, price, amount });
                return;
            }
            // Amount differences up to 0.0001 are ignored.
            if (Math.abs(oldAmount - amount) > 0.0001) {
                found.push({ type: 'modify', side, price, amount, change: amount - oldAmount });
            }
        });

        before.forEach((amount, key) => {
            if (after.has(key)) {
                return;
            }
            found.push({ type: 'remove', side, price: parseFloat(key), amount });
        });

        return found;
    }

    /**
     * Log an alert when the spread exceeds 0.1% or when one side's total
     * amount is more than three times the other's.
     * @param {object} orderbook
     */
    checkOrderBookAlerts(orderbook) {
        const topBid = orderbook.bids[0];
        const topAsk = orderbook.asks[0];
        if (!topBid || !topAsk) {
            return;
        }

        const gap = topAsk[0] - topBid[0];
        const gapPct = (gap / topBid[0]) * 100;
        if (gapPct > 0.1) {
            console.log(`🚨 价差警报: ${gapPct.toFixed(4)}% (超过0.1%)`);
        }

        let bidTotal = 0;
        for (const level of orderbook.bids) {
            bidTotal += level[1];
        }
        let askTotal = 0;
        for (const level of orderbook.asks) {
            askTotal += level[1];
        }

        const lopsided = bidTotal / askTotal > 3 || askTotal / bidTotal > 3;
        if (lopsided) {
            console.log(`🚨 深度不平衡: 买盘 ${bidTotal.toFixed(2)} vs 卖盘 ${askTotal.toFixed(2)}`);
        }
    }

    /**
     * Copy the symbol, timestamp and `[price, amount]` levels into new arrays
     * so later mutation of the source does not affect the copy.
     * @param {object} orderbook
     * @returns {{symbol: *, timestamp: *, bids: Array, asks: Array}}
     */
    cloneOrderBook(orderbook) {
        const copyLevels = levels => levels.map(level => [level[0], level[1]]);
        const { symbol, timestamp } = orderbook;

        return {
            symbol,
            timestamp,
            bids: copyLevels(orderbook.bids),
            asks: copyLevels(orderbook.asks),
        };
    }

    /**
     * Ask the watch loop to exit after the pending update or delay completes.
     */
    stop() {
        this.isRunning = false;
        console.log('⏹️ 停止订单簿监听');
    }
}

// 使用示例
/**
 * Demo: stream the BTC/USDT order book from Binance at depth 10.
 */
async function orderBookExample() {
    const binance = new ccxtpro.binance({ enableRateLimit: true });
    const bookWatcher = new OrderBookWatcher(binance);

    await bookWatcher.watchOrderBook('BTC/USDT', 10);
}

// orderBookExample();

💱 实时成交数据

成交记录监听器

javascript
/**
 * Streams public trades for a symbol, prints each new trade, keeps rolling
 * statistics and flags large-order runs and sudden price moves.
 */
class TradesWatcher {
    /**
     * @param {object} exchange - Exchange instance providing an async
     *     `watchTrades(symbol, since, limit)`.
     */
    constructor(exchange) {
        this.exchange = exchange;
        this.isRunning = false;
        this.tradesBuffer = [];
        this.statistics = {
            totalTrades: 0,
            totalVolume: 0,
            totalValue: 0,
            buyTrades: 0,
            sellTrades: 0,
            lastReset: Date.now(),
        };
    }

    /**
     * Watch loop: process every batch of trades and retry five seconds after
     * any error, until `stop()` is called.
     * @param {string} symbol
     * @param {number} [limit=100] - Passed through to `watchTrades`.
     * @returns {Promise<void>}
     */
    async watchTrades(symbol, limit = 100) {
        console.log(`💱 开始监听 ${symbol} 实时成交记录...\n`);

        this.isRunning = true;

        while (this.isRunning) {
            try {
                const trades = await this.exchange.watchTrades(symbol, undefined, limit);
                this.processTrades(trades);
            } catch (error) {
                console.error(`❌ 监听成交记录错误: ${error.message}`);
                await new Promise(resolve => setTimeout(resolve, 5000));
            }
        }
    }

    /**
     * Display, count and analyse the trades not seen before, then replace the
     * buffer with the latest batch.
     * @param {Array<object>} trades - Trades with `timestamp`, `side`, `amount`,
     *     `price` and optionally `id`.
     */
    processTrades(trades) {
        if (trades.length === 0) return;

        const newTrades = this.getNewTrades(trades);

        if (newTrades.length > 0) {
            this.displayNewTrades(newTrades);
            this.updateStatistics(newTrades);
            this.analyzeTradingPattern(newTrades);
            this.updateTradesBuffer(trades);
        }
    }

    /**
     * Pick the trades that were not part of the previously buffered batch.
     *
     * A trade is new when its timestamp is later than the last buffered one,
     * or when it shares that timestamp but carries an `id` the buffer has not
     * seen. Several trades often execute in the same millisecond, and a strict
     * `>` comparison alone would silently drop them. Trades without an `id`
     * that share the last timestamp are still treated as already seen.
     * @param {Array<object>} trades
     * @returns {Array<object>} On the first call, the last five trades.
     */
    getNewTrades(trades) {
        if (this.tradesBuffer.length === 0) {
            return trades.slice(-5); // initial display: last five trades only
        }

        const lastTimestamp = this.tradesBuffer[this.tradesBuffer.length - 1].timestamp;
        const seenIdsAtLastTimestamp = new Set(
            this.tradesBuffer
                .filter(trade => trade.timestamp === lastTimestamp && trade.id != null)
                .map(trade => trade.id)
        );

        return trades.filter(trade => {
            if (trade.timestamp > lastTimestamp) return true;
            return trade.timestamp === lastTimestamp &&
                trade.id != null &&
                !seenIdsAtLastTimestamp.has(trade.id);
        });
    }

    /**
     * Print one line per trade; trades worth more than 10000 (amount × price)
     * get a whale marker.
     * @param {Array<object>} newTrades
     */
    displayNewTrades(newTrades) {
        console.log(`🆕 新成交 (${newTrades.length} 笔):`);

        newTrades.forEach(trade => {
            const time = new Date(trade.timestamp).toLocaleTimeString();
            const side = trade.side === 'buy' ? '买入' : '卖出';
            const emoji = trade.side === 'buy' ? '🟢' : '🔴';
            const amount = trade.amount.toFixed(4);
            const price = trade.price.toFixed(2);
            const value = (trade.amount * trade.price).toFixed(2);

            const isLargeTrade = trade.amount * trade.price > 10000;
            const sizeEmoji = isLargeTrade ? '🐋' : '';

            console.log(
                `  ${emoji}${sizeEmoji} ${time} | ${side} | ` +
                `${amount} @ $${price} = $${value}`
            );
        });

        console.log('');
    }

    /**
     * Add trades to the running totals; print and reset the totals once more
     * than 60 seconds have passed since the last reset.
     * @param {Array<object>} newTrades
     */
    updateStatistics(newTrades) {
        newTrades.forEach(trade => {
            this.statistics.totalTrades++;
            this.statistics.totalVolume += trade.amount;
            this.statistics.totalValue += trade.amount * trade.price;

            // Anything that is not explicitly a buy is counted as a sell.
            if (trade.side === 'buy') {
                this.statistics.buyTrades++;
            } else {
                this.statistics.sellTrades++;
            }
        });

        const now = Date.now();
        if (now - this.statistics.lastReset > 60000) {
            this.displayStatistics();
            this.resetStatistics();
        }
    }

    /**
     * Print the totals accumulated since the last reset.
     */
    displayStatistics() {
        const stats = this.statistics;
        const duration = (Date.now() - stats.lastReset) / 1000;
        // Round once and derive the sell share from the rounded value so the
        // two percentages always add up to 100.0.
        const buyRatio = Number((stats.buyTrades / stats.totalTrades * 100).toFixed(1));
        const sellRatio = 100 - buyRatio;
        const avgTradeSize = (stats.totalVolume / stats.totalTrades).toFixed(4);
        const avgTradeValue = (stats.totalValue / stats.totalTrades).toFixed(2);

        console.log('📊 过去1分钟统计:');
        console.log(`总成交: ${stats.totalTrades} 笔`);
        console.log(`买入: ${stats.buyTrades} 笔 (${buyRatio.toFixed(1)}%)`);
        console.log(`卖出: ${stats.sellTrades} 笔 (${sellRatio.toFixed(1)}%)`);
        console.log(`总成交量: ${stats.totalVolume.toFixed(4)}`);
        console.log(`总成交额: $${stats.totalValue.toLocaleString()}`);
        console.log(`平均成交量: ${avgTradeSize}`);
        console.log(`平均成交额: $${avgTradeValue}`);
        console.log(`成交频率: ${(stats.totalTrades / duration * 60).toFixed(1)} 笔/分钟`);
        console.log('-'.repeat(50));
    }

    /**
     * Zero all counters and restart the reporting window.
     */
    resetStatistics() {
        this.statistics = {
            totalTrades: 0,
            totalVolume: 0,
            totalValue: 0,
            buyTrades: 0,
            sellTrades: 0,
            lastReset: Date.now(),
        };
    }

    /**
     * Flag runs of three or more large trades, and price ranges above 0.5%
     * within a batch of at least two trades.
     *
     * The two checks are independent: the price-range check needs only two
     * trades, so there is no blanket "fewer than three" early return that
     * would make it unreachable for two-trade batches.
     * @param {Array<object>} newTrades
     */
    analyzeTradingPattern(newTrades) {
        const largeTrades = newTrades.filter(trade =>
            trade.amount * trade.price > 10000
        );

        if (largeTrades.length >= 3) {
            const direction = largeTrades.every(t => t.side === 'buy') ? '买入' :
                            largeTrades.every(t => t.side === 'sell') ? '卖出' : '混合';
            console.log(`🐋 检测到连续大单: ${largeTrades.length} 笔 (${direction})`);
        }

        if (newTrades.length >= 2) {
            const prices = newTrades.map(t => t.price);
            const minPrice = Math.min(...prices);
            const maxPrice = Math.max(...prices);
            const priceRange = ((maxPrice - minPrice) / minPrice * 100);

            if (priceRange > 0.5) {
                console.log(`⚡ 检测到价格异动: ${priceRange.toFixed(2)}% (${minPrice.toFixed(2)} - ${maxPrice.toFixed(2)})`);
            }
        }
    }

    /**
     * Replace the buffer with a copy of the latest batch, capped at the most
     * recent 1000 trades.
     * @param {Array<object>} trades
     */
    updateTradesBuffer(trades) {
        this.tradesBuffer = trades.slice(-1000);
    }

    /**
     * Summarise buffered trades from the last `minutes` minutes.
     * @param {number} [minutes=5]
     * @returns {object|null} Summary with string-formatted ratios and totals,
     *     or null when no buffered trade falls inside the window.
     */
    getRecentTradeStats(minutes = 5) {
        const cutoffTime = Date.now() - (minutes * 60 * 1000);
        const recentTrades = this.tradesBuffer.filter(trade =>
            trade.timestamp > cutoffTime
        );

        if (recentTrades.length === 0) return null;

        const buyTrades = recentTrades.filter(t => t.side === 'buy').length;
        const sellTrades = recentTrades.length - buyTrades;
        const totalVolume = recentTrades.reduce((sum, t) => sum + t.amount, 0);
        const totalValue = recentTrades.reduce((sum, t) => sum + (t.amount * t.price), 0);

        return {
            period: `${minutes} 分钟`,
            totalTrades: recentTrades.length,
            buyTrades,
            sellTrades,
            buyRatio: (buyTrades / recentTrades.length * 100).toFixed(1),
            totalVolume: totalVolume.toFixed(4),
            totalValue: totalValue.toFixed(2),
            avgPrice: (totalValue / totalVolume).toFixed(2),
        };
    }

    /**
     * Ask the watch loop to exit after the pending update or delay completes.
     */
    stop() {
        this.isRunning = false;
        console.log('⏹️ 停止成交记录监听');
    }
}

// 使用示例
/**
 * Demo: stream ETH/USDT trades from Binance with a limit of 100.
 */
async function tradesExample() {
    const binance = new ccxtpro.binance({ enableRateLimit: true });
    const tradeWatcher = new TradesWatcher(binance);

    await tradeWatcher.watchTrades('ETH/USDT', 100);
}

// tradesExample();

📊 实时K线数据

K线数据监听器

javascript
/**
 * Streams OHLCV candles, prints newly opened candles, computes SMA/RSI/MACD
 * from the close prices and reports simple candlestick patterns.
 *
 * Candles are read as `[timestamp, open, high, low, close, volume]`.
 */
class OHLCVWatcher {
    /**
     * @param {object} exchange - Exchange instance providing `has` and an async
     *     `watchOHLCV(symbol, timeframe, since, limit)`.
     */
    constructor(exchange) {
        this.exchange = exchange;
        this.isRunning = false;
        this.candleBuffer = new Map(); // last candle array per "symbol:timeframe"
        this.indicators = new Map();   // last indicator set per "symbol:timeframe"
    }

    /**
     * Watch loop: process every candle update and retry five seconds after
     * any error, until `stop()` is called.
     * @param {string} symbol
     * @param {string} [timeframe='1m']
     * @param {number} [limit=100] - Passed through to `watchOHLCV`.
     * @returns {Promise<void>}
     * @throws {Error} When `exchange.has['watchOHLCV']` is falsy.
     */
    async watchOHLCV(symbol, timeframe = '1m', limit = 100) {
        console.log(`📊 开始监听 ${symbol} ${timeframe} K线数据...\n`);

        if (!this.exchange.has['watchOHLCV']) {
            throw new Error('该交易所不支持实时K线数据');
        }

        this.isRunning = true;

        while (this.isRunning) {
            try {
                const ohlcv = await this.exchange.watchOHLCV(symbol, timeframe, undefined, limit);
                this.processOHLCV(ohlcv, symbol, timeframe);
            } catch (error) {
                console.error(`❌ 监听K线数据错误: ${error.message}`);
                await new Promise(resolve => setTimeout(resolve, 5000));
            }
        }
    }

    /**
     * When the update contains candles newer than the cached ones, print them
     * and refresh indicators and patterns; always refresh the cache.
     * @param {Array<Array<number>>} ohlcv
     * @param {string} symbol
     * @param {string} timeframe
     */
    processOHLCV(ohlcv, symbol, timeframe) {
        if (ohlcv.length === 0) return;

        const key = `${symbol}:${timeframe}`;
        const previousData = this.candleBuffer.get(key) || [];

        const newCandles = this.getNewCandles(previousData, ohlcv);

        if (newCandles.length > 0) {
            this.displayNewCandles(newCandles, symbol, timeframe);
            this.calculateIndicators(ohlcv, symbol, timeframe);
            this.analyzeCandles(ohlcv, symbol, timeframe);
        }

        this.candleBuffer.set(key, [...ohlcv]);
    }

    /**
     * @param {Array<Array<number>>} previousData
     * @param {Array<Array<number>>} currentData
     * @returns {Array<Array<number>>} Candles whose timestamp is later than the
     *     last cached candle; on the first call, the last three candles.
     */
    getNewCandles(previousData, currentData) {
        if (previousData.length === 0) {
            return currentData.slice(-3); // initial display: last three candles
        }

        const lastTimestamp = previousData[previousData.length - 1][0];
        return currentData.filter(candle => candle[0] > lastTimestamp);
    }

    /**
     * Print one line per candle with OHLCV values and the open-to-close change.
     * @param {Array<Array<number>>} newCandles
     * @param {string} symbol
     * @param {string} timeframe
     */
    displayNewCandles(newCandles, symbol, timeframe) {
        console.log(`🕯️ ${symbol} ${timeframe} 新K线:`);

        newCandles.forEach(candle => {
            const [timestamp, open, high, low, close, volume] = candle;
            const time = new Date(timestamp).toLocaleString();
            const change = ((close - open) / open * 100);
            const emoji = change >= 0 ? '🟢' : '🔴';

            // Share of the high-low range covered by the candle body.
            const bodySize = Math.abs(close - open);
            const shadowSize = high - low;
            const bodyRatio = bodySize / shadowSize;

            let patternEmoji = '';
            if (bodyRatio > 0.8) patternEmoji = '📏'; // long body
            else if (bodyRatio < 0.2) patternEmoji = '🕯️'; // doji-like

            console.log(
                `  ${emoji}${patternEmoji} ${time} | ` +
                `开: $${open.toFixed(2)} | 高: $${high.toFixed(2)} | ` +
                `低: $${low.toFixed(2)} | 收: $${close.toFixed(2)} | ` +
                `量: ${volume.toFixed(2)} | 涨跌: ${change.toFixed(2)}%`
            );
        });

        console.log('');
    }

    /**
     * Compute SMA5/10/20, RSI(14) and MACD from the closes, cache and print
     * them. Skipped when fewer than 20 candles are available.
     * @param {Array<Array<number>>} ohlcv
     * @param {string} symbol
     * @param {string} timeframe
     */
    calculateIndicators(ohlcv, symbol, timeframe) {
        if (ohlcv.length < 20) return;

        const key = `${symbol}:${timeframe}`;
        const closes = ohlcv.map(candle => candle[4]);

        const sma5 = this.calculateSMA(closes, 5);
        const sma10 = this.calculateSMA(closes, 10);
        const sma20 = this.calculateSMA(closes, 20);
        const rsi = this.calculateRSI(closes, 14);
        const macd = this.calculateMACD(closes);

        const indicators = {
            sma5: sma5[sma5.length - 1],
            sma10: sma10[sma10.length - 1],
            sma20: sma20[sma20.length - 1],
            rsi: rsi[rsi.length - 1],
            macd: macd,
            timestamp: Date.now(),
        };

        this.indicators.set(key, indicators);

        this.displayIndicators(indicators, symbol, timeframe);
    }

    /**
     * Print the indicator values followed by any derived signals.
     * @param {object} indicators
     * @param {string} symbol
     * @param {string} timeframe
     */
    displayIndicators(indicators, symbol, timeframe) {
        console.log(`📈 ${symbol} ${timeframe} 技术指标:`);
        console.log(`SMA5: $${indicators.sma5?.toFixed(2) || 'N/A'}`);
        console.log(`SMA10: $${indicators.sma10?.toFixed(2) || 'N/A'}`);
        console.log(`SMA20: $${indicators.sma20?.toFixed(2) || 'N/A'}`);
        console.log(`RSI: ${indicators.rsi?.toFixed(2) || 'N/A'}`);

        if (indicators.macd) {
            console.log(`MACD: ${indicators.macd.macd?.toFixed(4) || 'N/A'}`);
            console.log(`信号线: ${indicators.macd.signal?.toFixed(4) || 'N/A'}`);
            console.log(`柱状图: ${indicators.macd.histogram?.toFixed(4) || 'N/A'}`);
        }

        this.generateSignals(indicators);

        console.log('-'.repeat(50));
    }

    /**
     * Print simple signals derived from RSI, SMA ordering and the MACD histogram.
     * @param {object} indicators
     */
    generateSignals(indicators) {
        const signals = [];

        // Compare against null/undefined rather than truthiness: an RSI of 0
        // (no gains in the window) is the most oversold reading and must not
        // be skipped.
        if (indicators.rsi != null) {
            if (indicators.rsi > 80) {
                signals.push('🔴 RSI超买 (>80)');
            } else if (indicators.rsi < 20) {
                signals.push('🟢 RSI超卖 (<20)');
            }
        }

        if (indicators.sma5 && indicators.sma10 && indicators.sma20) {
            if (indicators.sma5 > indicators.sma10 && indicators.sma10 > indicators.sma20) {
                signals.push('📈 短期上升趋势');
            } else if (indicators.sma5 < indicators.sma10 && indicators.sma10 < indicators.sma20) {
                signals.push('📉 短期下降趋势');
            }
        }

        // calculateMACD currently leaves `histogram` null, so this branch only
        // fires for indicator objects supplied from elsewhere.
        if (indicators.macd && indicators.macd.histogram) {
            if (indicators.macd.histogram > 0) {
                signals.push('🟢 MACD多头排列');
            } else {
                signals.push('🔴 MACD空头排列');
            }
        }

        if (signals.length > 0) {
            console.log('🎯 交易信号:');
            signals.forEach(signal => console.log(`  ${signal}`));
        }
    }

    /**
     * Report hammer, bullish-engulfing and three-in-a-row patterns found in
     * the last three candles.
     * @param {Array<Array<number>>} ohlcv
     * @param {string} symbol
     * @param {string} timeframe
     */
    analyzeCandles(ohlcv, symbol, timeframe) {
        if (ohlcv.length < 3) return;

        const recent = ohlcv.slice(-3);
        const patterns = [];

        // Hammer: long lower shadow, short upper shadow relative to the body.
        const lastCandle = recent[recent.length - 1];
        const [, open, high, low, close] = lastCandle;

        const bodySize = Math.abs(close - open);
        const upperShadow = high - Math.max(open, close);
        const lowerShadow = Math.min(open, close) - low;

        if (lowerShadow > bodySize * 2 && upperShadow < bodySize * 0.5) {
            patterns.push('🔨 锤子线');
        }

        // Bullish engulfing: a down candle followed by an up candle whose body
        // spans the previous body.
        const prev = recent[recent.length - 2];
        const curr = recent[recent.length - 1];

        if (prev[4] < prev[1] && curr[4] > curr[1] &&
            curr[1] < prev[4] && curr[4] > prev[1]) {
            patterns.push('🐻➡️🐂 看涨吞噬');
        }

        // Three consecutive up or down candles.
        const allUp = recent.every(candle => candle[4] > candle[1]);
        const allDown = recent.every(candle => candle[4] < candle[1]);

        if (allUp) patterns.push('📈 三连阳');
        if (allDown) patterns.push('📉 三连阴');

        if (patterns.length > 0) {
            console.log('🕯️ K线模式:');
            patterns.forEach(pattern => console.log(`  ${pattern}`));
        }
    }

    /**
     * Simple moving average.
     * @param {number[]} data
     * @param {number} period
     * @returns {number[]} One value per full window, oldest first.
     */
    calculateSMA(data, period) {
        const sma = [];
        for (let i = period - 1; i < data.length; i++) {
            const sum = data.slice(i - period + 1, i + 1).reduce((a, b) => a + b, 0);
            sma.push(sum / period);
        }
        return sma;
    }

    /**
     * RSI using plain averages of gains and losses over each window.
     *
     * Window `end` covers `changes[end - period, end)`. The loop runs up to
     * and including `changes.length` so the final value includes the most
     * recent price change, and exactly `period` changes yields one value.
     * @param {number[]} closes
     * @param {number} [period=14]
     * @returns {number[]} One value per full window, oldest first; empty when
     *     there are fewer than `period` price changes.
     */
    calculateRSI(closes, period = 14) {
        const rsi = [];
        const changes = [];

        for (let i = 1; i < closes.length; i++) {
            changes.push(closes[i] - closes[i - 1]);
        }

        for (let end = period; end <= changes.length; end++) {
            const recentChanges = changes.slice(end - period, end);
            const gains = recentChanges.filter(change => change > 0);
            const losses = recentChanges.filter(change => change < 0).map(Math.abs);

            const avgGain = gains.reduce((a, b) => a + b, 0) / period;
            const avgLoss = losses.reduce((a, b) => a + b, 0) / period;

            if (avgLoss === 0) {
                rsi.push(100);
            } else {
                const rs = avgGain / avgLoss;
                rsi.push(100 - (100 / (1 + rs)));
            }
        }

        return rsi;
    }

    /**
     * Simplified MACD: only the MACD line (EMA12 − EMA26) is computed; the
     * signal line and histogram are left null.
     * @param {number[]} closes
     * @returns {{macd: number, signal: null, histogram: null}|null} null when
     *     fewer than 26 closes are available.
     */
    calculateMACD(closes) {
        if (closes.length < 26) return null;

        const ema12 = this.calculateEMA(closes, 12);
        const ema26 = this.calculateEMA(closes, 26);

        if (ema12.length === 0 || ema26.length === 0) return null;

        const macdLine = ema12[ema12.length - 1] - ema26[ema26.length - 1];

        return {
            macd: macdLine,
            signal: null,
            histogram: null,
        };
    }

    /**
     * Exponential moving average seeded with the first data point.
     * @param {number[]} data
     * @param {number} period
     * @returns {number[]} Same length as `data`; empty for empty input.
     */
    calculateEMA(data, period) {
        // Without this guard an empty input would yield [undefined].
        if (data.length === 0) return [];

        const ema = [];
        const multiplier = 2 / (period + 1);

        ema[0] = data[0];

        for (let i = 1; i < data.length; i++) {
            ema[i] = (data[i] * multiplier) + (ema[i - 1] * (1 - multiplier));
        }

        return ema;
    }

    /**
     * @param {string} symbol
     * @param {string} timeframe
     * @returns {object|undefined} Last cached indicator set, if any.
     */
    getIndicators(symbol, timeframe) {
        const key = `${symbol}:${timeframe}`;
        return this.indicators.get(key);
    }

    /**
     * Ask the watch loop to exit after the pending update or delay completes.
     */
    stop() {
        this.isRunning = false;
        console.log('⏹️ 停止K线数据监听');
    }
}

// 使用示例
/**
 * Usage example: creates a Binance CCXT Pro client with rate limiting
 * enabled and passes 'BTC/USDT', '1m' and 50 to `OHLCVWatcher.watchOHLCV`.
 *
 * @returns {Promise<void>} Settles when `watchOHLCV` settles.
 */
async function ohlcvExample() {
    const binance = new ccxtpro.binance({ enableRateLimit: true });
    const ohlcvWatcher = new OHLCVWatcher(binance);

    // Watch BTC/USDT 1-minute candles.
    const symbol = 'BTC/USDT';
    const timeframe = '1m';
    await ohlcvWatcher.watchOHLCV(symbol, timeframe, 50);
}

// ohlcvExample();

🔄 连接管理与错误处理

连接管理器

javascript
/**
 * Runs watch functions in resilient loops: each registered connection is
 * polled repeatedly, and failures are retried with exponential backoff until
 * a maximum number of consecutive attempts is exceeded.
 *
 * Exactly one loop runs per connection. The backoff helper only waits; it
 * never starts a second loop, so errors cannot multiply concurrent watchers.
 */
class ConnectionManager {
    constructor() {
        // key -> { exchange, watchFunction, args, isActive, lastError }
        this.connections = new Map();
        // key -> number of consecutive failed attempts
        this.reconnectAttempts = new Map();
        this.maxReconnectAttempts = 5;
        // Base backoff delay in milliseconds; doubled on every consecutive failure.
        this.reconnectDelay = 5000;
    }

    /**
     * Registers a watcher and runs its loop.
     *
     * @param {{id: string}} exchange - Exchange instance; used as `this` for `watchFunction`.
     * @param {Function} watchFunction - Function returning a promise for the next update.
     * @param {...*} args - Arguments forwarded to `watchFunction` on every call.
     * @returns {Promise<void>} Resolves once the connection has been stopped
     *   (manually or after exhausting the reconnect attempts).
     */
    async createResilientWatcher(exchange, watchFunction, ...args) {
        const key = `${exchange.id}_${watchFunction.name}_${args.join('_')}`;

        console.log(`🔌 创建弹性连接: ${key}`);

        // Re-registering the same key would otherwise leave the previous loop
        // running unreachable in the background; deactivate it first.
        const previous = this.connections.get(key);
        if (previous) {
            previous.isActive = false;
        }

        this.connections.set(key, {
            exchange,
            watchFunction,
            args,
            isActive: true,
            lastError: null,
        });

        this.reconnectAttempts.set(key, 0);

        return this.startWatching(key);
    }

    /**
     * Runs the watch loop for a registered connection until it is deactivated.
     *
     * @param {string} key - Connection key produced by `createResilientWatcher`.
     * @returns {Promise<void>} Resolves when the connection is no longer active.
     */
    async startWatching(key) {
        const connection = this.connections.get(key);
        if (!connection || !connection.isActive) return;

        console.log(`📡 开始监听: ${key}`);

        while (connection.isActive) {
            try {
                const result = await connection.watchFunction.call(
                    connection.exchange,
                    ...connection.args
                );

                // The connection may have been stopped while the call was
                // pending; do not deliver data for a stopped connection.
                if (!connection.isActive) break;

                // A successful update ends the current failure streak.
                this.reconnectAttempts.set(key, 0);

                this.handleData(key, result);
            } catch (error) {
                console.error(`❌ 连接 ${key} 错误:`, error.message);
                connection.lastError = error;

                if (connection.isActive) {
                    // Waits for the backoff delay (or stops the connection);
                    // this loop then retries on its next iteration.
                    await this.handleReconnect(key);
                }
            }
        }
    }

    /**
     * Applies the reconnect policy after a failure: stops the connection when
     * the attempt limit is reached, otherwise waits for an exponentially
     * growing delay. It does not restart the loop itself; the loop in
     * `startWatching` retries after this method returns.
     *
     * @param {string} key - Connection key.
     * @returns {Promise<void>}
     */
    async handleReconnect(key) {
        const attempts = this.reconnectAttempts.get(key) ?? 0;

        if (attempts >= this.maxReconnectAttempts) {
            console.error(`💀 连接 ${key} 超过最大重连次数,停止重连`);
            this.stopConnection(key);
            return;
        }

        this.reconnectAttempts.set(key, attempts + 1);

        const delay = this.reconnectDelay * Math.pow(2, attempts); // exponential backoff
        console.log(`🔄 ${attempts + 1}/${this.maxReconnectAttempts} 次重连 ${key},${delay/1000}秒后重试...`);

        await new Promise(resolve => setTimeout(resolve, delay));
    }

    /**
     * Default data hook: logs the payload size. Override or replace it to
     * route updates to callbacks or events.
     *
     * @param {string} key - Connection key the data belongs to.
     * @param {*} data - Value resolved by the watch function.
     */
    handleData(key, data) {
        console.log(`📊 收到 ${key} 数据,长度: ${Array.isArray(data) ? data.length : 'N/A'}`);
    }

    /**
     * Deactivates one connection; its loop exits after the pending call settles.
     *
     * @param {string} key - Connection key.
     */
    stopConnection(key) {
        const connection = this.connections.get(key);
        if (connection) {
            connection.isActive = false;
            console.log(`⏹️ 停止连接: ${key}`);
        }
    }

    /**
     * Deactivates every connection and forgets all bookkeeping.
     */
    stopAllConnections() {
        console.log('⏹️ 停止所有连接...');

        for (const connection of this.connections.values()) {
            connection.isActive = false;
        }

        this.connections.clear();
        this.reconnectAttempts.clear();
    }

    /**
     * Builds a snapshot of every registered connection.
     *
     * @returns {Array<{key: string, isActive: boolean, reconnectAttempts: number, lastError: (string|null)}>}
     */
    getConnectionStatus() {
        const status = [];

        for (const [key, connection] of this.connections) {
            status.push({
                key,
                isActive: connection.isActive,
                reconnectAttempts: this.reconnectAttempts.get(key) ?? 0,
                lastError: connection.lastError?.message || null,
            });
        }

        return status;
    }

    /**
     * Prints the connection snapshot as a human-readable report.
     */
    displayConnectionStatus() {
        const status = this.getConnectionStatus();

        console.log('\n📊 连接状态报告:');
        console.log('='.repeat(60));

        for (const conn of status) {
            const statusEmoji = conn.isActive ? '🟢' : '🔴';
            const errorInfo = conn.lastError ? ` (错误: ${conn.lastError})` : '';

            console.log(
                `${statusEmoji} ${conn.key} | ` +
                `重连次数: ${conn.reconnectAttempts}${errorInfo}`
            );
        }

        console.log('='.repeat(60));
    }
}

// 综合监控示例
/**
 * Example monitor that watches several Binance streams through a
 * ConnectionManager and periodically prints the connection status.
 */
class ComprehensiveMonitor {
    constructor() {
        this.connectionManager = new ConnectionManager();
        this.isRunning = false;
        // Handle of the periodic status timer; kept on the instance so that
        // stop() can release it immediately.
        this.statusInterval = null;
    }

    /**
     * Starts the ticker, order book and trades watchers and the status timer.
     *
     * @returns {Promise<void>} Settles when every watcher promise has settled.
     */
    async start() {
        console.log('🚀 启动综合监控系统...\n');

        const exchange = new ccxtpro.binance({ enableRateLimit: true });
        this.isRunning = true;

        // Watchers to launch; `name` is only used in the failure message.
        const watchers = [
            {
                name: 'BTC行情',
                func: exchange.watchTicker.bind(exchange),
                args: ['BTC/USDT'],
            },
            {
                name: 'ETH订单簿',
                func: exchange.watchOrderBook.bind(exchange),
                args: ['ETH/USDT', 10],
            },
            {
                name: 'BTC成交',
                func: exchange.watchTrades.bind(exchange),
                args: ['BTC/USDT'],
            },
        ];

        // Launch every watcher; a failure to start one must not reject the rest.
        const promises = watchers.map(watcher =>
            this.connectionManager.createResilientWatcher(
                exchange,
                watcher.func,
                ...watcher.args
            ).catch(error => {
                console.error(`启动 ${watcher.name} 失败:`, error.message);
            })
        );

        // Replace any timer left over from an earlier start() so that timers
        // never stack up.
        if (this.statusInterval !== null) {
            clearInterval(this.statusInterval);
        }

        // Print the status report every 30 seconds while running.
        const statusInterval = setInterval(() => {
            if (this.isRunning) {
                this.connectionManager.displayConnectionStatus();
            } else {
                clearInterval(statusInterval);
            }
        }, 30000);
        this.statusInterval = statusInterval;

        await Promise.all(promises);
    }

    /**
     * Stops the monitor: releases the status timer right away (instead of
     * waiting for its next tick) and deactivates all connections.
     */
    stop() {
        console.log('🛑 停止综合监控系统...');
        this.isRunning = false;

        if (this.statusInterval !== null) {
            clearInterval(this.statusInterval);
            this.statusInterval = null;
        }

        this.connectionManager.stopAllConnections();
    }
}

// 使用示例
/**
 * Usage example: starts a ComprehensiveMonitor in the background and
 * schedules it to stop after ten minutes.
 *
 * The monitor's start() promise is intentionally not awaited; its rejection
 * is logged instead.
 *
 * @returns {Promise<void>} Resolves as soon as the monitor has been launched.
 */
async function comprehensiveExample() {
    const runDurationMs = 600000; // 10 minutes
    const monitor = new ComprehensiveMonitor();

    const reportFailure = (error) => {
        console.error('监控系统错误:', error.message);
    };

    // Launch in the background.
    monitor.start().catch(reportFailure);

    // Stop after the demo duration has elapsed.
    setTimeout(() => {
        monitor.stop();
    }, runDurationMs);
}

// comprehensiveExample();

🎯 章节总结

本章我们深入学习了 CCXT Pro 的实时数据流功能:

实时行情监听

  • 单个和多个交易对行情监控
  • 价格警报和市场情绪分析
  • 实时数据面板显示

实时订单簿

  • 深度数据监听和变化检测
  • 流动性分析和异常警报
  • 支撑阻力位识别

实时成交数据

  • 成交记录监听和统计
  • 大单检测和交易模式分析
  • 买卖力度对比

实时K线数据

  • 多时间周期K线监听
  • 技术指标实时计算
  • K线模式识别

连接管理

  • 自动重连机制
  • 错误处理和故障恢复
  • 连接状态监控

💡 实战建议

  1. 渐进式学习 - 从单一数据类型开始,逐步增加复杂性
  2. 资源管理 - 合理控制同时监听的数据流数量
  3. 错误处理 - 实施完善的重连和错误恢复机制
  4. 数据存储 - 考虑将重要数据持久化存储
  5. 性能优化 - 监控内存使用和CPU消耗

🔗 下一步

掌握了实时数据流后,接下来学习:

  1. 错误处理与最佳实践 - 处理各种异常情况
  2. 高级功能与应用 - 开发复杂的交易策略
  3. 常见问题解答 - 查阅典型问题的原因与解决办法

下一章:错误处理与最佳实践

基于 VitePress 构建的现代化知识库