实现Ping,Pong心跳

This commit is contained in:
zqm
2026-04-10 16:06:45 +08:00
parent b650404e4a
commit 9954ee2567
6 changed files with 343 additions and 7 deletions

View File

@@ -11,6 +11,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.102"
@@ -28,6 +37,12 @@ dependencies = [
"syn",
]
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "bitflags"
version = "2.11.0"
@@ -43,6 +58,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb"
[[package]]
name = "byteorder"
version = "1.5.0"
@@ -71,6 +92,19 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "chrono"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0"
dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-link",
]
[[package]]
name = "core-foundation"
version = "0.10.1"
@@ -111,6 +145,7 @@ name = "cube_lib"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"futures-util",
"http",
"serde",
@@ -310,6 +345,30 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "iana-time-zone"
version = "0.1.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"log",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "2.2.0"
@@ -437,6 +496,16 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682"
[[package]]
name = "js-sys"
version = "0.3.94"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -534,6 +603,15 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.4"
@@ -746,6 +824,12 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "rustversion"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "schannel"
version = "0.1.29"
@@ -1208,6 +1292,51 @@ dependencies = [
"wit-bindgen",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2"
dependencies = [
"bumpalo",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b"
dependencies = [
"unicode-ident",
]
[[package]]
name = "wasm-encoder"
version = "0.244.0"
@@ -1242,12 +1371,65 @@ dependencies = [
"semver",
]
[[package]]
name = "windows-core"
version = "0.62.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link",
"windows-result",
"windows-strings",
]
[[package]]
name = "windows-implement"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-interface"
version = "0.59.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-result"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091"
dependencies = [
"windows-link",
]
[[package]]
name = "windows-sys"
version = "0.61.2"

View File

@@ -22,6 +22,9 @@ async-trait = "0.1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Time
chrono = "0.4"
# HTTP headers
http = "1.0"

View File

@@ -23,7 +23,7 @@ CubeLib/
- **事件驱动**:支持 Connected、Disconnected、Message、Error 等回调
- **自动重连**:支持指数退避的重连机制
- **自定义头**:支持设置 ClientType 等 HTTP 头
- **心跳保活**:可配置的心跳间隔
- **心跳保活**:可配置的心跳间隔,自动发送 ping、自动回复 pong、可选心跳确认回调
- **消息队列**:离线时自动缓存消息,连接后自动发送
- **调试模式**:可选的日志输出
@@ -78,6 +78,41 @@ async fn main() {
| `reconnect_delay_ms` | u64 | `1000` | 初始重连延迟(毫秒) |
| `debug_mode` | bool | `false` | 调试模式 |
| `heartbeat_interval_ms` | u64 | `30000` | 心跳间隔(毫秒)0=禁用 |
## 心跳机制
CubeLib 内置完整的心跳保活机制:
### 自动行为
- **发送 ping**:按配置的间隔自动向服务器发送 ping 消息
- **回复 pong**:收到服务器的 ping 后自动回复 pong无需应用层处理
- **心跳回调**:收到 pong 后触发 `on_heartbeat_ack` 回调,通知应用层延迟和服务器时间
### 使用方式
```rust
use cube_lib::websocket::{WebSocketClient, WebSocketConfig};
#[tokio::main]
async fn main() {
let config = WebSocketConfig::new("ws://localhost:8087/ws")
.with_heartbeat(30000); // 30秒心跳间隔
let mut client = WebSocketClient::new(config);
// 设置心跳确认回调(可选)
client.on_heartbeat_ack(|latency_ms, server_timestamp| {
println!("心跳延迟: {}ms, 服务端时间: {}", latency_ms, server_timestamp);
// 可根据延迟判断连接质量
});
client.connect().await;
}
```
### 消息格式
- **发送的 ping**`{"Type":"ping","Data":{"timestamp":"2024-01-01 12:00:00.000"}}`
- **回复的 pong**`{"Type":"pong","Data":{"timestamp":"2024-01-01 12:00:00.000"}}`
| `connect_timeout_ms` | u64 | `10000` | 连接超时(毫秒) |
| `max_queue_size` | usize | `100` | 消息队列最大长度 |
| `client_type` | Option\<String\> | `None` | 客户端类型标识 |

View File

@@ -42,5 +42,5 @@
pub mod websocket;
pub mod version;
pub use websocket::{WebSocketClient, WebSocketConfig, WebSocketMessage, OutgoingMessage, ConnectionStatus, ReconnectedCallback, MessageSender};
pub use websocket::{WebSocketClient, WebSocketConfig, WebSocketMessage, OutgoingMessage, ConnectionStatus, ReconnectedCallback, HeartbeatAckCallback, MessageSender};
pub use version::{version_less_than, compare_versions, needs_update, parse_version};

View File

@@ -84,6 +84,10 @@ pub type ReconnectedCallback = Arc<dyn Fn(String, Arc<Mutex<Option<mpsc::Sender<
/// Note: This is an async callback - return a boxed Future
pub type FirstConnectCallback = Arc<dyn Fn(String, Arc<Mutex<Option<mpsc::Sender<OutgoingMessage>>>>) -> Pin<Box<dyn std::future::Future<Output = ()> + Send + Sync + 'static>> + Send + Sync>;
/// Callback triggered on heartbeat acknowledgment (pong received)
/// Arguments: (latency_ms, server_timestamp) - latency calculated from ping timestamp to pong timestamp
pub type HeartbeatAckCallback = Arc<dyn Fn(u64, String) + Send + Sync>;
/// WebSocket client with event-driven architecture
pub struct WebSocketClient {
config: WebSocketConfig,
@@ -109,10 +113,16 @@ pub struct WebSocketClient {
on_first_connect: Option<FirstConnectCallback>,
/// Callback triggered after successful reconnection (after the first connection)
on_reconnected: Option<ReconnectedCallback>,
/// Callback triggered when pong is received
on_heartbeat_ack: Option<HeartbeatAckCallback>,
// Reconnection state
reconnect_attempts: Arc<Mutex<u32>>,
reconnect_delay_ms: Arc<Mutex<u64>>,
// Heartbeat state
/// Timestamp of last sent ping (milliseconds since Unix epoch)
last_ping_time: Arc<Mutex<Option<u64>>>,
}
impl WebSocketClient {
@@ -136,8 +146,10 @@ impl WebSocketClient {
on_reconnecting: None,
on_first_connect: None,
on_reconnected: None,
on_heartbeat_ack: None,
reconnect_attempts: Arc::new(Mutex::new(0)),
reconnect_delay_ms: Arc::new(Mutex::new(config.reconnect_delay_ms)),
last_ping_time: Arc::new(Mutex::new(None)),
}
}
@@ -245,6 +257,17 @@ impl WebSocketClient {
self
}
/// Set callback for heartbeat acknowledgment (called when pong is received)
/// The callback receives: (latency_ms, server_timestamp)
/// Use this to monitor connection health and detect disconnections
pub fn on_heartbeat_ack<F>(&mut self, callback: F) -> &mut Self
where
F: Fn(u64, String) + Send + Sync + 'static,
{
self.on_heartbeat_ack = Some(Arc::new(callback));
self
}
/// Update the URL for future reconnection attempts
/// Call this method when the server URL changes
pub fn update_url(&mut self, new_url: String) {
@@ -299,6 +322,7 @@ impl WebSocketClient {
let reconnect_delay_ms = Arc::clone(&self.reconnect_delay_ms);
let client_url = Arc::clone(&self.url);
let sender = Arc::clone(&self.sender);
let last_ping_time = Arc::clone(&self.last_ping_time);
// Callbacks
let on_connected = self.on_connected.clone();
@@ -309,6 +333,7 @@ impl WebSocketClient {
let on_reconnecting = self.on_reconnecting.clone();
let on_first_connect = self.on_first_connect.clone();
let on_reconnected = self.on_reconnected.clone();
let on_heartbeat_ack = self.on_heartbeat_ack.clone();
// Spawn the WebSocket task
*self.is_running.lock().await = true;
@@ -333,6 +358,8 @@ impl WebSocketClient {
on_first_connect,
on_reconnected,
sender,
last_ping_time,
on_heartbeat_ack,
)
.await;
});
@@ -366,6 +393,8 @@ impl WebSocketClient {
on_first_connect: Option<FirstConnectCallback>,
on_reconnected: Option<ReconnectedCallback>,
sender: Arc<Mutex<Option<mpsc::Sender<OutgoingMessage>>>>,
last_ping_time: Arc<Mutex<Option<u64>>>,
on_heartbeat_ack: Option<HeartbeatAckCallback>,
) {
loop {
let should_run = *is_running.lock().await;
@@ -384,7 +413,7 @@ impl WebSocketClient {
// Create MessageSender for on_message callback (same underlying sender)
let message_sender = MessageSender(Arc::clone(&sender));
match Self::connect_and_handle(&current_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, &on_reconnected, sender_clone, message_sender).await {
match Self::connect_and_handle(&current_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, &on_reconnected, sender_clone, message_sender, last_ping_time.clone(), on_heartbeat_ack.clone()).await {
Ok(_) => {
if config.debug_mode {
debug!("WebSocket connection closed normally");
@@ -475,6 +504,8 @@ impl WebSocketClient {
on_reconnected: &Option<ReconnectedCallback>,
sender: Arc<Mutex<Option<mpsc::Sender<OutgoingMessage>>>>,
message_sender: MessageSender,
last_ping_time: Arc<Mutex<Option<u64>>>,
on_heartbeat_ack: Option<HeartbeatAckCallback>,
) -> Result<(), WebSocketError> {
use http::header::{HeaderName, HeaderValue};
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
@@ -521,26 +552,57 @@ impl WebSocketClient {
callback(url.to_string());
}
// Clone sender before callbacks move it
let sender_for_callback = sender.clone();
let sender_for_heartbeat = sender.clone();
// Call on_first_connect callback (only on first connection, not on reconnect)
if is_first {
if let Some(ref callback) = on_first_connect {
callback(url.to_string(), sender).await;
callback(url.to_string(), sender_for_callback).await;
}
} else {
// Call on_reconnected callback (after successful reconnection)
if let Some(ref callback) = on_reconnected {
callback(url.to_string(), sender).await;
callback(url.to_string(), sender_for_callback).await;
}
}
// Start heartbeat task if enabled
let ping_sender = sender_for_heartbeat;
let heartbeat_handle = if config.heartbeat_interval_ms > 0 {
let interval_duration = Duration::from_millis(config.heartbeat_interval_ms);
let ping_last_ping_time = last_ping_time.clone();
let ping_debug = config.debug_mode;
Some(tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval_duration);
loop {
// Wait for first tick immediately
ticker.tick().await;
loop {
// Record ping timestamp
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
if let Ok(mut guard) = ping_last_ping_time.try_lock() {
*guard = Some(now_ms);
}
// Send ping message
let timestamp = format!("{}", chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"));
let ping_msg = format!(r#"{{"Type":"ping","Data":{{"timestamp":"{}"}}}}"#, timestamp);
if ping_debug {
debug!("[Heartbeat] Sending ping: {}", timestamp);
}
if let Ok(guard) = ping_sender.try_lock() {
if let Some(ref tx) = *guard {
let _ = tx.send(OutgoingMessage::Text(ping_msg)).await;
}
}
ticker.tick().await;
// Heartbeat will be sent via the receiver
}
}))
} else {
@@ -565,6 +627,60 @@ impl WebSocketClient {
.unwrap_or("unknown")
.to_string();
// Auto-reply to ping from server
if msg_type == "ping" {
let timestamp = parsed.get("Data")
.and_then(|d| d.get("timestamp"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let pong_msg = format!(r#"{{"Type":"pong","Data":{{"timestamp":"{}"}}}}"#, timestamp);
if config.debug_mode {
debug!("[Heartbeat] Received ping, sending pong");
}
if let Ok(guard) = sender.try_lock() {
if let Some(ref tx) = *guard {
let _ = tx.try_send(OutgoingMessage::Text(pong_msg));
}
}
// Don't pass ping to application callback
continue;
}
// Handle pong response from server
if msg_type == "pong" {
let timestamp = parsed.get("Data")
.and_then(|d| d.get("timestamp"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
// Calculate latency
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let last_ping = if let Ok(guard) = last_ping_time.try_lock() {
*guard
} else {
None
};
let latency_ms = last_ping
.map(|ping_time| now_ms.saturating_sub(ping_time))
.unwrap_or(0);
if config.debug_mode {
debug!("[Heartbeat] Received pong, latency: {}ms", latency_ms);
}
// Trigger heartbeat ack callback
if let Some(ref callback) = on_heartbeat_ack {
callback(latency_ms, timestamp);
}
// Don't pass pong to application callback
continue;
}
if let Some(ref callback) = on_message {
callback(msg_type, parsed, message_sender.clone());
}

View File

@@ -12,7 +12,7 @@ mod config;
mod message;
mod error;
pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, ReconnectedCallback, OutgoingMessage, MessageSender};
pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, ReconnectedCallback, HeartbeatAckCallback, OutgoingMessage, MessageSender};
pub use config::WebSocketConfig;
pub use message::{WebSocketMessage, ClientType};
pub use error::WebSocketError;