第六章:错误处理与最佳实践
在生产环境中使用 CCXT 时,完善的错误处理机制是确保系统稳定性的关键。本章将详细介绍各种错误类型、处理方法和生产环境的最佳实践。
🛡️ CCXT 错误类型
异常层次结构
javascript
const ccxt = require('ccxt');
// CCXT exception hierarchy, following the "Error Handling" section of the
// CCXT manual. The exact tree varies a little between CCXT releases (newer
// versions add intermediate classes), so treat this as a guide and verify
// against the installed version. Note in particular that RateLimitExceeded
// and OnMaintenance are documented as NetworkError subclasses, not
// ExchangeError subclasses.
//
// BaseError
// ├── ExchangeError
// │   ├── AuthenticationError
// │   │   ├── PermissionDenied
// │   │   └── AccountSuspended
// │   ├── ArgumentsRequired
// │   ├── BadRequest
// │   │   └── BadSymbol
// │   ├── BadResponse
// │   │   └── NullResponse
// │   ├── InsufficientFunds
// │   ├── InvalidAddress
// │   │   └── AddressPending
// │   ├── InvalidOrder
// │   │   ├── OrderNotFound
// │   │   ├── OrderNotCached
// │   │   ├── CancelPending
// │   │   ├── OrderImmediatelyFillable
// │   │   ├── OrderNotFillable
// │   │   └── DuplicateOrderId
// │   └── NotSupported
// └── NetworkError
//     ├── DDoSProtection
//     │   └── RateLimitExceeded
//     ├── ExchangeNotAvailable
//     │   └── OnMaintenance
//     ├── InvalidNonce
//     └── RequestTimeout

// List every exported error class by checking real inheritance from
// BaseError. Filtering on the "Error"/"Exception" name suffix would miss
// most of them (InsufficientFunds, InvalidOrder, DDoSProtection, ...).
console.log(
  'CCXT 错误类型:',
  Object.keys(ccxt).filter((key) => {
    const exported = ccxt[key];
    return (
      typeof exported === 'function' &&
      (exported === ccxt.BaseError || exported.prototype instanceof ccxt.BaseError)
    );
  }),
);
错误处理框架
javascript
/**
 * Maps CCXT errors to handling strategies and keeps per-error statistics.
 *
 * A strategy is a plain object: `{ action, retryable, delay, maxRetries?, message? }`
 * where `delay` is in milliseconds. This class only *describes* what to do;
 * the caller is responsible for sleeping and retrying.
 */
class ErrorHandler {
  constructor() {
    // errorKey -> number of occurrences
    this.errorCounts = new Map();
    // errorKey -> { error: message, timestamp }
    this.lastErrors = new Map();
    // Initialised here but not used by any method of this class.
    this.circuitBreakers = new Map();
  }

  /**
   * Records the error, logs it, and returns the strategy to apply.
   *
   * @param {Error} error - The error thrown by a CCXT call.
   * @param {{exchange?: string}} [context] - Optional call context; `exchange`
   *   is used as the statistics key prefix.
   * @returns {Promise<object>} The handling strategy.
   */
  async handleError(error, context = {}) {
    const errorType = error.constructor.name;
    const errorKey = `${context.exchange || 'unknown'}_${errorType}`;

    this.recordError(errorKey, error);

    const strategy = this.getErrorStrategy(error, context);
    console.error(`❌ ${errorType}: ${error.message}`);
    console.log(`🔧 处理策略: ${strategy.action}`);
    return strategy;
  }

  /**
   * Routes the error to the network or exchange handler; anything that is
   * not a CCXT error is only logged.
   */
  getErrorStrategy(error, context) {
    if (error instanceof ccxt.NetworkError) {
      return this.handleNetworkError(error, context);
    }
    if (error instanceof ccxt.ExchangeError) {
      return this.handleExchangeError(error, context);
    }
    return { action: 'log', retryable: false, delay: 0 };
  }

  /**
   * Strategy for NetworkError and its subclasses.
   *
   * Subclasses are tested before their parents: RateLimitExceeded and
   * OnMaintenance derive from NetworkError classes in CCXT, so checking the
   * parent first would make their dedicated strategies unreachable.
   */
  handleNetworkError(error, context) {
    if (error instanceof ccxt.RateLimitExceeded) {
      return this.buildRateLimitStrategy(error);
    }
    if (error instanceof ccxt.OnMaintenance) {
      return this.buildMaintenanceStrategy();
    }
    if (error instanceof ccxt.RequestTimeout) {
      return {
        action: 'retry_with_backoff',
        retryable: true,
        delay: 5000,
        maxRetries: 3,
        message: '请求超时,将使用指数退避重试',
      };
    }
    if (error instanceof ccxt.DDoSProtection) {
      return {
        action: 'wait_and_retry',
        retryable: true,
        delay: 60000, // 1 minute
        maxRetries: 5,
        message: 'DDoS保护触发,等待后重试',
      };
    }
    if (error instanceof ccxt.ExchangeNotAvailable) {
      return {
        action: 'circuit_breaker',
        retryable: true,
        delay: 300000, // 5 minutes
        maxRetries: 10,
        message: '交易所不可用,启用熔断器',
      };
    }
    return {
      action: 'retry',
      retryable: true,
      delay: 10000,
      maxRetries: 3,
      message: '网络错误,短暂等待后重试',
    };
  }

