第五章:实时数据流 (CCXT Pro)
CCXT Pro 提供了基于 WebSocket 的实时数据流功能,相比传统的 REST API 轮询方式,WebSocket 能提供更低的延迟和更高的效率。本章将详细介绍如何使用 CCXT Pro 获取各种实时数据。
🚀 CCXT Pro 简介
CCXT Pro 特性
- ⚡ 低延迟:WebSocket 连接提供毫秒级数据更新
- 🔄 实时推送:数据变化时主动推送,无需轮询
- 📊 多种数据:实时行情、订单簿、成交记录、K线等
- 🛠️ 统一接口:与 CCXT 基础版保持一致的 API 设计
- 🔌 自动重连:内置断线重连机制
- 💾 状态管理:自动维护订单簿等数据结构
安装和配置
bash
# Install CCXT Pro (JavaScript / Node.js)
# NOTE(review): newer CCXT releases appear to bundle the Pro (WebSocket) API inside the
# main `ccxt` package — confirm which package name applies to the version you target.
npm install ccxt.pro
# Python
pip install ccxtpro
javascript
// JavaScript / Node.js
const ccxtpro = require('ccxt.pro');

// Create a Pro exchange instance.
const exchange = new ccxtpro.binance({
  enableRateLimit: true,
  // API credentials (only some features need them); read from the environment.
  apiKey: process.env.BINANCE_API_KEY,
  secret: process.env.BINANCE_SECRET,
  // WebSocket-specific settings.
  options: {
    watchOrderBookLimit: 100, // order book depth limit
    tradesLimit: 1000, // number of trades kept in the cache
    OHLCVLimit: 1000, // number of candles kept in the cache
  },
});

// The JavaScript build exposes the library version as `version`;
// `__version__` is the Python attribute and is undefined here.
console.log('CCXT Pro 版本:', ccxtpro.version);
console.log('支持的交易所:', ccxtpro.exchanges);
python
# Python
import asyncio
import os  # required by the os.getenv() calls below

import ccxtpro

# Create a Pro exchange instance; credentials are read from the environment.
exchange = ccxtpro.binance({
    'enableRateLimit': True,
    'apiKey': os.getenv('BINANCE_API_KEY'),
    'secret': os.getenv('BINANCE_SECRET'),
    'options': {
        'watchOrderBookLimit': 100,
        'tradesLimit': 1000,
        'OHLCVLimit': 1000,
    },
})
print('CCXT Pro 版本:', ccxtpro.__version__)
print('支持的交易所:', ccxtpro.exchanges)
📈 实时行情数据
监听单个行情
javascript
/**
 * Streams ticker updates for a single symbol and prints each update to the console.
 */
class TickerWatcher {
  /**
   * @param {object} exchange - Object exposing `watchTicker(symbol)`, which resolves with a ticker.
   * @param {object} [options]
   * @param {Array<{symbol: string, price: number, condition: string}>} [options.alerts] -
   *   Price thresholds. `condition: 'above'` fires when `last >= price`; any other value
   *   fires when `last <= price`. Defaults to the two sample alerts below.
   * @param {number} [options.retryDelayMs=5000] - Pause after a failed update before retrying.
   */
  constructor(exchange, { alerts, retryDelayMs = 5000 } = {}) {
    this.exchange = exchange;
    this.isRunning = false;
    this.retryDelayMs = retryDelayMs;
    this.alerts = alerts ?? [
      { symbol: 'BTC/USDT', price: 50000, condition: 'above' },
      { symbol: 'ETH/USDT', price: 3000, condition: 'below' },
    ];
  }

  /**
   * Awaits ticker updates for `symbol` in a loop until `stop()` is called.
   * A failed update is logged and retried after `retryDelayMs`.
   * @param {string} symbol
   * @returns {Promise<void>}
   */
  async watchSingleTicker(symbol) {
    console.log(`📈 开始监听 ${symbol} 实时行情...\n`);
    this.isRunning = true;
    while (this.isRunning) {
      try {
        const ticker = await this.exchange.watchTicker(symbol);
        this.processTicker(ticker);
      } catch (error) {
        console.error(`❌ 监听 ${symbol} 行情错误:`, error.message);
        // Short pause before retrying.
        await new Promise((resolve) => setTimeout(resolve, this.retryDelayMs));
      }
    }
  }

  /**
   * Prints one formatted line for the ticker, then evaluates the price alerts.
   * @param {object} ticker - Reads `timestamp`, `symbol`, `last`, `percentage`, `baseVolume`.
   */
  processTicker(ticker) {
    // Fall back to the local clock when the ticker carries no timestamp,
    // otherwise the line would show "Invalid Date".
    const timestamp = new Date(ticker.timestamp ?? Date.now()).toLocaleTimeString();
    const change = ticker.percentage || 0;
    const volume = ticker.baseVolume || 0;
    const price = ticker.last ?? 'N/A';
    const priceEmoji = this.getPriceChangeEmoji(change);

    console.log(
      `${priceEmoji} ${timestamp} | ${ticker.symbol} | ` +
      `$${price} | ${change.toFixed(2)}% | ` +
      `Vol: ${volume.toFixed(2)}`
    );

    this.checkPriceAlerts(ticker);
  }

  /**
   * Maps a percentage change onto an emoji bucket.
   * @param {number} change - Percentage change.
   * @returns {string}
   */
  getPriceChangeEmoji(change) {
    if (change >= 5) return '🚀';
    if (change >= 2) return '📈';
    if (change >= 0) return '🟢';
    if (change >= -2) return '🔴';
    if (change >= -5) return '📉';
    return '💥';
  }

  /**
   * Logs an alert line for every configured alert whose symbol matches the
   * ticker and whose threshold has been crossed by `ticker.last`.
   * @param {object} ticker
   */
  checkPriceAlerts(ticker) {
    for (const alert of this.alerts) {
      if (ticker.symbol !== alert.symbol) continue;
      const shouldAlert = alert.condition === 'above'
        ? ticker.last >= alert.price
        : ticker.last <= alert.price;
      if (shouldAlert) {
        console.log(`🚨 价格警报: ${ticker.symbol} ${ticker.last} (阈值: ${alert.price})`);
      }
    }
  }

  /** Ends the watch loop once the update currently being awaited settles. */
  stop() {
    this.isRunning = false;
    console.log('⏹️ 停止行情监听');
  }
}
// 使用示例
/**
 * Usage example: watches the BTC/USDT ticker on Binance until the watcher is
 * stopped, then releases the exchange's WebSocket connections.
 */
async function watchTickerExample() {
  const exchange = new ccxtpro.binance({ enableRateLimit: true });
  const watcher = new TickerWatcher(exchange);
  try {
    await watcher.watchSingleTicker('BTC/USDT');
  } finally {
    // Close the sockets even if watching ends with an error.
    await exchange.close?.();
  }
}
// watchTickerExample();
监听多个行情
javascript
/**
 * Watches tickers for a set of symbols, renders a console dashboard and keeps
 * the most recent ticker per symbol in `tickerData`.
 */
