实时推送

Rust SDK 通过 PushClient 提供基于 Protobuf/TCP+TLS 的实时推送服务,支持行情推送和账户推送,内置自动重连和心跳保活机制。自 v0.3.0 起,回调通过 set_callbacks(Callbacks { ... }) 一次性注册,回调参数类型为 src/push/pb/* 下 Protobuf 生成的结构(如 pb::QuoteDatapb::TradeTickDatapb::OrderStatusData 等);connect 是独立的异步函数,接收 Arc<PushClient>v0.4.0 修复了 Cc 数据类型 dispatcher 路由错误,并新增 subscribe_cc / unsubscribe_cc(加密货币)和 subscribe_market / unsubscribe_market(市场状态)便捷方法。以下为完整可运行示例:

use std::sync::Arc;
use tigeropen::config::ClientConfig;
use tigeropen::push::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ClientConfig::builder().build()?;
    let account = config.account.clone();
    let pc = Arc::new(PushClient::new(config, None));

    // 一次性注册所有需要的回调
    pc.set_callbacks(Callbacks {
        on_connect: Some(Arc::new(|| println!("[推送] 连接成功"))),
        on_disconnect: Some(Arc::new(|| println!("[推送] 连接断开"))),
        on_error: Some(Arc::new(|msg| eprintln!("[推送] 错误: {}", msg))),
        on_kickout: Some(Arc::new(|msg| eprintln!("[推送] 被踢出: {}", msg))),

        on_quote: Some(Arc::new(|data| {
            println!("[行情] {} price={:?} volume={:?}",
                data.symbol, data.latest_price, data.volume);
        })),
        on_tick: Some(Arc::new(|data| {
            println!("[逐笔] {} sn={} prices={} volumes={}",
                data.symbol, data.sn, data.price.len(), data.volume.len());
        })),
        on_depth: Some(Arc::new(|data| {
            let ask_levels = data.ask.as_ref().map_or(0, |a| a.price.len());
            let bid_levels = data.bid.as_ref().map_or(0, |b| b.price.len());
            println!("[深度] {} ask_levels={} bid_levels={}",
                data.symbol, ask_levels, bid_levels);
        })),

        on_order: Some(Arc::new(|data| {
            println!("[订单] id={} {} 状态: {}", data.id, data.symbol, data.status);
        })),
        on_position: Some(Arc::new(|data| {
            println!("[持仓] {} qty={}", data.symbol, data.position);
        })),
        on_asset: Some(Arc::new(|data| {
            println!("[资产] {} netLiquidation={}", data.account, data.net_liquidation);
        })),
        on_transaction: Some(Arc::new(|data| {
            println!("[成交] id={} {} price={}", data.id, data.symbol, data.filled_price);
        })),
        ..Default::default()
    });

    // 建立 TCP+TLS 连接(独立的自由函数,需要 &Arc<PushClient>)
    connect(&pc).await.map_err(|e| format!("连接失败: {}", e))?;

    // 订阅行情(同步调用,参数均为 Option)
    pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA"), None, None);
    pc.subscribe(&SubjectType::Tick, Some("AAPL"), None, None);
    pc.subscribe(&SubjectType::Depth, Some("AAPL"), None, None);

    // 订阅账户推送(account 通过第三个参数传入)
    pc.subscribe(&SubjectType::Asset, None, Some(&account), None);
    pc.subscribe(&SubjectType::Position, None, Some(&account), None);
    pc.subscribe(&SubjectType::Order, None, Some(&account), None);
    pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);

    println!("已启动推送,按 Ctrl+C 退出...");
    tokio::signal::ctrl_c().await?;
    pc.disconnect();
    Ok(())
}

初始化

PushClient::new 创建推送客户端

PushClient::new(config: ClientConfig, options: Option<PushClientOptions>) -> PushClient

说明

创建 PushClient 实例。可传入可选的 PushClientOptions 自定义推送服务地址、心跳间隔、重连行为等。推送客户端通常包在 Arc 中,因为自由函数 connect 需要 &Arc<PushClient>

参数

参数名类型是否必填描述
configClientConfig客户端配置对象(ClientConfig::builder().build()? 返回)
optionsOption<PushClientOptions>可选配置项

PushClientOptions

字段类型默认值描述
push_urlOption<String>openapi.tigerfintech.com:9883推送服务器 host:port
heartbeat_interval_secsOption<u64>10心跳间隔(秒)
reconnect_interval_secsOption<u64>5初始重连间隔(秒),失败后指数退避,最大 60 秒
auto_reconnectOption<bool>true是否启用自动重连
connect_timeout_secsOption<u64>30连接超时(秒)

示例

use std::sync::Arc;
use tigeropen::config::ClientConfig;
use tigeropen::push::{PushClient, PushClientOptions};

let config = ClientConfig::builder().build()?;
let options = PushClientOptions {
    heartbeat_interval_secs: Some(15),
    reconnect_interval_secs: Some(3),
    auto_reconnect: Some(true),
    ..Default::default()
};
let pc = Arc::new(PushClient::new(config, Some(options)));

回调注册

Rust SDK 通过 set_callbacks 方法一次性注册所有回调。Callbacks 是一个字段全部为 Option<Arc<dyn Fn(...) + Send + Sync>> 的结构体,未设置的字段保持 None 即可。回调参数类型来自 tigeropen::push::pb(Protobuf 生成)。

set_callbacks 设置回调

pc.set_callbacks(cb: Callbacks)

Callbacks 字段

字段类型描述
on_connectOption<Arc<dyn Fn() + Send + Sync>>连接成功时触发
on_disconnectOption<Arc<dyn Fn() + Send + Sync>>连接断开时触发
on_errorOption<Arc<dyn Fn(String) + Send + Sync>>发生错误时触发
on_kickoutOption<Arc<dyn Fn(String) + Send + Sync>>账户被踢出时触发
on_quoteOption<Arc<dyn Fn(pb::QuoteData) + Send + Sync>>股票行情推送
on_tickOption<Arc<dyn Fn(pb::TradeTickData) + Send + Sync>>逐笔成交
on_depthOption<Arc<dyn Fn(pb::QuoteDepthData) + Send + Sync>>盘口深度
on_optionOption<Arc<dyn Fn(pb::QuoteData) + Send + Sync>>期权行情
on_futureOption<Arc<dyn Fn(pb::QuoteData) + Send + Sync>>期货行情
on_klineOption<Arc<dyn Fn(pb::KlineData) + Send + Sync>>K 线推送
on_stock_topOption<Arc<dyn Fn(pb::StockTopData) + Send + Sync>>股票榜单
on_option_topOption<Arc<dyn Fn(pb::OptionTopData) + Send + Sync>>期权榜单
on_full_tickOption<Arc<dyn Fn(pb::TickData) + Send + Sync>>全量 tick 推送
on_quote_bboOption<Arc<dyn Fn(pb::QuoteData) + Send + Sync>>最优买卖价 BBO
on_assetOption<Arc<dyn Fn(pb::AssetData) + Send + Sync>>资产变动
on_positionOption<Arc<dyn Fn(pb::PositionData) + Send + Sync>>持仓变动
on_orderOption<Arc<dyn Fn(pb::OrderStatusData) + Send + Sync>>订单状态变更
on_transactionOption<Arc<dyn Fn(pb::OrderTransactionData) + Send + Sync>>成交明细

示例

use std::sync::Arc;
use tigeropen::push::Callbacks;

pc.set_callbacks(Callbacks {
    on_quote: Some(Arc::new(|data| {
        println!("{} price={:?}", data.symbol, data.latest_price);
    })),
    on_order: Some(Arc::new(|data| {
        println!("订单 id={} 状态: {}", data.id, data.status);
    })),
    ..Default::default()
});

连接管理

connect / disconnect 连接与断开

pub async fn connect(client: &Arc<PushClient>) -> Result<(), String>
pub fn disconnect(&self)

说明

  • connecttigeropen::push 模块下的自由异步函数,接收 &Arc<PushClient>。它会建立 TCP+TLS 连接、完成鉴权,并启动读/写/心跳三个后台任务;鉴权成功后 Future resolve,on_connect 回调触发。
  • pc.disconnect() 主动断开连接(会先发送 DISCONNECT 消息),并停止后台任务,不触发自动重连。

自动重连机制

当读循环检测到异常断开时,SDK 会按 reconnect_interval 指数退避(最大 60 秒)尝试重连;重连成功后会自动恢复之前的行情与账户订阅。每次重连成功时会重新触发 on_connect

示例

use std::sync::Arc;
use tigeropen::push::{PushClient, connect};

let pc = Arc::new(PushClient::new(config, None));
connect(&pc).await.map_err(|e| format!("连接失败: {}", e))?;

// 主动断开
pc.disconnect();

state 获取连接状态

pc.state() -> ConnectionState

说明

返回当前连接状态。ConnectionState 是一个具名枚举。

ConnectionState 变体

变体描述
ConnectionState::Disconnected未连接
ConnectionState::Connecting连接中
ConnectionState::Connected已连接

示例

use tigeropen::push::ConnectionState;

if pc.state() == ConnectionState::Connected {
    pc.subscribe(&SubjectType::Quote, Some("AAPL"), None, None);
}

订阅接口

Rust SDK 通过统一的 subscribe / unsubscribe 方法管理所有订阅,SubjectType 枚举区分订阅类型。方法均为同步调用,返回 bool(写入发送通道是否成功)。

subscribe 订阅

pub fn subscribe(
    &self,
    subject: &SubjectType,
    symbols: Option<&str>,
    account: Option<&str>,
    market: Option<&str>,
) -> bool

说明

发送一次订阅请求。symbols 为逗号分隔字符串(如 "AAPL,TSLA"),适用于行情类订阅;账户类订阅使用 account 参数;榜单类订阅通常使用 market 参数。

参数

参数名类型描述
subject&SubjectType订阅类型,见下方枚举值
symbolsOption<&str>标的代码(逗号分隔)
accountOption<&str>账户 ID(账户推送)
marketOption<&str>市场代码(榜单推送)

SubjectType 枚举值

描述
SubjectType::Quote股票实时行情
SubjectType::Tick逐笔成交
SubjectType::Depth盘口深度
SubjectType::Option期权行情
SubjectType::Future期货行情
SubjectType::KlineK 线数据
SubjectType::StockTop股票榜单
SubjectType::OptionTop期权榜单
SubjectType::FullTick全量 tick
SubjectType::QuoteBbo最优买卖价 BBO
SubjectType::Asset账户资产变动
SubjectType::Position持仓变动
SubjectType::Order订单状态变更
SubjectType::Transaction成交明细

示例

use tigeropen::push::SubjectType;

// 订阅行情(逗号分隔)
pc.subscribe(&SubjectType::Quote, Some("AAPL,TSLA,GOOG"), None, None);
pc.subscribe(&SubjectType::Tick, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Depth, Some("AAPL"), None, None);
pc.subscribe(&SubjectType::Kline, Some("AAPL"), None, None);

// 期权订阅使用 OCC identifier(注意双空格)
pc.subscribe(&SubjectType::Option, Some("AAPL  250117C00150000"), None, None);

// 期货订阅
pc.subscribe(&SubjectType::Future, Some("CL2506"), None, None);

// 榜单订阅使用 market
pc.subscribe(&SubjectType::StockTop, None, None, Some("US"));

// 账户推送:symbols=None,account 传入账户号
pc.subscribe(&SubjectType::Asset, None, Some(&account), None);
pc.subscribe(&SubjectType::Position, None, Some(&account), None);
pc.subscribe(&SubjectType::Order, None, Some(&account), None);
pc.subscribe(&SubjectType::Transaction, None, Some(&account), None);

unsubscribe 退订

pub fn unsubscribe(
    &self,
    subject: &SubjectType,
    symbols: Option<&str>,
    account: Option<&str>,
    market: Option<&str>,
) -> bool

说明

发送一次退订请求。参数语义与 subscribe 完全一致。

示例

// 退订 TSLA 的行情
pc.unsubscribe(&SubjectType::Quote, Some("TSLA"), None, None);

// 退订账户资产推送
pc.unsubscribe(&SubjectType::Asset, None, Some(&account), None);

订阅状态查询

get_subscriptions / get_account_subscriptions

pub fn get_subscriptions(&self) -> HashMap<SubjectType, Vec<String>>
pub fn get_account_subscriptions(&self) -> Vec<SubjectType>

说明

查询当前已订阅的行情主题与账户主题。行情订阅返回 subject -> symbols 的映射;账户订阅返回类型列表。

示例

let quote_subs = pc.get_subscriptions();
let acct_subs = pc.get_account_subscriptions();
println!("行情订阅: {:?}", quote_subs);
println!("账户订阅: {:?}", acct_subs);

便捷订阅方法 (v0.4.0 新增)

subscribe_cc / unsubscribe_cc 加密货币订阅 (v0.4.0 新增)

pub fn subscribe_cc(&self, symbols: &str) -> bool
pub fn unsubscribe_cc(&self, symbols: &str) -> bool

说明

加密货币行情的便捷订阅/退订方法。等价于 subscribe(&SubjectType::Quote, Some(symbols), None, None),但语义更清晰。Cc 数据通过 on_quote 回调接收。

v0.4.0 Bug 修复:此前 Cc 类型推送数据会错误落入 QuoteBBO fallback 分支,现已修复,Cc 明确路由到 on_quote 回调,与 Go/Python/Java SDK 一致。

示例

// 订阅加密货币行情
pc.subscribe_cc("BTC,ETH");

// 退订
pc.unsubscribe_cc("BTC");

subscribe_market / unsubscribe_market 市场状态订阅 (v0.4.0 新增)

pub fn subscribe_market(&self, market: &str) -> bool
pub fn unsubscribe_market(&self, market: &str) -> bool

说明

市场状态变更的便捷订阅/退订方法。等价于 subscribe(&SubjectType::Quote, None, None, Some(market))。市场状态数据通过 on_quote 回调接收。

示例

// 订阅美股市场状态
pc.subscribe_market("US");

// 退订
pc.unsubscribe_market("US");