  /**
   * Strategy for ExchangeError and its subclasses.
   *
   * PermissionDenied is tested before AuthenticationError because it is a
   * subclass of it. The RateLimitExceeded / OnMaintenance branches are kept
   * so that direct callers of this method get the same answer as callers of
   * handleNetworkError.
   */
  handleExchangeError(error, context) {
    if (error instanceof ccxt.PermissionDenied) {
      return {
        action: 'stop',
        retryable: false,
        delay: 0,
        message: '权限不足,请检查API权限设置',
      };
    }
    if (error instanceof ccxt.AuthenticationError) {
      return {
        action: 'stop',
        retryable: false,
        delay: 0,
        message: '认证失败,请检查API密钥配置',
      };
    }
    if (error instanceof ccxt.InsufficientFunds) {
      return {
        action: 'notify',
        retryable: false,
        delay: 0,
        message: '余额不足,无法执行交易',
      };
    }
    if (error instanceof ccxt.RateLimitExceeded) {
      return this.buildRateLimitStrategy(error);
    }
    if (error instanceof ccxt.OrderNotFound) {
      return {
        action: 'ignore',
        retryable: false,
        delay: 0,
        message: '订单未找到,可能已被处理',
      };
    }
    if (error instanceof ccxt.InvalidOrder) {
      return {
        action: 'fix_and_retry',
        retryable: true,
        delay: 1000,
        maxRetries: 1,
        message: '订单参数无效,尝试修正后重试',
      };
    }
    if (error instanceof ccxt.OnMaintenance) {
      return this.buildMaintenanceStrategy();
    }
    return {
      action: 'log',
      retryable: false,
      delay: 0,
      message: '未知交易所错误',
    };
  }

  /** Strategy used when the exchange reports a rate-limit violation. */
  buildRateLimitStrategy(error) {
    return {
      action: 'wait_and_retry',
      retryable: true,
      delay: this.calculateRateLimitDelay(error),
      maxRetries: 5,
      message: '触发频率限制,等待后重试',
    };
  }

  /** Strategy used while the exchange is under maintenance. */
  buildMaintenanceStrategy() {
    return {
      action: 'wait_maintenance',
      retryable: true,
      delay: 1800000, // 30 minutes
      maxRetries: Infinity,
      message: '交易所维护中,等待维护结束',
    };
  }

  /**
   * Extracts a "retry after N" hint (interpreted as seconds) from the error
   * message and returns it in milliseconds; falls back to one minute.
   */
  calculateRateLimitDelay(error) {
    const message = String(error?.message ?? '').toLowerCase();
    const match = message.match(/retry after (\d+)/);
    if (match) {
      return Number.parseInt(match[1], 10) * 1000;
    }
    return 60000; // 1 minute
  }

  /** Increments the counter for `errorKey` and remembers the latest message. */
  recordError(errorKey, error) {
    const count = this.errorCounts.get(errorKey) || 0;
    this.errorCounts.set(errorKey, count + 1);
    this.lastErrors.set(errorKey, {
      error: error.message,
      timestamp: Date.now(),
    });
  }

  /**
   * @returns {Object<string, {count: number, lastError: string, lastTime: ?string}>}
   *   A snapshot of the statistics keyed by `<exchange>_<ErrorType>`.
   */
  getErrorStats() {
    const stats = {};
    for (const [key, count] of this.errorCounts) {
      const lastError = this.lastErrors.get(key);
      stats[key] = {
        count,
        lastError: lastError?.error,
        lastTime: lastError?.timestamp ? new Date(lastError.timestamp).toLocaleString() : null,
      };
    }
    return stats;
  }

  /** Drops all collected statistics. */
  clearErrorStats() {
    this.errorCounts.clear();
    this.lastErrors.clear();
    console.log('🧹 错误统计已清空');
  }
}
// Usage example
const errorHandler = new ErrorHandler();

/**
 * Runs `apiFunction`, retrying according to the strategy returned by
 * `errorHandler.handleError`.
 *
 * The retry budget is the strategy's own `maxRetries` when it provides one,
 * otherwise 3. The loop has no separate bound of its own: previously a
 * strategy allowing more than 3 retries made the loop fall through and the
 * function resolved to `undefined`, silently swallowing the last error.
 *
 * @param {() => Promise<*>} apiFunction - The call to execute.
 * @param {object} [context] - Passed through to the error handler.
 * @returns {Promise<*>} Whatever `apiFunction` resolves to.
 * @throws The last error once it is non-retryable or the budget is spent.
 */
async function safeApiCall(apiFunction, context = {}) {
  const defaultMaxRetries = 3;
  let retries = 0;

  for (;;) {
    try {
      return await apiFunction();
    } catch (error) {
      const strategy = await errorHandler.handleError(error, context);
      const retryLimit = strategy.maxRetries ?? defaultMaxRetries;

      if (!strategy.retryable || retries >= retryLimit) {
        console.error('🚫 达到最大重试次数或不可重试错误');
        throw error;
      }

      retries++;
      console.log(`🔄 第 ${retries} 次重试,${strategy.delay}ms 后执行...`);
      if (strategy.delay > 0) {
        await new Promise((resolve) => setTimeout(resolve, strategy.delay));
      }
    }
  }
}
🔄 重试机制实现
智能重试器
javascript
/**
 * Retries an async operation with exponential back-off and optional jitter.
 *
 * Options (all optional):
 *  - maxRetries    retries after the first attempt (default 3; 0 disables retrying)
 *  - baseDelay     first back-off delay in ms (default 1000)
 *  - maxDelay      upper bound for any single delay in ms (default 30000)
 *  - backoffFactor multiplier applied per attempt (default 2)
 *  - jitter        randomise each delay to 50-100% of its value (default true)
 */
class SmartRetrier {
  constructor(options = {}) {
    // `??` instead of `||` so that explicit falsy values are honoured:
    // `jitter: false` and `maxRetries: 0` used to be replaced by the defaults.
    this.maxRetries = options.maxRetries ?? 3;
    this.baseDelay = options.baseDelay ?? 1000;
    this.maxDelay = options.maxDelay ?? 30000;
    this.backoffFactor = options.backoffFactor ?? 2;
    this.jitter = options.jitter ?? true;
  }