class MultiTickerWatcher {
  /**
   * @param {object} exchange - Object exposing `has`, `watchTickers(symbols)` and `watchTicker(symbol)`.
   * @param {object} [options]
   * @param {number} [options.retryDelayMs=5000] - Pause after an error before the next attempt.
   */
  constructor(exchange, { retryDelayMs = 5000 } = {}) {
    this.exchange = exchange;
    this.symbols = [];
    this.isRunning = false;
    this.tickerData = new Map();
    this.retryDelayMs = retryDelayMs;
  }

  /** Adds `symbol` to the watch list unless it is already present. */
  addSymbol(symbol) {
    if (!this.symbols.includes(symbol)) {
      this.symbols.push(symbol);
      console.log(`➕ 添加监听: ${symbol}`);
    }
  }

  /** Removes `symbol` from the watch list and drops its cached ticker. */
  removeSymbol(symbol) {
    const index = this.symbols.indexOf(symbol);
    if (index > -1) {
      this.symbols.splice(index, 1);
      this.tickerData.delete(symbol);
      console.log(`➖ 移除监听: ${symbol}`);
    }
  }

  /**
   * Runs the watch loop until `stop()` is called. Errors are handled inside
   * the loop (log, wait, retry) rather than by re-entering this method, so
   * repeated failures cannot build up an ever-growing chain of nested calls.
   * @returns {Promise<void>}
   */
  async startWatching() {
    if (this.symbols.length === 0) {
      console.log('❌ 没有要监听的交易对');
      return;
    }
    console.log(`📊 开始监听 ${this.symbols.length} 个交易对的行情...\n`);
    this.isRunning = true;

    while (this.isRunning) {
      try {
        // Prefer the batch subscription when the exchange advertises it.
        if (this.exchange.has['watchTickers']) {
          const tickers = await this.exchange.watchTickers(this.symbols);
          this.processMultipleTickers(tickers);
        } else {
          // Fallback: one subscription per symbol.
          await this.watchTickersSequentially();
        }
      } catch (error) {
        console.error('❌ 监听多个行情错误:', error.message);
        if (this.isRunning) {
          console.log(`🔄 ${this.retryDelayMs / 1000}秒后重新连接...`);
          await new Promise((resolve) => setTimeout(resolve, this.retryDelayMs));
        }
      }
    }
  }

  /**
   * Fallback for exchanges without batch support: waits for the first
   * per-symbol request to settle.
   * @throws {Error} When that first request failed, so the caller's retry
   *   delay applies instead of the loop spinning on immediate failures.
   */
  async watchTickersSequentially() {
    const pending = this.symbols.map((symbol) => this.watchSingleTickerAsync(symbol));
    const succeeded = await Promise.race(pending);
    if (!succeeded) {
      throw new Error('行情订阅失败');
    }
  }

  /**
   * Awaits one ticker update for `symbol`, caches and prints it.
   * Errors are logged, not thrown.
   * @returns {Promise<boolean>} `true` when an update was received, `false` on error.
   */
  async watchSingleTickerAsync(symbol) {
    try {
      const ticker = await this.exchange.watchTicker(symbol);
      this.tickerData.set(symbol, ticker);
      this.displayTickerUpdate(ticker);
      return true;
    } catch (error) {
      console.error(`❌ ${symbol} 行情错误: ${error.message}`);
      return false;
    }
  }

  /**
   * Renders the dashboard for a symbol -> ticker map, sorted by percentage
   * change (descending), and refreshes the local cache.
   * @param {Object<string, object>} tickers
   */
  processMultipleTickers(tickers) {
    const timestamp = new Date().toLocaleTimeString();
    console.clear(); // clear the screen (optional)
    console.log(`⏰ ${timestamp} - 实时行情监控面板`);
    console.log('='.repeat(80));

    const sortedTickers = Object.entries(tickers)
      .sort(([, a], [, b]) => (b.percentage || 0) - (a.percentage || 0));

    console.log('交易对'.padEnd(12) + '价格'.padEnd(12) + '涨跌幅'.padEnd(10) +
      '24h成交量'.padEnd(15) + '更新时间');
    console.log('-'.repeat(80));

    for (const [symbol, ticker] of sortedTickers) {
      const price = `$${ticker.last?.toFixed(2) || 'N/A'}`.padEnd(12);
      const change = `${(ticker.percentage || 0).toFixed(2)}%`.padEnd(10);
      const volume = `${(ticker.baseVolume || 0).toFixed(2)}`.padEnd(15);
      // Use the local clock when the ticker has no timestamp (avoids "Invalid Date").
      const time = new Date(ticker.timestamp ?? Date.now()).toLocaleTimeString();
      const emoji = (ticker.percentage || 0) >= 0 ? '🟢' : '🔴';
      console.log(`${emoji} ${symbol.padEnd(10)}${price}${change}${volume}${time}`);
      this.tickerData.set(symbol, ticker);
    }
    console.log('='.repeat(80));

    this.displayMarketStats(tickers);
  }

  /** Prints a single-line update for one ticker. */
  displayTickerUpdate(ticker) {
    const timestamp = new Date(ticker.timestamp ?? Date.now()).toLocaleTimeString();
    const change = ticker.percentage || 0;
    const emoji = change >= 0 ? '🟢' : '🔴';
    console.log(
      `${emoji} ${timestamp} | ${ticker.symbol} | ` +
      `$${ticker.last} | ${change.toFixed(2)}%`
    );
  }

  /**
   * Prints up/down/flat counts and the average percentage change.
   * An empty map reports an average of 0 instead of NaN.
   * @param {Object<string, object>} tickers
   */
  displayMarketStats(tickers) {
    const changes = Object.values(tickers).map((ticker) => ticker.percentage || 0);
    const upCount = changes.filter((value) => value > 0).length;
    const downCount = changes.filter((value) => value < 0).length;
    const unchangedCount = changes.length - upCount - downCount;
    const avgChange = changes.length === 0
      ? 0
      : changes.reduce((sum, value) => sum + value, 0) / changes.length;

    console.log(`📊 市场概况: 上涨 ${upCount} | 下跌 ${downCount} | 平盘 ${unchangedCount}`);
    console.log(`📈 平均涨跌幅: ${avgChange.toFixed(2)}%`);
    console.log(`🌡️ 市场情绪: ${avgChange > 0 ? '乐观 😊' : avgChange < 0 ? '悲观 😰' : '中性 😐'}\n`);
  }

  /** @returns {object|undefined} The most recently cached ticker for `symbol`. */
  getLatestTicker(symbol) {
    return this.tickerData.get(symbol);
  }

  /** @returns {Object<string, object>} A plain-object copy of the ticker cache. */
  getAllLatestTickers() {
    return Object.fromEntries(this.tickerData);
  }

  /** Ends the watch loop after the request currently being awaited settles. */
  stop() {
    this.isRunning = false;
    console.log('⏹️ 停止多行情监听');
  }
}
// 使用示例
/**
 * Usage example: watches five USDT pairs on Binance until the watcher is
 * stopped, then releases the exchange's WebSocket connections.
 */
