实时推送
Rust SDK 通过 PushClient 提供基于 Protobuf/TCP+TLS 的实时推送服务,支持行情推送和账户推送,内置自动重连和心跳保活机制。自 v0.3.0 起,回调通过 set_callbacks(Callbacks { ... }) 一次性注册,回调参数类型为 src/push/pb/* 下 Protobuf 生成的结构(如 pb::QuoteData、pb::TradeTickData、pb::OrderStatusData 等);connect 是独立的异步函数,接收 Arc<PushClient>。v0.4.0 修复了 Cc 数据类型 dispatcher 路由错误,并新增 subscribe_cc / unsubscribe_cc(加密货币)和 subscribe_market / unsubscribe_market(市场状态)便捷方法。以下为完整可运行示例:
use std::sync::Arc;
use tigeropen::config::ClientConfig;
use tigeropen::push::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ClientConfig::builder().build()?;
let account = config.account.clone();
let pc = Arc::new(PushClient::new(config, None));
// 一次性注册所有需要的回调
pc.set_callbacks(Callbacks {
on_connect: Some(Arc::new(|| println!("[推送] 连接成功"))),
on_disconnect: Some(Arc::new(|| println!("[推送] 连接断开"))),
on_error: Some(Arc::new(|msg| eprintln!("[推送] 错误: {}", msg))),
on_kickout: Some(Arc::new(|msg| eprintln!("[推送] 被踢出: {}", msg))),
on_quote: Some(Arc::new(|data| {
println!("[行情] {} price={:?} volume={:?}",
data.symbol, data.latest_price, data.volume);
})),
on_tick: Some(Arc::new(|data| {
println!("[逐笔] {} sn={} prices={} volumes={}",
data.symbol, data.sn, data.price.len(), data.volume.len());
})),
on_depth: Some(Arc::new(|data| {
let ask_levels = data.ask.as_ref().map_or(0, |a| a.price.len());
let bid_levels = data.bid.as_ref().map_or(0, |b| b.price.len());
println!("[深度] {} ask_levels={} bid_levels={}",
data.symbol, ask_levels, bid_levels);
})),
on_order: Some(Arc::new(|data| {
println!("[订单] id={} {} 状态: {}", data.id, data.symbol, data.status);
})),
on_position: Some(Arc::new(|data| {
println!("[持仓] {} qty={}", data.symbol, data.position);
})),
on_asset: Some(Arc::new(|data| {
println!("[资产] {} netLiquidation={}", data.account, data.net_liquidation);
})),
on_transaction: Some(Arc::new(|data| {
println!("[成交] id={} {} price={}", data.id, data.symbol, data.filled_price);
})),
..Default::default()
});
// 建立 TCP+TLS 连接(独立的自由函数,需要 &Arc<PushClient>)
connect(&pc).await.map_err(|e| format!("连接失败: {}", e))?;
// 订阅行情(同步调用,参数均为 Option)
pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA"), None, None);
pc.subscribe(&SubjectType::Tick, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Depth, Some("AAPL"), None, None);
// 订阅账户推送(account 通过第三个参数传入)
pc.subscribe(&SubjectType::Asset, None, Some(&account), None);
pc.subscribe(&SubjectType::Position, None, Some(&account), None);
pc.subscribe(&SubjectType::Order, None, Some(&account), None);
pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);
println!("已启动推送,按 Ctrl+C 退出...");
tokio::signal::ctrl_c().await?;
pc.disconnect();
Ok(())
}初始化
PushClient::new 创建推送客户端
PushClient::new(config: ClientConfig, options: Option<PushClientOptions>) -> PushClient
说明
创建 PushClient 实例。可传入可选的 PushClientOptions 自定义推送服务地址、心跳间隔、重连行为等。推送客户端通常包在 Arc 中,因为自由函数 connect 需要 &Arc<PushClient>。
参数
| 参数名 | 类型 | 是否必填 | 描述 |
|---|---|---|---|
| config | ClientConfig | 是 | 客户端配置对象(ClientConfig::builder().build()? 返回) |
| options | Option<PushClientOptions> | 否 | 可选配置项 |
PushClientOptions
| 字段 | 类型 | 默认值 | 描述 |
|---|---|---|---|
| push_url | Option<String> | openapi.tigerfintech.com:9883 | 推送服务器 host:port |
| heartbeat_interval_secs | Option<u64> | 10 | 心跳间隔(秒) |
| reconnect_interval_secs | Option<u64> | 5 | 初始重连间隔(秒),失败后指数退避,最大 60 秒 |
| auto_reconnect | Option<bool> | true | 是否启用自动重连 |
| connect_timeout_secs | Option<u64> | 30 | 连接超时(秒) |
示例
use std::sync::Arc;
use tigeropen::config::ClientConfig;
use tigeropen::push::{PushClient, PushClientOptions};
let config = ClientConfig::builder().build()?;
let options = PushClientOptions {
heartbeat_interval_secs: Some(15),
reconnect_interval_secs: Some(3),
auto_reconnect: Some(true),
..Default::default()
};
let pc = Arc::new(PushClient::new(config, Some(options)));回调注册
Rust SDK 通过 set_callbacks 方法一次性注册所有回调。Callbacks 是一个字段全部为 Option<Arc<dyn Fn(...) + Send + Sync>> 的结构体,未设置的字段保持 None 即可。回调参数类型来自 tigeropen::push::pb(Protobuf 生成)。
set_callbacks 设置回调
pc.set_callbacks(cb: Callbacks)
Callbacks 字段
| 字段 | 类型 | 描述 |
|---|---|---|
| on_connect | Option<Arc<dyn Fn() + Send + Sync>> | 连接成功时触发 |
| on_disconnect | Option<Arc<dyn Fn() + Send + Sync>> | 连接断开时触发 |
| on_error | Option<Arc<dyn Fn(String) + Send + Sync>> | 发生错误时触发 |
| on_kickout | Option<Arc<dyn Fn(String) + Send + Sync>> | 账户被踢出时触发 |
| on_quote | Option<Arc<dyn Fn(pb::QuoteData) + Send + Sync>> | 股票行情推送 |
| on_tick | Option<Arc<dyn Fn(pb::TradeTickData) + Send + Sync>> | 逐笔成交 |
| on_depth | Option<Arc<dyn Fn(pb::QuoteDepthData) + Send + Sync>> | 盘口深度 |
| on_option | Option<Arc<dyn Fn(pb::QuoteData) + Send + Sync>> | 期权行情 |
| on_future | Option<Arc<dyn Fn(pb::QuoteData) + Send + Sync>> | 期货行情 |
| on_kline | Option<Arc<dyn Fn(pb::KlineData) + Send + Sync>> | K 线推送 |
| on_stock_top | Option<Arc<dyn Fn(pb::StockTopData) + Send + Sync>> | 股票榜单 |
| on_option_top | Option<Arc<dyn Fn(pb::OptionTopData) + Send + Sync>> | 期权榜单 |
| on_full_tick | Option<Arc<dyn Fn(pb::TickData) + Send + Sync>> | 全量 tick 推送 |
| on_quote_bbo | Option<Arc<dyn Fn(pb::QuoteData) + Send + Sync>> | 最优买卖价 BBO |
| on_asset | Option<Arc<dyn Fn(pb::AssetData) + Send + Sync>> | 资产变动 |
| on_position | Option<Arc<dyn Fn(pb::PositionData) + Send + Sync>> | 持仓变动 |
| on_order | Option<Arc<dyn Fn(pb::OrderStatusData) + Send + Sync>> | 订单状态变更 |
| on_transaction | Option<Arc<dyn Fn(pb::OrderTransactionData) + Send + Sync>> | 成交明细 |
示例
use std::sync::Arc;
use tigeropen::push::Callbacks;
pc.set_callbacks(Callbacks {
on_quote: Some(Arc::new(|data| {
println!("{} price={:?}", data.symbol, data.latest_price);
})),
on_order: Some(Arc::new(|data| {
println!("订单 id={} 状态: {}", data.id, data.status);
})),
..Default::default()
});连接管理
connect / disconnect 连接与断开
pub async fn connect(client: &Arc<PushClient>) -> Result<(), String>
pub fn disconnect(&self)说明
connect是tigeropen::push模块下的自由异步函数,接收&Arc<PushClient>。它会建立 TCP+TLS 连接、完成鉴权,并启动读/写/心跳三个后台任务;鉴权成功后Futureresolve,on_connect回调触发。pc.disconnect()主动断开连接(会先发送 DISCONNECT 消息),并停止后台任务,不触发自动重连。
自动重连机制
当读循环检测到异常断开时,SDK 会按 reconnect_interval 指数退避(最大 60 秒)尝试重连;重连成功后会自动恢复之前的行情与账户订阅。每次重连成功时会重新触发 on_connect。
示例
use std::sync::Arc;
use tigeropen::push::{PushClient, connect};
let pc = Arc::new(PushClient::new(config, None));
connect(&pc).await.map_err(|e| format!("连接失败: {}", e))?;
// 主动断开
pc.disconnect();state 获取连接状态
pc.state() -> ConnectionState
说明
返回当前连接状态。ConnectionState 是一个具名枚举。
ConnectionState 变体
| 变体 | 描述 |
|---|---|
ConnectionState::Disconnected | 未连接 |
ConnectionState::Connecting | 连接中 |
ConnectionState::Connected | 已连接 |
示例
use tigeropen::push::ConnectionState;
if pc.state() == ConnectionState::Connected {
pc.subscribe(&SubjectType::Quote, Some("AAPL"), None, None);
}订阅接口
Rust SDK 通过统一的 subscribe / unsubscribe 方法管理所有订阅,SubjectType 枚举区分订阅类型。方法均为同步调用,返回 bool(写入发送通道是否成功)。
subscribe 订阅
pub fn subscribe(
&self,
subject: &SubjectType,
symbols: Option<&str>,
account: Option<&str>,
market: Option<&str>,
) -> bool说明
发送一次订阅请求。symbols 为逗号分隔字符串(如 "AAPL,TSLA"),适用于行情类订阅;账户类订阅使用 account 参数;榜单类订阅通常使用 market 参数。
参数
| 参数名 | 类型 | 描述 |
|---|---|---|
| subject | &SubjectType | 订阅类型,见下方枚举值 |
| symbols | Option<&str> | 标的代码(逗号分隔) |
| account | Option<&str> | 账户 ID(账户推送) |
| market | Option<&str> | 市场代码(榜单推送) |
SubjectType 枚举值
| 值 | 描述 |
|---|---|
SubjectType::Quote | 股票实时行情 |
SubjectType::Tick | 逐笔成交 |
SubjectType::Depth | 盘口深度 |
SubjectType::Option | 期权行情 |
SubjectType::Future | 期货行情 |
SubjectType::Kline | K 线数据 |
SubjectType::StockTop | 股票榜单 |
SubjectType::OptionTop | 期权榜单 |
SubjectType::FullTick | 全量 tick |
SubjectType::QuoteBbo | 最优买卖价 BBO |
SubjectType::Asset | 账户资产变动 |
SubjectType::Position | 持仓变动 |
SubjectType::Order | 订单状态变更 |
SubjectType::Transaction | 成交明细 |
示例
use tigeropen::push::SubjectType;
// 订阅行情(逗号分隔)
pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA,GOOG"), None, None);
pc.subscribe(&SubjectType::Tick, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Depth, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Kline, Some("AAPL"), None, None);
// 期权订阅使用 OCC identifier(注意双空格)
pc.subscribe(&SubjectType::Option, Some("AAPL 250117C00150000"), None, None);
// 期货订阅
pc.subscribe(&SubjectType::Future, Some("CL2506"), None, None);
// 榜单订阅使用 market
pc.subscribe(&SubjectType::StockTop, None, None, Some("US"));
// 账户推送:symbols=None,account 传入账户号
pc.subscribe(&SubjectType::Asset, None, Some(&account), None);
pc.subscribe(&SubjectType::Position, None, Some(&account), None);
pc.subscribe(&SubjectType::Order, None, Some(&account), None);
pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);unsubscribe 退订
pub fn unsubscribe(
&self,
subject: &SubjectType,
symbols: Option<&str>,
account: Option<&str>,
market: Option<&str>,
) -> bool说明
发送一次退订请求。参数语义与 subscribe 完全一致。
示例
// 退订 TSLA 的行情
pc.unsubscribe(&SubjectType::Quote, Some("TSLA"), None, None);
// 退订账户资产推送
pc.unsubscribe(&SubjectType::Asset, None, Some(&account), None);订阅状态查询
get_subscriptions / get_account_subscriptions
pub fn get_subscriptions(&self) -> HashMap<SubjectType, Vec<String>>
pub fn get_account_subscriptions(&self) -> Vec<SubjectType>说明
查询当前已订阅的行情主题与账户主题。行情订阅返回 subject -> symbols 的映射;账户订阅返回类型列表。
示例
let quote_subs = pc.get_subscriptions();
let acct_subs = pc.get_account_subscriptions();
println!("行情订阅: {:?}", quote_subs);
println!("账户订阅: {:?}", acct_subs);便捷订阅方法 (v0.4.0 新增)
subscribe_cc / unsubscribe_cc 加密货币订阅 (v0.4.0 新增)
pub fn subscribe_cc(&self, symbols: &str) -> bool
pub fn unsubscribe_cc(&self, symbols: &str) -> bool说明
加密货币行情的便捷订阅/退订方法。等价于 subscribe(&SubjectType::Quote, Some(symbols), None, None),但语义更清晰。Cc 数据通过 on_quote 回调接收。
v0.4.0 Bug 修复:此前
Cc类型推送数据会错误落入QuoteBBOfallback 分支,现已修复,Cc明确路由到on_quote回调,与 Go/Python/Java SDK 一致。
示例
// 订阅加密货币行情
pc.subscribe_cc("BTC,ETH");
// 退订
pc.unsubscribe_cc("BTC");subscribe_market / unsubscribe_market 市场状态订阅 (v0.4.0 新增)
pub fn subscribe_market(&self, market: &str) -> bool
pub fn unsubscribe_market(&self, market: &str) -> bool说明
市场状态变更的便捷订阅/退订方法。等价于 subscribe(&SubjectType::Quote, None, None, Some(market))。市场状态数据通过 on_quote 回调接收。
示例
// 订阅美股市场状态
pc.subscribe_market("US");
// 退订
pc.unsubscribe_market("US");Updated 16 days ago