  /**
   * Runs `operation`, retrying on retryable errors.
   *
   * @param {() => Promise<*>} operation - The work to perform.
   * @param {object} [context] - Accepted for API symmetry; not used here.
   * @returns {Promise<*>} The first successful result.
   * @throws The last error when retries are exhausted or the error is
   *   classified as non-retryable by shouldRetry.
   */
  async execute(operation, context = {}) {
    let lastError;

    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        const result = await operation();
        if (attempt > 0) {
          console.log(`✅ 第 ${attempt + 1} 次尝试成功`);
        }
        return result;
      } catch (error) {
        lastError = error;

        if (attempt === this.maxRetries) {
          console.error(`💀 达到最大重试次数 (${this.maxRetries + 1})`);
          break;
        }
        if (!this.shouldRetry(error, attempt)) {
          console.error('🚫 错误不可重试');
          break;
        }

        const delay = this.calculateDelay(attempt, error);
        console.log(
          `⏳ 第 ${attempt + 1} 次尝试失败: ${error.message}\n` +
          `🔄 ${delay}ms 后进行第 ${attempt + 2} 次尝试...`
        );
        await this.sleep(delay);
      }
    }

    throw lastError;
  }

  /**
   * Decides whether `error` is worth another attempt.
   *
   * @param {Error} error - The error from the failed attempt.
   * @param {number} attempt - Zero-based index of the failed attempt.
   * @returns {boolean}
   */
  shouldRetry(error, attempt) {
    // Credential, permission and request-shape problems will not fix
    // themselves, and neither will an empty balance.
    const nonRetryableErrors = [
      ccxt.AuthenticationError,
      ccxt.PermissionDenied,
      ccxt.AccountSuspended,
      ccxt.BadSymbol,
      ccxt.BadRequest,
      ccxt.InsufficientFunds,
    ];
    if (nonRetryableErrors.some((ErrorType) => error instanceof ErrorType)) {
      return false;
    }

    // Order errors get exactly one retry.
    if (error instanceof ccxt.InvalidOrder) {
      return attempt < 1;
    }
    return true;
  }

  /**
   * Computes the wait before the next attempt, in whole milliseconds.
   *
   * NOTE: the one-minute floor for RateLimitExceeded is applied before the
   * `maxDelay` cap, so with `maxDelay` below 60000 the cap wins.
   */
  calculateDelay(attempt, error) {
    let delay = this.baseDelay * Math.pow(this.backoffFactor, attempt);

    if (error instanceof ccxt.RateLimitExceeded) {
      delay = Math.max(delay, 60000);
    }

    delay = Math.min(delay, this.maxDelay);

    // Jitter spreads out retries from many clients.
    if (this.jitter) {
      delay = delay * (0.5 + Math.random() * 0.5);
    }
    return Math.floor(delay);
  }

  /** Resolves after `ms` milliseconds. */
  async sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// 使用示例
/**
 * Demo: fetch the Binance balance through a SmartRetrier and print how many
 * top-level keys the balance object has. Credentials come from the
 * BINANCE_API_KEY / BINANCE_SECRET environment variables. Failures are
 * logged rather than rethrown.
 */
async function retryExample() {
  const { BINANCE_API_KEY: apiKey, BINANCE_SECRET: secret } = process.env;
  const exchange = new ccxt.binance({ apiKey, secret, enableRateLimit: true });

  const retryOptions = {
    maxRetries: 5,
    baseDelay: 2000,
    maxDelay: 60000,
    backoffFactor: 2,
  };
  const retrier = new SmartRetrier(retryOptions);

  // The balance request is the operation being retried.
  const loadBalance = async () => exchange.fetchBalance();

  try {
    const balance = await retrier.execute(loadBalance);
    const keyCount = Object.keys(balance).length;
    console.log('✅ 余额获取成功:', keyCount);
  } catch (error) {
    console.error('❌ 最终失败:', error.message);
  }
}
// retryExample();
⚡ 限速管理
高级限速管理器
javascript
/**
 * Client-side throttle that sits in front of an exchange instance.
 *
 * It tracks request timestamps in a 60 s window and a 10 s "burst" window,
 * inserts a delay before each call, and backs off when the exchange raises
 * ccxt.RateLimitExceeded.
 */
class AdvancedRateLimiter {
  /**
   * @param {object} exchange - Exchange instance; its `rateLimit` (ms) is
   *   used as the base delay, falling back to 1000 when absent.
   */
  constructor(exchange) {
    this.exchange = exchange;
    this.requestTimes = [];   // timestamps of requests in the last minute
    this.burstRequests = [];  // timestamps of requests in the last 10 s
    this.isLimited = false;   // true while sleeping off a rate-limit error
    this.adaptiveDelay = exchange.rateLimit || 1000;
    this.consecutiveErrors = 0;
    this.lastErrorTime = null;
  }

  /**
   * Sleeps for the currently computed delay, then records the request.
   *
   * The timestamp is taken after the sleep so it reflects when the request
   * actually goes out, and it is recorded in both windows. Previously
   * `burstRequests` was never filled, which made burst detection dead code.
   */
  async smartDelay() {
    const now = Date.now();
    this.cleanupOldRequests(now);

    const currentRate = this.calculateCurrentRate(now);
    const dynamicDelay = this.calculateDynamicDelay(currentRate, now);
    if (dynamicDelay > 0) {
      console.log(`⏳ 智能限速: ${dynamicDelay}ms`);
      await this.sleep(dynamicDelay);
    }

    const sentAt = Date.now();
    this.requestTimes.push(sentAt);
    this.burstRequests.push(sentAt);
  }

  /** Drops timestamps older than their window (60 s / 10 s). */
  cleanupOldRequests(now) {
    const timeWindow = 60000;
    const burstWindow = 10000;
    this.requestTimes = this.requestTimes.filter((time) => now - time < timeWindow);
    this.burstRequests = this.burstRequests.filter((time) => now - time < burstWindow);
  }

  /** @returns {number} Requests recorded during the last 60 s. */
  calculateCurrentRate(now) {
    return this.requestTimes.filter((time) => now - time < 60000).length;
  }