async function multiTickerExample() {
  const exchange = new ccxtpro.binance({ enableRateLimit: true });
  const watcher = new MultiTickerWatcher(exchange);

  // Register the symbols to watch.
  for (const symbol of ['BTC/USDT', 'ETH/USDT', 'BNB/USDT', 'ADA/USDT', 'DOT/USDT']) {
    watcher.addSymbol(symbol);
  }

  try {
    await watcher.startWatching();
  } finally {
    // Close the sockets even if watching ends with an error.
    await exchange.close?.();
  }
}
// multiTickerExample();
📖 实时订单簿
订单簿监听器
javascript
/**
 * Watches one order book, prints key metrics plus a depth table, and reports
 * level-by-level differences between consecutive snapshots.
 */
class OrderBookWatcher {
  /**
   * @param {object} exchange - Object exposing `watchOrderBook(symbol, limit)`.
   * @param {object} [options]
   * @param {number} [options.retryDelayMs=5000] - Pause after a failed update before retrying.
   */
  constructor(exchange, { retryDelayMs = 5000 } = {}) {
    this.exchange = exchange;
    this.isRunning = false;
    this.previousOrderBook = null;
    this.priceAlerts = [];
    this.retryDelayMs = retryDelayMs;
  }

  /**
   * Awaits order book updates in a loop until `stop()` is called.
   * @param {string} symbol
   * @param {number} [limit=10] - Depth passed through to `watchOrderBook`.
   * @returns {Promise<void>}
   */
  async watchOrderBook(symbol, limit = 10) {
    console.log(`📖 开始监听 ${symbol} 实时订单簿 (深度: ${limit})...\n`);
    this.isRunning = true;
    while (this.isRunning) {
      try {
        const orderbook = await this.exchange.watchOrderBook(symbol, limit);
        this.processOrderBook(orderbook);
        if (this.previousOrderBook) {
          this.detectOrderBookChanges(this.previousOrderBook, orderbook);
        }
        // Keep a copy so the next diff compares against this snapshot's values.
        this.previousOrderBook = this.cloneOrderBook(orderbook);
      } catch (error) {
        console.error(`❌ 监听订单簿错误: ${error.message}`);
        await new Promise((resolve) => setTimeout(resolve, this.retryDelayMs));
      }
    }
  }

  /**
   * Prints best bid/ask, spread, mid price and depth ratio, then the depth
   * table and any alerts. Does nothing when either side is empty.
   * @param {object} orderbook - Reads `timestamp`, `symbol`, `bids`, `asks` ([price, amount] rows).
   */
  processOrderBook(orderbook) {
    // Use the local clock when the snapshot has no timestamp (avoids "Invalid Date").
    const timestamp = new Date(orderbook.timestamp ?? Date.now()).toLocaleTimeString();
    const bestBid = orderbook.bids[0];
    const bestAsk = orderbook.asks[0];
    if (!bestBid || !bestAsk) return;

    const spread = bestAsk[0] - bestBid[0];
    const spreadPercent = (spread / bestBid[0]) * 100;
    const midPrice = (bestBid[0] + bestAsk[0]) / 2;

    const bidDepth = orderbook.bids.reduce((sum, [, amount]) => sum + amount, 0);
    const askDepth = orderbook.asks.reduce((sum, [, amount]) => sum + amount, 0);
    const depthRatio = bidDepth / askDepth;

    console.log(`⏰ ${timestamp} | ${orderbook.symbol}`);
    console.log(`💰 最优买价: $${bestBid[0].toFixed(2)} (${bestBid[1].toFixed(4)})`);
    console.log(`💰 最优卖价: $${bestAsk[0].toFixed(2)} (${bestAsk[1].toFixed(4)})`);
    console.log(`📏 价差: $${spread.toFixed(2)} (${spreadPercent.toFixed(4)}%)`);
    console.log(`📊 中间价: $${midPrice.toFixed(2)}`);
    console.log(`⚖️ 深度比: ${depthRatio.toFixed(2)} (买/卖)`);

    this.displayOrderBook(orderbook, 5);
    this.checkOrderBookAlerts(orderbook);
    console.log('-'.repeat(60));
  }

  /**
   * Prints the top `displayLimit` levels of each side as a side-by-side table.
   * The ask slice is reversed before printing, so row i pairs the i-th bid
   * with the (displayLimit - 1 - i)-th ask of the slice.
   */
  displayOrderBook(orderbook, displayLimit) {
    const bids = orderbook.bids.slice(0, displayLimit);
    const asks = orderbook.asks.slice(0, displayLimit).reverse();

    console.log('\n📈 订单簿:');
    console.log(' 卖盘 (ASK) | 买盘 (BID) ');
    console.log(' 价格 | 数量 | 价格 | 数量 ');
    console.log('-'.repeat(50));

    const rowCount = Math.max(asks.length, bids.length);
    for (let i = 0; i < rowCount; i++) {
      const ask = asks[i] || ['', ''];
      const bid = bids[i] || ['', ''];
      const askPrice = ask[0] ? `$${ask[0].toFixed(2)}` : '';
      const askAmount = ask[1] ? ask[1].toFixed(4) : '';
      const bidPrice = bid[0] ? `$${bid[0].toFixed(2)}` : '';
      const bidAmount = bid[1] ? bid[1].toFixed(4) : '';
      console.log(
        `${askPrice.padStart(9)} | ${askAmount.padStart(9)} | ` +
        `${bidPrice.padStart(9)} | ${bidAmount.padStart(9)}`
      );
    }
  }

  /** Prints up to five of the differences between two snapshots. */
  detectOrderBookChanges(previous, current) {
    const changes = this.getOrderBookChanges(previous, current);
    if (changes.length === 0) return;

    console.log(`🔄 检测到 ${changes.length} 个变化:`);
    for (const change of changes.slice(0, 5)) {
      const emoji = change.side === 'bid' ? '🟢' : '🔴';
      const action = change.type === 'add' ? '新增' :
        change.type === 'remove' ? '移除' : '修改';
      console.log(` ${emoji} ${action} ${change.side}: $${change.price} × ${change.amount}`);
    }
  }

  /**
   * @returns {Array<object>} Bid-side changes followed by ask-side changes.
   */
  getOrderBookChanges(previous, current) {
    return [
      ...this.getSideChanges(previous.bids, current.bids, 'bid'),
      ...this.getSideChanges(previous.asks, current.asks, 'ask'),
    ];
  }

  /**
   * Diffs one side of the book by price level.
   * @param {Array<[number, number]>} previousSide
   * @param {Array<[number, number]>} currentSide
   * @param {string} side - Label copied into every change record.
   * @returns {Array<object>} `add` / `modify` records in current-side order,
   *   followed by `remove` records in previous-side order. Amount differences
   *   of 0.0001 or less are not reported as modifications.
   */
  getSideChanges(previousSide, currentSide, side) {
    const changes = [];
    const previousMap = new Map(previousSide.map(([price, amount]) => [price.toString(), amount]));
    const currentMap = new Map(currentSide.map(([price, amount]) => [price.toString(), amount]));

    for (const [priceStr, amount] of currentMap) {
      const price = parseFloat(priceStr);
      const previousAmount = previousMap.get(priceStr);
      if (previousAmount === undefined) {
        changes.push({ type: 'add', side, price, amount });
      } else if (Math.abs(previousAmount - amount) > 0.0001) {
        changes.push({ type: 'modify', side, price, amount, change: amount - previousAmount });
      }
    }

    for (const [priceStr, amount] of previousMap) {
      if (!currentMap.has(priceStr)) {
        changes.push({ type: 'remove', side, price: parseFloat(priceStr), amount });
      }
    }
    return changes;
  }

  /**
   * Logs an alert when the spread exceeds 0.1% of the best bid, or when one
   * side's total amount is more than three times the other's.
   */
  checkOrderBookAlerts(orderbook) {
    const bestBid = orderbook.bids[0];
    const bestAsk = orderbook.asks[0];
    if (!bestBid || !bestAsk) return;

    const spread = bestAsk[0] - bestBid[0];
    const spreadPercent = (spread / bestBid[0]) * 100;
    if (spreadPercent > 0.1) {
      console.log(`🚨 价差警报: ${spreadPercent.toFixed(4)}% (超过0.1%)`);
    }

    const bidDepth = orderbook.bids.reduce((sum, [, amount]) => sum + amount, 0);
    const askDepth = orderbook.asks.reduce((sum, [, amount]) => sum + amount, 0);
    if (bidDepth / askDepth > 3 || askDepth / bidDepth > 3) {
      console.log(`🚨 深度不平衡: 买盘 ${bidDepth.toFixed(2)} vs 卖盘 ${askDepth.toFixed(2)}`);
    }
  }

  /**
   * @returns {object} A copy holding `symbol`, `timestamp` and fresh
   *   [price, amount] rows for both sides.
   */
  cloneOrderBook(orderbook) {
    return {
      symbol: orderbook.symbol,
      timestamp: orderbook.timestamp,
      bids: orderbook.bids.map(([price, amount]) => [price, amount]),
      asks: orderbook.asks.map(([price, amount]) => [price, amount]),
    };
  }

  /** Ends the watch loop after the update currently being awaited settles. */
  stop() {
    this.isRunning = false;
    console.log('⏹️ 停止订单簿监听');
  }
}
// 使用示例
/**
 * Usage example: watches the BTC/USDT order book (depth 10) on Binance until
 * the watcher is stopped, then releases the exchange's WebSocket connections.
 */
async function orderBookExample() {
  const exchange = new ccxtpro.binance({ enableRateLimit: true });
  const watcher = new OrderBookWatcher(exchange);
  try {
    await watcher.watchOrderBook('BTC/USDT', 10);
  } finally {
    // Close the sockets even if watching ends with an error.
    await exchange.close?.();
  }
}
// orderBookExample();
💱 实时成交数据
成交记录监听器
javascript
/**
 * Watches the public trade stream for one symbol, prints new trades, keeps
 * rolling statistics and flags large-trade clusters and sharp price moves.
 */
class TradesWatcher {
  /**
   * @param {object} exchange - Object exposing `watchTrades(symbol, since, limit)`.
   * @param {object} [options]
   * @param {number} [options.retryDelayMs=5000] - Pause after a failed update before retrying.
   */
  constructor(exchange, { retryDelayMs = 5000 } = {}) {
    this.exchange = exchange;
    this.isRunning = false;
    this.tradesBuffer = [];
    this.retryDelayMs = retryDelayMs;
    this.resetStatistics();
  }

  /**
   * Awaits trade batches in a loop until `stop()` is called.
   * @param {string} symbol
   * @param {number} [limit=100] - Passed through to `watchTrades`.
   * @returns {Promise<void>}
   */
  async watchTrades(symbol, limit = 100) {
    console.log(`💱 开始监听 ${symbol} 实时成交记录...\n`);
    this.isRunning = true;
    while (this.isRunning) {
      try {
        const trades = await this.exchange.watchTrades(symbol, undefined, limit);
        this.processTrades(trades);
      } catch (error) {
        console.error(`❌ 监听成交记录错误: ${error.message}`);
        await new Promise((resolve) => setTimeout(resolve, this.retryDelayMs));
      }
    }
  }

  /**
   * Displays, counts and analyses the trades in `trades` that were not seen
   * before, then replaces the buffer with the batch.
   * @param {Array<object>} trades
   */
  processTrades(trades) {
    if (trades.length === 0) return;
    const newTrades = this.getNewTrades(trades);
    if (newTrades.length === 0) return;

    this.displayNewTrades(newTrades);
    this.updateStatistics(newTrades);
    this.analyzeTradingPattern(newTrades);
    this.updateTradesBuffer(trades);
  }

  /**
   * Returns the trades in `trades` that are newer than the buffered ones.
   * Trades sharing the last buffered timestamp are compared by identity
   * (see `tradeKey`), so several trades in the same millisecond are not lost.
   * With an empty buffer the last five trades are returned.
   * @param {Array<object>} trades
   * @returns {Array<object>}
   */
  getNewTrades(trades) {
    if (this.tradesBuffer.length === 0) {
      return trades.slice(-5); // show the last 5 on first run
    }
    const lastTimestamp = this.tradesBuffer[this.tradesBuffer.length - 1].timestamp;
    const seenAtLastTimestamp = new Set(
      this.tradesBuffer
        .filter((trade) => trade.timestamp === lastTimestamp)
        .map((trade) => this.tradeKey(trade))
    );
    return trades.filter((trade) =>
      trade.timestamp > lastTimestamp ||
      (trade.timestamp === lastTimestamp && !seenAtLastTimestamp.has(this.tradeKey(trade)))
    );
  }

  /**
   * Identity used to tell apart trades with equal timestamps: the trade id
   * when present, otherwise a price/amount/side composite.
   * @param {object} trade
   * @returns {string}
   */
  tradeKey(trade) {
    return String(trade.id ?? `${trade.price}:${trade.amount}:${trade.side}`);
  }

  /** Prints one line per trade; trades worth more than 10000 get a whale marker. */
  displayNewTrades(newTrades) {
    console.log(`🆕 新成交 (${newTrades.length} 笔):`);
    for (const trade of newTrades) {
      const time = new Date(trade.timestamp).toLocaleTimeString();
      const isBuy = trade.side === 'buy';
      const side = isBuy ? '买入' : '卖出';
      const emoji = isBuy ? '🟢' : '🔴';
      const notional = trade.amount * trade.price;
      const sizeEmoji = notional > 10000 ? '🐋' : '';
      console.log(
        ` ${emoji}${sizeEmoji} ${time} | ${side} | ` +
        `${trade.amount.toFixed(4)} @ $${trade.price.toFixed(2)} = $${notional.toFixed(2)}`
      );
    }
    console.log('');
  }