  /**
   * Computes the delay (ms) to apply before the next request. Pure: it does
   * not modify limiter state.
   *
   * @param {number} currentRate - Requests in the last minute.
   * @param {number} now - Current time in ms since the epoch.
   * @returns {number} Delay in ms, capped at 60000.
   */
  calculateDynamicDelay(currentRate, now) {
    let delay = this.adaptiveDelay;

    // Each consecutive error adds one second of penalty, up to 30 s.
    if (this.consecutiveErrors > 0) {
      const errorPenalty = Math.min(this.consecutiveErrors * 1000, 30000);
      delay = Math.max(delay, errorPenalty);
    }

    // Slow down as the per-minute rate grows.
    if (currentRate > 50) {
      delay = delay * 2;
    } else if (currentRate > 30) {
      delay = delay * 1.5;
    }

    // More than 10 requests within 10 s counts as a burst.
    const burstWindow = 10000;
    const recentBurst = this.burstRequests.filter((time) => now - time < burstWindow).length;
    if (recentBurst > 10) {
      delay = delay * 3;
    }

    return Math.min(delay, 60000);
  }

  /**
   * Reacts to a rate-limit error: bumps the error counter, then sleeps for
   * the server-suggested time or an exponential back-off (max 5 minutes).
   * `isLimited` is true for the duration of the sleep.
   */
  async handleRateLimitError(error) {
    this.consecutiveErrors++;
    this.lastErrorTime = Date.now();
    this.isLimited = true;
    console.warn(`🚨 触发限速 (连续错误: ${this.consecutiveErrors})`);

    let waitTime = this.extractWaitTimeFromError(error);
    if (!waitTime) {
      waitTime = Math.min(1000 * Math.pow(2, this.consecutiveErrors), 300000);
    }

    console.log(`⏰ 等待 ${waitTime}ms 后恢复...`);
    try {
      await this.sleep(waitTime);
    } finally {
      this.isLimited = false;
    }
  }

  /**
   * Looks for a "N seconds" hint in the error message.
   *
   * @returns {?number} Wait time in ms, or null when no hint is found.
   */
  extractWaitTimeFromError(error) {
    const message = String(error?.message ?? '').toLowerCase();
    const patterns = [
      /retry after (\d+) seconds?/,
      /wait (\d+) seconds?/,
      /try again in (\d+) seconds?/,
      /rate limit.*?(\d+)\s*seconds?/,
    ];
    for (const pattern of patterns) {
      const match = message.match(pattern);
      if (match) {
        return Number.parseInt(match[1], 10) * 1000;
      }
    }
    return null;
  }

  /**
   * Called after a successful request: decays the error counter by one and,
   * once it reaches zero, shrinks the adaptive delay by 10% per success
   * until it is back at the exchange's own `rateLimit`.
   */
  onRequestSuccess() {
    if (this.consecutiveErrors > 0) {
      this.consecutiveErrors = Math.max(0, this.consecutiveErrors - 1);
    }
    if (this.consecutiveErrors === 0 && this.adaptiveDelay > this.exchange.rateLimit) {
      this.adaptiveDelay = Math.max(this.exchange.rateLimit, this.adaptiveDelay * 0.9);
    }
  }

  /**
   * Throttles and invokes `apiFunction` with `this.exchange` as receiver.
   *
   * @param {Function} apiFunction - Unbound exchange method.
   * @param {...*} args - Arguments forwarded to the method.
   * @throws {Error} Immediately when a rate-limit back-off is in progress.
   * @throws Rethrows any error from the method; RateLimitExceeded is
   *   rethrown only after the back-off sleep has completed.
   */
  async wrapApiCall(apiFunction, ...args) {
    if (this.isLimited) {
      throw new Error('限速保护激活,请稍后重试');
    }

    await this.smartDelay();

    try {
      const result = await apiFunction.apply(this.exchange, args);
      this.onRequestSuccess();
      return result;
    } catch (error) {
      if (error instanceof ccxt.RateLimitExceeded) {
        await this.handleRateLimitError(error);
      }
      throw error;
    }
  }

  /** @returns {object} Snapshot of the limiter's counters. */
  getRateLimitStats() {
    const now = Date.now();
    const recentRequests = this.requestTimes.filter((time) => now - time < 60000);
    return {
      requestsLastMinute: recentRequests.length,
      consecutiveErrors: this.consecutiveErrors,
      currentDelay: this.adaptiveDelay,
      isLimited: this.isLimited,
      lastErrorTime: this.lastErrorTime ?
        new Date(this.lastErrorTime).toLocaleString() : null,
    };
  }

  /** Resolves after `ms` milliseconds. */
  async sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
// 使用示例
/**
 * Demo: fetch three tickers from Binance through an AdvancedRateLimiter and
 * print the last price plus the limiter's requests-per-minute counter after
 * each call. Any failure is logged and ends the loop.
 */
async function rateLimitExample() {
  const { BINANCE_API_KEY: apiKey, BINANCE_SECRET: secret } = process.env;
  const exchange = new ccxt.binance({ apiKey, secret, enableRateLimit: true });
  const rateLimiter = new AdvancedRateLimiter(exchange);

  // Builds a throttled wrapper; the exchange method is looked up lazily,
  // at call time, and invoked through the limiter.
  const throttled = (methodName) => (...args) =>
    rateLimiter.wrapApiCall(exchange[methodName], ...args);

  const safeFetchTicker = throttled('fetchTicker');
  // Shown for illustration; this example never calls it.
  const safeFetchBalance = throttled('fetchBalance');

  const symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT'];

  try {
    // Sequential on purpose: each call waits for the limiter.
    for (const symbol of symbols) {
      const { last } = await safeFetchTicker(symbol);
      console.log(`${symbol}: $${last}`);

      const { requestsLastMinute } = rateLimiter.getRateLimitStats();
      console.log(`限速状态: ${requestsLastMinute} 请求/分钟`);
    }
  } catch (error) {
    console.error('❌ 限速示例失败:', error.message);
  }
}
// rateLimitExample();
🔧 生产环境配置
生产级交易所管理器
javascript
/**
 * Owns a set of exchange instances, health-checks them periodically, keeps
 * an alert history and offers a retrying, rate-limited call wrapper.
 *
 * The constructor starts background timers; call `shutdown()` to stop them,
 * otherwise the Node.js process will not exit.
 */
class ProductionExchangeManager {
  /**
   * @param {{exchanges: Array<object>}} config - One entry per exchange with
   *   `id`, `name`, `apiKeyEnv`, `secretEnv`, and optional `sandbox`,
   *   `options`, `exchangeOptions`.
   */
  constructor(config) {
    this.config = config;
    this.exchanges = new Map();     // exchangeId -> exchange instance
    this.healthChecks = new Map();  // exchangeId -> latest health record
    this.monitors = new Map();      // reserved; not used by this class
    this.rateLimiters = new Map();  // exchangeId -> AdvancedRateLimiter
    this.healthTimers = { interval: null, initial: null };
    this.alerts = [];
    this.initializeExchanges();
    this.startHealthMonitoring();
  }