  /**
   * Adds `newTrades` to the running totals; once more than 60 seconds have
   * passed since the last reset, prints and resets the statistics.
   */
  updateStatistics(newTrades) {
    for (const trade of newTrades) {
      this.statistics.totalTrades++;
      this.statistics.totalVolume += trade.amount;
      this.statistics.totalValue += trade.amount * trade.price;
      if (trade.side === 'buy') {
        this.statistics.buyTrades++;
      } else {
        this.statistics.sellTrades++;
      }
    }
    if (Date.now() - this.statistics.lastReset > 60000) {
      this.displayStatistics();
      this.resetStatistics();
    }
  }

  /** Prints the accumulated statistics; prints nothing when no trade was counted. */
  displayStatistics() {
    const stats = this.statistics;
    if (stats.totalTrades === 0) return; // every ratio below would be NaN

    const duration = (Date.now() - stats.lastReset) / 1000;
    const buyRatio = (stats.buyTrades / stats.totalTrades * 100).toFixed(1);
    // Derived from the rounded buy ratio so the two percentages add up to 100.
    const sellRatio = (100 - Number(buyRatio)).toFixed(1);
    const avgTradeSize = (stats.totalVolume / stats.totalTrades).toFixed(4);
    const avgTradeValue = (stats.totalValue / stats.totalTrades).toFixed(2);

    console.log('📊 过去1分钟统计:');
    console.log(`总成交: ${stats.totalTrades} 笔`);
    console.log(`买入: ${stats.buyTrades} 笔 (${buyRatio}%)`);
    console.log(`卖出: ${stats.sellTrades} 笔 (${sellRatio}%)`);
    console.log(`总成交量: ${stats.totalVolume.toFixed(4)}`);
    console.log(`总成交额: $${stats.totalValue.toLocaleString()}`);
    console.log(`平均成交量: ${avgTradeSize}`);
    console.log(`平均成交额: $${avgTradeValue}`);
    console.log(`成交频率: ${(stats.totalTrades / duration * 60).toFixed(1)} 笔/分钟`);
    console.log('-'.repeat(50));
  }

  /** Zeroes all counters and restarts the statistics window. */
  resetStatistics() {
    this.statistics = {
      totalTrades: 0,
      totalVolume: 0,
      totalValue: 0,
      buyTrades: 0,
      sellTrades: 0,
      lastReset: Date.now(),
    };
  }

  /**
   * Flags batches with three or more large trades (value > 10000), and
   * batches of two or more trades whose price range exceeds 0.5%.
   * @param {Array<object>} newTrades
   */
  analyzeTradingPattern(newTrades) {
    const largeTrades = newTrades.filter((trade) => trade.amount * trade.price > 10000);
    if (largeTrades.length >= 3) {
      const direction = largeTrades.every((t) => t.side === 'buy') ? '买入' :
        largeTrades.every((t) => t.side === 'sell') ? '卖出' : '混合';
      console.log(`🐋 检测到连续大单: ${largeTrades.length} 笔 (${direction})`);
    }

    // A price range needs only two trades, so this check has its own guard.
    if (newTrades.length >= 2) {
      const prices = newTrades.map((t) => t.price);
      const minPrice = Math.min(...prices);
      const maxPrice = Math.max(...prices);
      const priceRange = (maxPrice - minPrice) / minPrice * 100;
      if (priceRange > 0.5) {
        console.log(`⚡ 检测到价格异动: ${priceRange.toFixed(2)}% (${minPrice.toFixed(2)} - ${maxPrice.toFixed(2)})`);
      }
    }
  }

  /** Replaces the buffer with a copy of `trades`, keeping at most the last 1000. */
  updateTradesBuffer(trades) {
    this.tradesBuffer = [...trades].slice(-1000);
  }

  /**
   * Summarises buffered trades from the last `minutes` minutes.
   * @param {number} [minutes=5]
   * @returns {object|null} Summary with counts and fixed-precision strings,
   *   or null when no buffered trade falls inside the window.
   */
  getRecentTradeStats(minutes = 5) {
    const cutoffTime = Date.now() - (minutes * 60 * 1000);
    const recentTrades = this.tradesBuffer.filter((trade) => trade.timestamp > cutoffTime);
    if (recentTrades.length === 0) return null;

    const buyTrades = recentTrades.filter((t) => t.side === 'buy').length;
    const totalVolume = recentTrades.reduce((sum, t) => sum + t.amount, 0);
    const totalValue = recentTrades.reduce((sum, t) => sum + (t.amount * t.price), 0);
    return {
      period: `${minutes} 分钟`,
      totalTrades: recentTrades.length,
      buyTrades,
      sellTrades: recentTrades.length - buyTrades,
      buyRatio: (buyTrades / recentTrades.length * 100).toFixed(1),
      totalVolume: totalVolume.toFixed(4),
      totalValue: totalValue.toFixed(2),
      avgPrice: (totalValue / totalVolume).toFixed(2),
    };
  }

  /** Ends the watch loop after the batch currently being awaited settles. */
  stop() {
    this.isRunning = false;
    console.log('⏹️ 停止成交记录监听');
  }
}
// 使用示例
/**
 * Usage example: watches ETH/USDT trades on Binance until the watcher is
 * stopped, then releases the exchange's WebSocket connections.
 */
async function tradesExample() {
  const exchange = new ccxtpro.binance({ enableRateLimit: true });
  const watcher = new TradesWatcher(exchange);
  try {
    await watcher.watchTrades('ETH/USDT', 100);
  } finally {
    // Close the sockets even if watching ends with an error.
    await exchange.close?.();
  }
}
// tradesExample();
📊 实时K线数据
K线数据监听器
javascript
/**
 * Watches OHLCV candles for one symbol/timeframe, prints newly opened candles,
 * computes SMA / RSI / MACD on the closes and reports simple candle patterns.
 * Candles are arrays of [timestamp, open, high, low, close, volume].
 */
class OHLCVWatcher {
  /**
   * @param {object} exchange - Object exposing `has` and `watchOHLCV(symbol, timeframe, since, limit)`.
   * @param {object} [options]
   * @param {number} [options.retryDelayMs=5000] - Pause after a failed update before retrying.
   */
  constructor(exchange, { retryDelayMs = 5000 } = {}) {
    this.exchange = exchange;
    this.isRunning = false;
    this.candleBuffer = new Map(); // "symbol:timeframe" -> last candle array received
    this.indicators = new Map(); // "symbol:timeframe" -> last computed indicators
    this.retryDelayMs = retryDelayMs;
  }

  /**
   * Awaits candle updates in a loop until `stop()` is called.
   * @param {string} symbol
   * @param {string} [timeframe='1m']
   * @param {number} [limit=100]
   * @returns {Promise<void>}
   * @throws {Error} When `exchange.has['watchOHLCV']` is falsy.
   */
  async watchOHLCV(symbol, timeframe = '1m', limit = 100) {
    console.log(`📊 开始监听 ${symbol} ${timeframe} K线数据...\n`);
    if (!this.exchange.has['watchOHLCV']) {
      throw new Error('该交易所不支持实时K线数据');
    }
    this.isRunning = true;
    while (this.isRunning) {
      try {
        const ohlcv = await this.exchange.watchOHLCV(symbol, timeframe, undefined, limit);
        this.processOHLCV(ohlcv, symbol, timeframe);
      } catch (error) {
        console.error(`❌ 监听K线数据错误: ${error.message}`);
        await new Promise((resolve) => setTimeout(resolve, this.retryDelayMs));
      }
    }
  }

  /**
   * When the batch contains candles that opened after the last buffered one,
   * prints them and refreshes indicators and patterns; always refreshes the buffer.
   */
  processOHLCV(ohlcv, symbol, timeframe) {
    if (ohlcv.length === 0) return;
    const key = `${symbol}:${timeframe}`;
    const previousData = this.candleBuffer.get(key) || [];

    const newCandles = this.getNewCandles(previousData, ohlcv);
    if (newCandles.length > 0) {
      this.displayNewCandles(newCandles, symbol, timeframe);
      this.calculateIndicators(ohlcv, symbol, timeframe);
      this.analyzeCandles(ohlcv, symbol, timeframe);
    }
    this.candleBuffer.set(key, [...ohlcv]);
  }

  /**
   * @returns {Array<Array<number>>} Candles in `currentData` whose timestamp is
   *   later than the last candle in `previousData`; the last three candles
   *   when `previousData` is empty.
   */
  getNewCandles(previousData, currentData) {
    if (previousData.length === 0) {
      return currentData.slice(-3); // show the last 3 on first run
    }
    const lastTimestamp = previousData[previousData.length - 1][0];
    return currentData.filter((candle) => candle[0] > lastTimestamp);
  }

  /** Prints one line per candle with a body-size marker. */
  displayNewCandles(newCandles, symbol, timeframe) {
    console.log(`🕯️ ${symbol} ${timeframe} 新K线:`);
    for (const [timestamp, open, high, low, close, volume] of newCandles) {
      const time = new Date(timestamp).toLocaleString();
      const change = (close - open) / open * 100;
      const emoji = change >= 0 ? '🟢' : '🔴';

      const bodySize = Math.abs(close - open);
      const range = high - low;
      // A flat candle (high === low) has no body; treat its ratio as 0 rather than NaN.
      const bodyRatio = range > 0 ? bodySize / range : 0;

      let patternEmoji = '';
      if (bodyRatio > 0.8) patternEmoji = '📏'; // long body
      else if (bodyRatio < 0.2) patternEmoji = '🕯️'; // doji-like

      console.log(
        ` ${emoji}${patternEmoji} ${time} | ` +
        `开: $${open.toFixed(2)} | 高: $${high.toFixed(2)} | ` +
        `低: $${low.toFixed(2)} | 收: $${close.toFixed(2)} | ` +
        `量: ${volume.toFixed(2)} | 涨跌: ${change.toFixed(2)}%`
      );
    }
    console.log('');
  }

  /**
   * Computes SMA5/10/20, RSI(14) and MACD from the closes, caches and prints
   * them. Does nothing with fewer than 20 candles.
   */
  calculateIndicators(ohlcv, symbol, timeframe) {
    if (ohlcv.length < 20) return; // not enough data for SMA20
    const key = `${symbol}:${timeframe}`;
    const closes = ohlcv.map((candle) => candle[4]);
    const last = (series) => series[series.length - 1];

    const indicators = {
      sma5: last(this.calculateSMA(closes, 5)),
      sma10: last(this.calculateSMA(closes, 10)),
      sma20: last(this.calculateSMA(closes, 20)),
      rsi: last(this.calculateRSI(closes, 14)),
      macd: this.calculateMACD(closes),
      timestamp: Date.now(),
    };
    this.indicators.set(key, indicators);
    this.displayIndicators(indicators, symbol, timeframe);
  }

  /** Prints the indicator values followed by any derived signals. */
  displayIndicators(indicators, symbol, timeframe) {
    console.log(`📈 ${symbol} ${timeframe} 技术指标:`);
    console.log(`SMA5: $${indicators.sma5?.toFixed(2) || 'N/A'}`);
    console.log(`SMA10: $${indicators.sma10?.toFixed(2) || 'N/A'}`);
    console.log(`SMA20: $${indicators.sma20?.toFixed(2) || 'N/A'}`);
    console.log(`RSI: ${indicators.rsi?.toFixed(2) || 'N/A'}`);
    if (indicators.macd) {
      console.log(`MACD: ${indicators.macd.macd?.toFixed(4) || 'N/A'}`);
      console.log(`信号线: ${indicators.macd.signal?.toFixed(4) || 'N/A'}`);
      console.log(`柱状图: ${indicators.macd.histogram?.toFixed(4) || 'N/A'}`);
    }
    this.generateSignals(indicators);
    console.log('-'.repeat(50));
  }

  /**
   * Prints RSI (>80 / <20), SMA ordering and MACD histogram signals.
   * Null checks are explicit so that an RSI of exactly 0 still counts as oversold.
   */
  generateSignals(indicators) {
    const signals = [];

    if (indicators.rsi != null) {
      if (indicators.rsi > 80) {
        signals.push('🔴 RSI超买 (>80)');
      } else if (indicators.rsi < 20) {
        signals.push('🟢 RSI超卖 (<20)');
      }
    }

    if (indicators.sma5 && indicators.sma10 && indicators.sma20) {
      if (indicators.sma5 > indicators.sma10 && indicators.sma10 > indicators.sma20) {
        signals.push('📈 短期上升趋势');
      } else if (indicators.sma5 < indicators.sma10 && indicators.sma10 < indicators.sma20) {
        signals.push('📉 短期下降趋势');
      }
    }

    const histogram = indicators.macd?.histogram;
    if (histogram != null) {
      if (histogram > 0) {
        signals.push('🟢 MACD多头排列');
      } else if (histogram < 0) {
        signals.push('🔴 MACD空头排列');
      }
    }

    if (signals.length > 0) {
      console.log('🎯 交易信号:');
      signals.forEach((signal) => console.log(` ${signal}`));
    }
  }

  /** Reports hammer, bullish-engulfing and three-in-a-row patterns on the last 3 candles. */
  analyzeCandles(ohlcv, symbol, timeframe) {
    if (ohlcv.length < 3) return;
    const recent = ohlcv.slice(-3);
    const patterns = [];

    // Hammer: long lower shadow, short upper shadow, relative to the body.
    const [, open, high, low, close] = recent[recent.length - 1];
    const bodySize = Math.abs(close - open);
    const upperShadow = high - Math.max(open, close);
    const lowerShadow = Math.min(open, close) - low;
    if (lowerShadow > bodySize * 2 && upperShadow < bodySize * 0.5) {
      patterns.push('🔨 锤子线');
    }

    // Bullish engulfing: down candle followed by an up candle whose body spans it.
    const prev = recent[recent.length - 2];
    const curr = recent[recent.length - 1];
    if (prev[4] < prev[1] && curr[4] > curr[1] &&
        curr[1] < prev[4] && curr[4] > prev[1]) {
      patterns.push('🐻➡️🐂 看涨吞噬');
    }

    // Three consecutive up / down candles.
    if (recent.every((candle) => candle[4] > candle[1])) patterns.push('📈 三连阳');
    if (recent.every((candle) => candle[4] < candle[1])) patterns.push('📉 三连阴');

    if (patterns.length > 0) {
      console.log('🕯️ K线模式:');
      patterns.forEach((pattern) => console.log(` ${pattern}`));
    }
  }