  /** Creates every configured exchange; a failure is logged and skipped. */
  initializeExchanges() {
    for (const exchangeConfig of this.config.exchanges) {
      try {
        const exchange = this.createExchange(exchangeConfig);
        this.exchanges.set(exchangeConfig.id, exchange);
        console.log(`✅ 交易所 ${exchangeConfig.id} 初始化成功`);
      } catch (error) {
        console.error(`❌ 交易所 ${exchangeConfig.id} 初始化失败:`, error.message);
      }
    }
  }

  /**
   * Instantiates the CCXT class named `config.name`.
   *
   * Credentials are read from the environment variables named by
   * `apiKeyEnv` / `secretEnv`. `enableRateLimit` and `timeout` are set after
   * spreading `config.options`, so they override values supplied there.
   *
   * @throws {Error} When `ccxt[config.name]` does not exist.
   */
  createExchange(config) {
    const ExchangeClass = ccxt[config.name];
    if (!ExchangeClass) {
      throw new Error(`不支持的交易所: ${config.name}`);
    }
    return new ExchangeClass({
      ...config.options,
      apiKey: process.env[config.apiKeyEnv],
      secret: process.env[config.secretEnv],
      enableRateLimit: true,
      timeout: 30000,
      sandbox: config.sandbox || false,
      options: {
        adjustForTimeDifference: true,
        recvWindow: 5000,
        ...config.exchangeOptions,
      },
    });
  }

  /**
   * Probes one exchange: server time, market loading and (when an API key
   * is configured) a balance request. The result is stored in
   * `healthChecks`; a failure of the time/markets probes also raises a
   * `health_check_failed` alert. Never rejects for probe failures.
   *
   * @param {string} exchangeId
   * @returns {Promise<object>} The health record.
   */
  async healthCheck(exchangeId) {
    const exchange = this.exchanges.get(exchangeId);
    if (!exchange) {
      return { status: 'error', message: '交易所未找到' };
    }

    try {
      const serverTime = await exchange.fetchTime();
      const timeDiff = Math.abs(Date.now() - serverTime);

      await exchange.loadMarkets();

      // A balance failure does not make the exchange unhealthy; it is only
      // reflected in `balanceCheck`.
      let balanceStatus = 'skip';
      if (exchange.apiKey) {
        try {
          await exchange.fetchBalance();
          balanceStatus = 'ok';
        } catch (error) {
          balanceStatus = error instanceof ccxt.AuthenticationError ? 'auth_error' : 'error';
        }
      }

      const health = {
        status: 'healthy',
        timestamp: Date.now(),
        serverTimeDiff: timeDiff,
        marketCount: exchange.symbols.length,
        balanceCheck: balanceStatus,
        lastError: null,
      };
      this.healthChecks.set(exchangeId, health);
      return health;
    } catch (error) {
      const health = {
        status: 'unhealthy',
        timestamp: Date.now(),
        error: error.message,
        errorType: error.constructor.name,
      };
      this.healthChecks.set(exchangeId, health);
      this.sendAlert('health_check_failed', {
        exchangeId,
        error: error.message,
      });
      return health;
    }
  }

  /**
   * Schedules a health check of all exchanges every 5 minutes, plus one
   * initial run after 1 second. The timer handles are kept so that
   * `stopHealthMonitoring` / `shutdown` can cancel them.
   */
  startHealthMonitoring() {
    this.stopHealthMonitoring();
    console.log('🔍 启动健康监控...');

    this.healthTimers.interval = setInterval(async () => {
      try {
        for (const exchangeId of this.exchanges.keys()) {
          await this.healthCheck(exchangeId);
        }
        this.displayHealthStatus();
      } catch (error) {
        // Keep an unexpected failure from becoming an unhandled rejection.
        console.error('❌ 健康监控失败:', error.message);
      }
    }, 300000); // 5 minutes

    this.healthTimers.initial = setTimeout(() => {
      this.healthTimers.initial = null;
      const checks = [...this.exchanges.keys()].map((exchangeId) => this.healthCheck(exchangeId));
      Promise.all(checks).catch((error) => {
        console.error('❌ 健康监控失败:', error.message);
      });
    }, 1000);
  }

  /** Cancels the timers created by `startHealthMonitoring`; safe to repeat. */
  stopHealthMonitoring() {
    if (this.healthTimers.interval !== null) {
      clearInterval(this.healthTimers.interval);
      this.healthTimers.interval = null;
    }
    if (this.healthTimers.initial !== null) {
      clearTimeout(this.healthTimers.initial);
      this.healthTimers.initial = null;
    }
  }

  /** Prints the latest health record of every exchange. */
  displayHealthStatus() {
    console.log('\n📊 交易所健康状态报告:');
    console.log('='.repeat(60));
    for (const [exchangeId, health] of this.healthChecks) {
      const statusEmoji = health.status === 'healthy' ? '🟢' : '🔴';
      const time = new Date(health.timestamp).toLocaleTimeString();
      console.log(`${statusEmoji} ${exchangeId} (${time})`);
      if (health.status === 'healthy') {
        console.log(` └─ 市场: ${health.marketCount} 个 | 时差: ${health.serverTimeDiff}ms | 认证: ${health.balanceCheck}`);
      } else {
        console.log(` └─ 错误: ${health.error}`);
      }
    }
    console.log('='.repeat(60) + '\n');
  }

  /**
   * Scores every healthy exchange and returns the best one.
   *
   * Score: 100 base, minus a clock-skew penalty, plus up to 20 for market
   * count, +10 / -20 for the balance check, +50 when listed in
   * `criteria.preferredExchanges`.
   *
   * @returns {?{id: string, exchange: object, score: number, health: object}}
   *   null when no exchange has a healthy record yet.
   */
  getBestExchange(criteria = {}) {
    let bestExchange = null;
    let bestScore = -1;

    for (const [exchangeId, exchange] of this.exchanges) {
      const health = this.healthChecks.get(exchangeId);
      if (!health || health.status !== 'healthy') {
        continue;
      }

      let score = 100;
      if (health.serverTimeDiff > 10000) score -= 30;
      else if (health.serverTimeDiff > 5000) score -= 15;

      score += Math.min(health.marketCount / 100, 20);

      if (health.balanceCheck === 'ok') score += 10;
      else if (health.balanceCheck === 'auth_error') score -= 20;

      if (criteria.preferredExchanges && criteria.preferredExchanges.includes(exchangeId)) {
        score += 50;
      }

      if (score > bestScore) {
        bestScore = score;
        bestExchange = { id: exchangeId, exchange, score, health };
      }
    }
    return bestExchange;
  }

  /**
   * Calls `exchange[method](...args)` with retries and rate limiting.
   *
   * One AdvancedRateLimiter is kept per exchange and reused across calls;
   * creating a new one per call would discard the request history that the
   * limiter needs in order to throttle anything.
   *
   * @throws {Error} When the exchange is unknown or lacks `method`.
   */
  async safeApiCall(exchangeId, method, ...args) {
    const exchange = this.exchanges.get(exchangeId);
    if (!exchange) {
      throw new Error(`交易所 ${exchangeId} 未找到`);
    }
    if (typeof exchange[method] !== 'function') {
      throw new Error(`交易所 ${exchangeId} 不支持方法: ${method}`);
    }

    let rateLimiter = this.rateLimiters.get(exchangeId);
    if (!rateLimiter) {
      rateLimiter = new AdvancedRateLimiter(exchange);
      this.rateLimiters.set(exchangeId, rateLimiter);
    }

    const retrier = new SmartRetrier({
      maxRetries: 3,
      baseDelay: 2000,
    });
    return retrier.execute(async () => rateLimiter.wrapApiCall(exchange[method], ...args));
  }

  /** Appends an alert (history capped at the newest 1000) and logs it. */
  sendAlert(type, data) {
    const alert = {
      type,
      data,
      timestamp: Date.now(),
      id: `${type}_${Date.now()}`,
    };
    this.alerts.push(alert);
    if (this.alerts.length > 1000) {
      this.alerts = this.alerts.slice(-1000);
    }
    console.warn(`🚨 告警: ${type}`, data);
    // Hook for a real alerting channel (e-mail, SMS, Slack, DingTalk, ...).
  }

  /**
   * @param {?string} [type] - Only return alerts of this type.
   * @param {number} [limit=50] - Maximum number of alerts.
   * @returns {Array<object>} Newest-first copy; `this.alerts` keeps its
   *   insertion order (sorting it in place would make the history cap in
   *   `sendAlert` drop the newest entries instead of the oldest).
   */
  getAlerts(type = null, limit = 50) {
    const selected = type
      ? this.alerts.filter((alert) => alert.type === type)
      : [...this.alerts];
    return selected
      .sort((a, b) => b.timestamp - a.timestamp)
      .slice(0, limit);
  }

  /** Stops the background timers and releases all held state. */
  async shutdown() {
    console.log('🛑 关闭交易所管理器...');
    this.stopHealthMonitoring();
    // Further cleanup (closing WebSocket connections, persisting state, ...)
    // can be added here.
    this.exchanges.clear();
    this.healthChecks.clear();
    this.monitors.clear();
    this.rateLimiters.clear();
    console.log('✅ 交易所管理器已关闭');
  }
}
// 生产环境配置示例
// Sample production configuration: one entry per exchange. `apiKeyEnv` and
// `secretEnv` hold the NAMES of environment variables, not the secrets.
const productionConfig = (() => {
  // Fresh object per exchange so the entries never share a reference.
  const clientOptions = () => ({
    timeout: 30000,
    enableRateLimit: true,
  });

  const binanceSpot = {
    id: 'binance_spot',
    name: 'binance',
    apiKeyEnv: 'BINANCE_API_KEY',
    secretEnv: 'BINANCE_SECRET',
    sandbox: false,
    options: clientOptions(),
    exchangeOptions: {
      defaultType: 'spot',
    },
  };

  const coinbasePro = {
    id: 'coinbase_pro',
    name: 'coinbasepro',
    apiKeyEnv: 'COINBASE_API_KEY',
    secretEnv: 'COINBASE_SECRET',
    sandbox: false,
    options: clientOptions(),
  };

  return { exchanges: [binanceSpot, coinbasePro] };
})();
// 使用示例
/**
 * Demo: build a manager from `productionConfig`, health-check every
 * exchange, pick the best one and fetch the BTC/USDT ticker through it.
 *
 * The health checks are awaited explicitly instead of sleeping a fixed two
 * seconds: the checks involve network round-trips, so a fixed wait could
 * leave `getBestExchange` with no healthy record. The manager is always
 * shut down at the end so its monitoring timers do not keep the process
 * alive.
 */
async function productionExample() {
  const manager = new ProductionExchangeManager(productionConfig);

  try {
    const exchangeIds = [...manager.exchanges.keys()];
    await Promise.all(exchangeIds.map((exchangeId) => manager.healthCheck(exchangeId)));

    const best = manager.getBestExchange({
      preferredExchanges: ['binance_spot'],
    });
    if (!best) {
      console.warn('⚠️ 没有健康的交易所可用');
      return;
    }

    console.log(`🏆 最佳交易所: ${best.id} (评分: ${best.score})`);
    const ticker = await manager.safeApiCall(best.id, 'fetchTicker', 'BTC/USDT');
    console.log(`BTC/USDT: $${ticker.last}`);
  } catch (error) {
    console.error('❌ 生产示例失败:', error.message);
  } finally {
    await manager.shutdown();
  }
}
// productionExample();
📊 监控和日志
日志管理系统
javascript
/**
 * Minimal leveled logger with correlation IDs.
 *
 * Output goes to the console and, when `logFile` is configured, is also
 * appended to that file as one JSON object per line.
 */
class LogManager {
  /**
   * @param {object} [config]
   * @param {string} [config.logLevel='info'] - error | warn | info | debug.
   * @param {string} [config.logFile] - Path of the log file; enables JSON output.
   * @param {number} [config.maxLogSize] - Stored (default 10 MB) but not
   *   enforced by this class; no rotation is performed here.
   */
  constructor(config = {}) {
    this.logLevel = config.logLevel || 'info';
    this.logFile = config.logFile;
    this.maxLogSize = config.maxLogSize || 10 * 1024 * 1024;
    this.correlationIds = new Map();
    // Lower number = more severe.
    this.levels = {
      error: 0,
      warn: 1,
      info: 2,
      debug: 3,
    };
  }