  /**
   * Simple moving average.
   * @returns {number[]} One value per full window; empty when `data` is shorter than `period`.
   */
  calculateSMA(data, period) {
    const sma = [];
    for (let i = period - 1; i < data.length; i++) {
      const sum = data.slice(i - period + 1, i + 1).reduce((a, b) => a + b, 0);
      sma.push(sum / period);
    }
    return sma;
  }

  /**
   * RSI from simple averages of gains and losses over each window of
   * `period` consecutive price changes. The final window ends at the most
   * recent change, so the last value reflects the latest close.
   * @param {number[]} closes
   * @param {number} [period=14]
   * @returns {number[]} `closes.length - period` values (empty if too few closes).
   */
  calculateRSI(closes, period = 14) {
    const changes = [];
    for (let i = 1; i < closes.length; i++) {
      changes.push(closes[i] - closes[i - 1]);
    }

    const rsi = [];
    // `end` is exclusive; `<=` lets the last window include the newest change.
    for (let end = period; end <= changes.length; end++) {
      const window = changes.slice(end - period, end);
      const gainSum = window.filter((c) => c > 0).reduce((a, b) => a + b, 0);
      const lossSum = window.filter((c) => c < 0).reduce((a, b) => a + Math.abs(b), 0);
      const avgGain = gainSum / period;
      const avgLoss = lossSum / period;
      if (avgLoss === 0) {
        rsi.push(100);
      } else {
        const rs = avgGain / avgLoss;
        rsi.push(100 - (100 / (1 + rs)));
      }
    }
    return rsi;
  }

  /**
   * MACD(12, 26, 9) on the closes.
   * @param {number[]} closes
   * @returns {{macd: number, signal: number|null, histogram: number|null}|null}
   *   null with fewer than 26 closes; `signal` and `histogram` are null until
   *   nine MACD values exist past the 26-bar warm-up.
   */
  calculateMACD(closes) {
    if (closes.length < 26) return null;
    const ema12 = this.calculateEMA(closes, 12);
    const ema26 = this.calculateEMA(closes, 26);
    if (ema12.length === 0 || ema26.length === 0) return null;

    const macdSeries = ema26.map((slow, i) => ema12[i] - slow);
    const macdLine = macdSeries[macdSeries.length - 1];

    // Signal line: 9-period EMA of the MACD values after the slow EMA's warm-up.
    const warmedUp = macdSeries.slice(25);
    if (warmedUp.length < 9) {
      return { macd: macdLine, signal: null, histogram: null };
    }
    const signalSeries = this.calculateEMA(warmedUp, 9);
    const signal = signalSeries[signalSeries.length - 1];
    return { macd: macdLine, signal, histogram: macdLine - signal };
  }

  /**
   * Exponential moving average seeded with the first data point.
   * @returns {number[]} Same length as `data`; empty for empty input.
   */
  calculateEMA(data, period) {
    if (data.length === 0) return [];
    const multiplier = 2 / (period + 1);
    const ema = [data[0]];
    for (let i = 1; i < data.length; i++) {
      ema[i] = (data[i] * multiplier) + (ema[i - 1] * (1 - multiplier));
    }
    return ema;
  }

  /** @returns {object|undefined} The last indicators computed for symbol/timeframe. */
  getIndicators(symbol, timeframe) {
    return this.indicators.get(`${symbol}:${timeframe}`);
  }

  /** Ends the watch loop after the update currently being awaited settles. */
  stop() {
    this.isRunning = false;
    console.log('⏹️ 停止K线数据监听');
  }
}
// 使用示例
/**
 * Usage example: watches BTC/USDT 1-minute candles on Binance until the
 * watcher is stopped, then releases the exchange's WebSocket connections.
 */
async function ohlcvExample() {
  const exchange = new ccxtpro.binance({ enableRateLimit: true });
  const watcher = new OHLCVWatcher(exchange);
  try {
    await watcher.watchOHLCV('BTC/USDT', '1m', 50);
  } finally {
    // Close the sockets even if watching ends with an error.
    await exchange.close?.();
  }
}
// ohlcvExample();
🔄 连接管理与错误处理
连接管理器
javascript
/**
 * Runs watch functions in retrying loops and tracks per-connection state
 * (active flag, reconnect attempts, last error).
 */
class ConnectionManager {
  /**
   * @param {object} [options]
   * @param {number} [options.maxReconnectAttempts=5] - Consecutive failures tolerated before giving up.
   * @param {number} [options.reconnectDelay=5000] - Base delay in ms; doubled after each consecutive failure.
   */
  constructor({ maxReconnectAttempts = 5, reconnectDelay = 5000 } = {}) {
    this.connections = new Map();
    this.reconnectAttempts = new Map();
    this.maxReconnectAttempts = maxReconnectAttempts;
    this.reconnectDelay = reconnectDelay;
  }

  /**
   * Registers a connection and runs its watch loop.
   * @param {object} exchange - Used as `this` for `watchFunction`; its `id` is part of the key.
   * @param {Function} watchFunction - Called repeatedly with `args`; should resolve with one update.
   * @param {...*} args - Arguments forwarded to `watchFunction`.
   * @returns {Promise<void>} Settles once the connection is stopped or gives up.
   */
  async createResilientWatcher(exchange, watchFunction, ...args) {
    const key = `${exchange.id}_${watchFunction.name}_${args.join('_')}`;
    console.log(`🔌 创建弹性连接: ${key}`);
    this.connections.set(key, {
      exchange,
      watchFunction,
      args,
      isActive: true,
      lastError: null,
    });
    this.reconnectAttempts.set(key, 0);
    return this.startWatching(key);
  }

  /**
   * Runs the watch loop for `key` until the connection is deactivated.
   * Exactly one loop exists per connection: after an error the same loop
   * waits (see `handleReconnect`) and then continues.
   * @param {string} key
   * @returns {Promise<void>}
   */
  async startWatching(key) {
    const connection = this.connections.get(key);
    if (!connection || !connection.isActive) return;

    console.log(`📡 开始监听: ${key}`);
    while (connection.isActive) {
      try {
        const result = await connection.watchFunction.call(
          connection.exchange,
          ...connection.args
        );
        // A successful update resets the consecutive-failure counter.
        this.reconnectAttempts.set(key, 0);
        this.handleData(key, result);
      } catch (error) {
        console.error(`❌ 连接 ${key} 错误:`, error.message);
        connection.lastError = error;
        if (connection.isActive) {
          await this.handleReconnect(key);
        }
      }
    }
  }