  /** @returns {string} `<epoch ms>-<up to 9 base-36 chars>`; not cryptographically random. */
  generateCorrelationId() {
    return `${Date.now()}-${Math.random().toString(36).slice(2, 11)}`;
  }

  /**
   * Associates `operationId` with a correlation ID, generating one when
   * none is supplied.
   *
   * @returns {string} The correlation ID now in effect.
   */
  setCorrelationId(operationId, correlationId = null) {
    if (!correlationId) {
      correlationId = this.generateCorrelationId();
    }
    this.correlationIds.set(operationId, correlationId);
    return correlationId;
  }

  /** @returns {string|undefined} The correlation ID stored for `operationId`. */
  getCorrelationId(operationId) {
    return this.correlationIds.get(operationId);
  }

  /**
   * Renders one log line: JSON when a log file is configured, otherwise a
   * human-readable string.
   *
   * The core fields (timestamp, level, message, correlationId) always win
   * over same-named keys in `context`. Previously `context` was spread last,
   * so a caller passing e.g. `timestamp` or `correlationId: undefined`
   * overwrote the real values in the JSON entry.
   */
  formatLog(level, message, context = {}) {
    const timestamp = new Date().toISOString();
    const correlationId = context.correlationId || 'unknown';

    if (this.logFile) {
      const core = { timestamp, level, message, correlationId };
      // First spread fixes the key order, last spread fixes the values.
      return JSON.stringify({ ...core, ...context, ...core });
    }

    const contextStr = Object.keys(context).length > 0 ?
      ` [${JSON.stringify(context)}]` : '';
    return `${timestamp} [${level.toUpperCase()}] [${correlationId}] ${message}${contextStr}`;
  }

  /**
   * Emits the entry to the console and, if configured, appends it to the
   * log file. Entries less severe than `logLevel` are dropped.
   */
  writeLog(level, message, context = {}) {
    if (this.levels[level] > this.levels[this.logLevel]) {
      return;
    }

    const formattedLog = this.formatLog(level, message, context);

    const consoleMethod = level === 'error' ? 'error' :
      level === 'warn' ? 'warn' : 'log';
    console[consoleMethod](formattedLog);

    if (this.logFile) {
      // Simplified: a real project should use a logging library such as
      // winston. The synchronous append blocks the event loop.
      require('fs').appendFileSync(this.logFile, formattedLog + '\n');
    }
  }

  error(message, context = {}) {
    this.writeLog('error', message, context);
  }

  warn(message, context = {}) {
    this.writeLog('warn', message, context);
  }

  info(message, context = {}) {
    this.writeLog('info', message, context);
  }

  debug(message, context = {}) {
    this.writeLog('debug', message, context);
  }

  /** Logs the start of an API call; `params` is serialised with JSON.stringify. */
  logApiCall(exchange, method, params, correlationId) {
    this.info('API调用开始', {
      correlationId,
      exchange: exchange.id,
      method,
      params: JSON.stringify(params),
      timestamp: Date.now(),
    });
  }

  /** Logs the outcome of an API call at info (success) or error (failure) level. */
  logApiResponse(exchange, method, duration, success, error, correlationId) {
    const level = success ? 'info' : 'error';
    const message = success ? 'API调用成功' : 'API调用失败';
    this[level](message, {
      correlationId,
      exchange: exchange.id,
      method,
      duration,
      success,
      error: error?.message,
      errorType: error?.constructor.name,
    });
  }
}
// 性能监控器
/**
 * In-memory timers and metric aggregation.
 *
 * Timers are keyed by operation ID; metrics are keyed by name plus the
 * JSON-serialised tags object.
 */
class PerformanceMonitor {
  constructor() {
    this.metrics = new Map();
    this.timers = new Map();
  }

  /** Starts (or restarts) the timer for `operationId`. */
  startTimer(operationId) {
    const snapshot = {
      startTime: Date.now(),
      startMemory: process.memoryUsage(),
    };
    this.timers.set(operationId, snapshot);
  }

  /**
   * Stops the timer for `operationId` and removes it.
   *
   * @returns {?{duration: number, memoryDelta: object}} Elapsed ms and the
   *   change in rss / heapUsed / heapTotal, or null if no timer was running.
   */
  endTimer(operationId) {
    const started = this.timers.get(operationId);
    if (!started) {
      return null;
    }

    const finishedAt = Date.now();
    const memoryNow = process.memoryUsage();
    const { startTime, startMemory } = started;

    this.timers.delete(operationId);

    return {
      duration: finishedAt - startTime,
      memoryDelta: {
        rss: memoryNow.rss - startMemory.rss,
        heapUsed: memoryNow.heapUsed - startMemory.heapUsed,
        heapTotal: memoryNow.heapTotal - startMemory.heapTotal,
      },
    };
  }

  /**
   * Adds one sample to the series identified by `name` + `tags`.
   * Only the newest 1000 samples are kept; the running aggregates (count,
   * sum, min, max) cover every sample ever recorded.
   */
  recordMetric(name, value, tags = {}) {
    const seriesKey = `${name}_${JSON.stringify(tags)}`;

    let series = this.metrics.get(seriesKey);
    if (!this.metrics.has(seriesKey)) {
      series = {
        name,
        tags,
        values: [],
        count: 0,
        sum: 0,
        min: Infinity,
        max: -Infinity,
      };
      this.metrics.set(seriesKey, series);
    }

    series.values.push({ value, timestamp: Date.now() });
    series.count += 1;
    series.sum += value;
    series.min = Math.min(series.min, value);
    series.max = Math.max(series.max, value);

    const overflow = series.values.length > 1000;
    if (overflow) {
      series.values = series.values.slice(-1000);
    }
  }

  /**
   * @returns {?object} Aggregates for the series plus its 10 newest
   *   samples, or null when the series does not exist.
   */
  getMetricStats(name, tags = {}) {
    const series = this.metrics.get(`${name}_${JSON.stringify(tags)}`);
    if (!series) {
      return null;
    }

    const { count, sum, min, max, values } = series;
    return {
      name: series.name,
      tags: series.tags,
      count,
      sum,
      average: sum / count,
      min,
      max,
      recent: values.slice(-10),
    };
  }

  /** @returns {Object<string, object>} Stats for every series, keyed by series key. */
  getAllMetrics() {
    const report = {};
    this.metrics.forEach((series, seriesKey) => {
      report[seriesKey] = this.getMetricStats(series.name, series.tags);
    });
    return report;
  }
}
// 监控装饰器
/**
 * Legacy-style method decorator `(target, propertyName, descriptor)` that
 * wraps an async method with logging and duration metrics.
 *
 * Each decorated method gets its own LogManager and PerformanceMonitor.
 * The timer key includes the per-call correlation ID: keying only on
 * `Class.method` made overlapping calls to the same method share one timer,
 * so the second `endTimer` returned null and reading `.duration` threw.
 *
 * @param {object} target - Prototype that owns the method (unused).
 * @param {string} propertyName - Name of the method being wrapped.
 * @param {PropertyDescriptor} descriptor - Descriptor whose `value` is the method.
 * @returns {PropertyDescriptor} The same descriptor with `value` replaced.
 */
function withMonitoring(target, propertyName, descriptor) {
  const originalMethod = descriptor.value;
  const logger = new LogManager({ logLevel: 'info' });
  const monitor = new PerformanceMonitor();

  descriptor.value = async function (...args) {
    const correlationId = logger.generateCorrelationId();
    const operationId = `${this.constructor.name}.${propertyName}#${correlationId}`;
    const startedAt = Date.now();

    // Arrow function so `this` stays the instance the method was called on.
    const finish = (success, error) => {
      const metrics = monitor.endTimer(operationId);
      // Fall back to wall-clock time if the timer entry is missing.
      const duration = metrics ? metrics.duration : Date.now() - startedAt;

      logger.logApiResponse(this, propertyName, duration, success, error, correlationId);

      const tags = {
        exchange: this.id,
        method: propertyName,
        success,
      };
      if (!success) {
        tags.errorType = error?.constructor?.name;
      }
      monitor.recordMetric('api_call_duration', duration, tags);
    };

    monitor.startTimer(operationId);
    logger.logApiCall(this, propertyName, args, correlationId);

    let result;
    try {
      result = await originalMethod.apply(this, args);
    } catch (error) {
      finish(false, error);
      throw error;
    }
    finish(true, null);
    return result;
  };

  return descriptor;
}
// 使用示例
/**
 * Thin wrapper around an exchange whose `fetchTicker` and `fetchBalance`
 * calls are instrumented by `withMonitoring`.
 */
class MonitoredExchange {
  /** @param {object} exchange - Exchange instance; its `id` is copied onto the wrapper. */
  constructor(exchange) {
    this.exchange = exchange;
    this.id = exchange.id;
  }

  async fetchTicker(symbol) {
    return await this.exchange.fetchTicker(symbol);
  }

  async fetchBalance() {
    return await this.exchange.fetchBalance();
  }
}

// `@withMonitoring` decorator syntax is not parsed by plain Node.js (it needs
// a transpiler), so the decorator is applied by hand with the same
// (target, name, descriptor) arguments a legacy decorator would receive.
for (const methodName of ['fetchTicker', 'fetchBalance']) {
  const descriptor = Object.getOwnPropertyDescriptor(MonitoredExchange.prototype, methodName);
  Object.defineProperty(
    MonitoredExchange.prototype,
    methodName,
    withMonitoring(MonitoredExchange.prototype, methodName, descriptor),
  );
}
🎯 章节总结
本章我们学习了生产环境中使用 CCXT 的关键技能:
✅ 错误处理机制
- CCXT错误类型和处理策略
- 智能重试和错误恢复
- 错误统计和分析
✅ 限速管理
- 动态限速调整
- 智能延迟计算
- 突发请求控制
✅ 生产环境配置
- 多交易所管理
- 健康检查和监控
- 自动故障转移
✅ 监控和日志
- 结构化日志记录
- 性能指标监控
- 关联ID追踪
💡 生产环境最佳实践
- 安全第一 - 使用环境变量存储密钥,启用沙盒测试
- 容错设计 - 实施完善的重试和降级机制
- 监控告警 - 建立全面的监控和告警系统
- 性能优化 - 监控API调用频率和响应时间
- 文档记录 - 详细记录配置和操作流程
🔗 下一步
完成错误处理和最佳实践的学习后,可以继续学习:
- 下一章:高级功能与应用