  /**
   * Applies the retry policy after a failure: stops the connection once
   * `maxReconnectAttempts` is reached, otherwise waits with exponential
   * backoff. It does not start a new loop — the loop in `startWatching`
   * resumes when this resolves.
   * @param {string} key
   * @returns {Promise<void>}
   */
  async handleReconnect(key) {
    const attempts = this.reconnectAttempts.get(key) || 0;
    if (attempts >= this.maxReconnectAttempts) {
      console.error(`💀 连接 ${key} 超过最大重连次数,停止重连`);
      this.stopConnection(key);
      return;
    }
    this.reconnectAttempts.set(key, attempts + 1);
    const delay = this.reconnectDelay * Math.pow(2, attempts); // exponential backoff
    console.log(`🔄 ${attempts + 1}/${this.maxReconnectAttempts} 次重连 ${key},${delay/1000}秒后重试...`);
    await new Promise((resolve) => setTimeout(resolve, delay));
  }

  /**
   * Default data hook: logs the size of array payloads. Replace or override
   * it to process updates.
   */
  handleData(key, data) {
    console.log(`📊 收到 ${key} 数据,长度: ${Array.isArray(data) ? data.length : 'N/A'}`);
  }

  /** Deactivates one connection; its loop exits after the pending call settles. */
  stopConnection(key) {
    const connection = this.connections.get(key);
    if (connection) {
      connection.isActive = false;
      console.log(`⏹️ 停止连接: ${key}`);
    }
  }

  /** Deactivates every connection and clears all bookkeeping. */
  stopAllConnections() {
    console.log('⏹️ 停止所有连接...');
    for (const connection of this.connections.values()) {
      connection.isActive = false;
    }
    this.connections.clear();
    this.reconnectAttempts.clear();
  }

  /**
   * @returns {Array<{key: string, isActive: boolean, reconnectAttempts: number, lastError: string|null}>}
   */
  getConnectionStatus() {
    return [...this.connections].map(([key, connection]) => ({
      key,
      isActive: connection.isActive,
      reconnectAttempts: this.reconnectAttempts.get(key) || 0,
      lastError: connection.lastError?.message || null,
    }));
  }

  /** Prints one line per registered connection. */
  displayConnectionStatus() {
    const status = this.getConnectionStatus();
    console.log('\n📊 连接状态报告:');
    console.log('='.repeat(60));
    for (const conn of status) {
      const statusEmoji = conn.isActive ? '🟢' : '🔴';
      const errorInfo = conn.lastError ? ` (错误: ${conn.lastError})` : '';
      console.log(
        `${statusEmoji} ${conn.key} | ` +
        `重连次数: ${conn.reconnectAttempts}${errorInfo}`
      );
    }
    console.log('='.repeat(60));
  }
}
// 综合监控示例
/**
 * Combined monitoring example: runs a ticker, an order book and a trades
 * stream through a ConnectionManager and prints a status report every 30 s.
 */
class ComprehensiveMonitor {
  constructor() {
    this.connectionManager = new ConnectionManager();
    this.isRunning = false;
    this.exchange = null; // created in start(), closed in stop()
    this.statusInterval = null; // handle of the periodic status report
  }

  /**
   * Creates the exchange, starts all watchers and resolves once every
   * watcher loop has ended.
   * @returns {Promise<void>}
   */
  async start() {
    console.log('🚀 启动综合监控系统...\n');
    const exchange = new ccxtpro.binance({ enableRateLimit: true });
    this.exchange = exchange;
    this.isRunning = true;

    const watchers = [
      {
        name: 'BTC行情',
        func: exchange.watchTicker.bind(exchange),
        args: ['BTC/USDT'],
      },
      {
        name: 'ETH订单簿',
        func: exchange.watchOrderBook.bind(exchange),
        args: ['ETH/USDT', 10],
      },
      {
        name: 'BTC成交',
        func: exchange.watchTrades.bind(exchange),
        args: ['BTC/USDT'],
      },
    ];

    // Start every watcher; a failure to start one is logged without affecting the others.
    const promises = watchers.map((watcher) =>
      this.connectionManager.createResilientWatcher(
        exchange,
        watcher.func,
        ...watcher.args
      ).catch((error) => {
        console.error(`启动 ${watcher.name} 失败:`, error.message);
      })
    );

    // Periodic status report, every 30 seconds.
    this.statusInterval = setInterval(() => {
      if (this.isRunning) {
        this.connectionManager.displayConnectionStatus();
      }
    }, 30000);

    try {
      await Promise.all(promises);
    } finally {
      this.clearStatusInterval();
    }
  }

  /** Cancels the periodic status report if it is scheduled. */
  clearStatusInterval() {
    if (this.statusInterval !== null) {
      clearInterval(this.statusInterval);
      this.statusInterval = null;
    }
  }

  /**
   * Stops all watchers, cancels the status timer immediately and closes the
   * exchange's connections. Never rejects: a failure while closing is logged.
   * @returns {Promise<void>}
   */
  async stop() {
    console.log('🛑 停止综合监控系统...');
    this.isRunning = false;
    this.clearStatusInterval();
    this.connectionManager.stopAllConnections();

    const exchange = this.exchange;
    this.exchange = null;
    if (exchange && typeof exchange.close === 'function') {
      try {
        await exchange.close();
      } catch (error) {
        console.error('关闭交易所连接失败:', error.message);
      }
    }
  }
}
// 使用示例
/**
 * Usage example: runs the combined monitor and stops it after 10 minutes.
 * The returned promise settles when monitoring has ended; errors from the
 * monitor are logged, not thrown.
 */
async function comprehensiveExample() {
  const monitor = new ComprehensiveMonitor();

  // Stop after 10 minutes (example duration).
  const stopTimer = setTimeout(() => {
    monitor.stop();
  }, 600000);

  try {
    await monitor.start();
  } catch (error) {
    console.error('监控系统错误:', error.message);
  } finally {
    // Do not leave the timer pending if monitoring ended early.
    clearTimeout(stopTimer);
  }
}
// comprehensiveExample();
🎯 章节总结
本章我们深入学习了 CCXT Pro 的实时数据流功能:
✅ 实时行情监听
- 单个和多个交易对行情监控
- 价格警报和市场情绪分析
- 实时数据面板显示
✅ 实时订单簿
- 深度数据监听和变化检测
- 流动性分析和异常警报
- 买卖盘深度比与价差监控
✅ 实时成交数据
- 成交记录监听和统计
- 大单检测和交易模式分析
- 买卖力度对比
✅ 实时K线数据
- 多时间周期K线监听
- 技术指标实时计算
- K线模式识别
✅ 连接管理
- 自动重连机制
- 错误处理和故障恢复
- 连接状态监控
💡 实战建议
- 渐进式学习 - 从单一数据类型开始,逐步增加复杂性
- 资源管理 - 合理控制同时监听的数据流数量
- 错误处理 - 实施完善的重连和错误恢复机制
- 数据存储 - 考虑将重要数据持久化存储
- 性能优化 - 监控内存使用和CPU消耗
🔗 下一步
掌握了实时数据流后,接下来学习:
下一章:错误处理与最佳实践