diff --git a/Windows/Rust/CubeLib/Cargo.lock b/Windows/Rust/CubeLib/Cargo.lock index 67aec61..e3b35fe 100644 --- a/Windows/Rust/CubeLib/Cargo.lock +++ b/Windows/Rust/CubeLib/Cargo.lock @@ -11,6 +11,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anyhow" version = "1.0.102" @@ -28,6 +37,12 @@ dependencies = [ "syn", ] +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + [[package]] name = "bitflags" version = "2.11.0" @@ -43,6 +58,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + [[package]] name = "byteorder" version = "1.5.0" @@ -71,6 +92,19 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "chrono" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +dependencies = [ + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -111,6 +145,7 @@ name = "cube_lib" version = "0.1.0" dependencies = [ "async-trait", + "chrono", "futures-util", "http", "serde", @@ -310,6 +345,30 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "iana-time-zone" +version = "0.1.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e31bc9ad994ba00e440a8aa5c9ef0ec67d5cb5e5cb0cc7f8b744a35b389cc470" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.2.0" @@ -437,6 +496,16 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "js-sys" +version = "0.3.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -534,6 +603,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -746,6 +824,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + [[package]] name = "schannel" version = "0.1.29" @@ -1208,6 +1292,51 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b" +dependencies = [ + "unicode-ident", +] + [[package]] name = "wasm-encoder" version = "0.244.0" @@ -1242,12 +1371,65 @@ dependencies = [ "semver", ] +[[package]] +name = "windows-core" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.61.2" diff --git a/Windows/Rust/CubeLib/Cargo.toml b/Windows/Rust/CubeLib/Cargo.toml index 6be88d3..24ae9d6 100644 --- a/Windows/Rust/CubeLib/Cargo.toml +++ b/Windows/Rust/CubeLib/Cargo.toml @@ -22,6 +22,9 @@ async-trait = "0.1" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +# Time +chrono = "0.4" + # HTTP headers http = "1.0" diff --git a/Windows/Rust/CubeLib/README.md b/Windows/Rust/CubeLib/README.md index 1c52010..351a9c7 100644 --- a/Windows/Rust/CubeLib/README.md +++ b/Windows/Rust/CubeLib/README.md @@ -23,7 +23,7 @@ CubeLib/ - **事件驱动**:支持 Connected、Disconnected、Message、Error 等回调 - **自动重连**:支持指数退避的重连机制 - **自定义头**:支持设置 ClientType 等 HTTP 头 -- **心跳保活**:可配置的心跳间隔 +- **心跳保活**:可配置的心跳间隔,自动发送 ping、自动回复 pong、可选心跳确认回调 - **消息队列**:离线时自动缓存消息,连接后自动发送 - **调试模式**:可选的日志输出 @@ -78,6 +78,41 @@ async fn main() { | `reconnect_delay_ms` | u64 | `1000` | 初始重连延迟(毫秒) | | `debug_mode` | bool | `false` | 调试模式 | | `heartbeat_interval_ms` | u64 | `30000` | 心跳间隔(毫秒),0=禁用 | + +## 心跳机制 + +CubeLib 内置完整的心跳保活机制: + +### 自动行为 +- **发送 ping**:按配置的间隔自动向服务器发送 ping 消息 +- **回复 pong**:收到服务器的 ping 后自动回复 pong,无需应用层处理 +- **心跳回调**:收到 pong 后触发 `on_heartbeat_ack` 回调,通知应用层延迟和服务器时间 + +### 使用方式 + +```rust +use cube_lib::websocket::{WebSocketClient, WebSocketConfig}; + +#[tokio::main] +async fn main() { + let config = WebSocketConfig::new("ws://localhost:8087/ws") + .with_heartbeat(30000); // 30秒心跳间隔 + + let mut client = WebSocketClient::new(config); + + // 设置心跳确认回调(可选) + client.on_heartbeat_ack(|latency_ms, server_timestamp| { + println!("心跳延迟: {}ms, 服务端时间: {}", latency_ms, server_timestamp); + // 可根据延迟判断连接质量 + }); + + client.connect().await; +} +``` + +### 消息格式 +- **发送的 ping**:`{"Type":"ping","Data":{"timestamp":"2024-01-01 12:00:00.000"}}` +- **回复的 pong**:`{"Type":"pong","Data":{"timestamp":"2024-01-01 12:00:00.000"}}` | `connect_timeout_ms` | u64 | `10000` | 连接超时(毫秒) | | `max_queue_size` | usize | `100` | 消息队列最大长度 | | `client_type` | Option\ | `None` | 客户端类型标识 | diff --git a/Windows/Rust/CubeLib/src/lib.rs b/Windows/Rust/CubeLib/src/lib.rs index 44e6561..a0db3c9 100644 --- a/Windows/Rust/CubeLib/src/lib.rs +++ b/Windows/Rust/CubeLib/src/lib.rs @@ -42,5 +42,5 @@ pub mod websocket; pub mod version; -pub use websocket::{WebSocketClient, WebSocketConfig, WebSocketMessage, OutgoingMessage, ConnectionStatus, ReconnectedCallback, MessageSender}; +pub use websocket::{WebSocketClient, WebSocketConfig, WebSocketMessage, OutgoingMessage, ConnectionStatus, ReconnectedCallback, HeartbeatAckCallback, MessageSender}; pub use version::{version_less_than, compare_versions, needs_update, parse_version}; diff --git a/Windows/Rust/CubeLib/src/websocket/client.rs b/Windows/Rust/CubeLib/src/websocket/client.rs index 09e0802..3dc7ae0 100644 --- a/Windows/Rust/CubeLib/src/websocket/client.rs +++ b/Windows/Rust/CubeLib/src/websocket/client.rs @@ -84,6 +84,10 @@ pub type ReconnectedCallback = Arc>>>) -> Pin + Send + Sync + 'static>> + Send + Sync>; +/// Callback triggered on heartbeat acknowledgment (pong received) +/// Arguments: (latency_ms, server_timestamp) - latency calculated from ping timestamp to pong timestamp +pub type HeartbeatAckCallback = Arc; + /// WebSocket client with event-driven architecture pub struct WebSocketClient { config: WebSocketConfig, @@ -109,10 +113,16 @@ pub struct WebSocketClient { on_first_connect: Option, /// Callback triggered after successful reconnection (after the first connection) on_reconnected: Option, + /// Callback triggered when pong is received + on_heartbeat_ack: Option, // Reconnection state reconnect_attempts: Arc>, reconnect_delay_ms: Arc>, + + // Heartbeat state + /// Timestamp of last sent ping (milliseconds since Unix epoch) + last_ping_time: Arc>>, } impl WebSocketClient { @@ -136,8 +146,10 @@ impl WebSocketClient { on_reconnecting: None, on_first_connect: None, on_reconnected: None, + on_heartbeat_ack: None, reconnect_attempts: Arc::new(Mutex::new(0)), reconnect_delay_ms: Arc::new(Mutex::new(config.reconnect_delay_ms)), + last_ping_time: Arc::new(Mutex::new(None)), } } @@ -245,6 +257,17 @@ impl WebSocketClient { self } + /// Set callback for heartbeat acknowledgment (called when pong is received) + /// The callback receives: (latency_ms, server_timestamp) + /// Use this to monitor connection health and detect disconnections + pub fn on_heartbeat_ack(&mut self, callback: F) -> &mut Self + where + F: Fn(u64, String) + Send + Sync + 'static, + { + self.on_heartbeat_ack = Some(Arc::new(callback)); + self + } + /// Update the URL for future reconnection attempts /// Call this method when the server URL changes pub fn update_url(&mut self, new_url: String) { @@ -299,6 +322,7 @@ impl WebSocketClient { let reconnect_delay_ms = Arc::clone(&self.reconnect_delay_ms); let client_url = Arc::clone(&self.url); let sender = Arc::clone(&self.sender); + let last_ping_time = Arc::clone(&self.last_ping_time); // Callbacks let on_connected = self.on_connected.clone(); @@ -309,6 +333,7 @@ impl WebSocketClient { let on_reconnecting = self.on_reconnecting.clone(); let on_first_connect = self.on_first_connect.clone(); let on_reconnected = self.on_reconnected.clone(); + let on_heartbeat_ack = self.on_heartbeat_ack.clone(); // Spawn the WebSocket task *self.is_running.lock().await = true; @@ -333,6 +358,8 @@ impl WebSocketClient { on_first_connect, on_reconnected, sender, + last_ping_time, + on_heartbeat_ack, ) .await; }); @@ -366,6 +393,8 @@ impl WebSocketClient { on_first_connect: Option, on_reconnected: Option, sender: Arc>>>, + last_ping_time: Arc>>, + on_heartbeat_ack: Option, ) { loop { let should_run = *is_running.lock().await; @@ -384,7 +413,7 @@ impl WebSocketClient { // Create MessageSender for on_message callback (same underlying sender) let message_sender = MessageSender(Arc::clone(&sender)); - match Self::connect_and_handle(¤t_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, &on_reconnected, sender_clone, message_sender).await { + match Self::connect_and_handle(¤t_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, &on_reconnected, sender_clone, message_sender, last_ping_time.clone(), on_heartbeat_ack.clone()).await { Ok(_) => { if config.debug_mode { debug!("WebSocket connection closed normally"); @@ -475,6 +504,8 @@ impl WebSocketClient { on_reconnected: &Option, sender: Arc>>>, message_sender: MessageSender, + last_ping_time: Arc>>, + on_heartbeat_ack: Option, ) -> Result<(), WebSocketError> { use http::header::{HeaderName, HeaderValue}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -521,26 +552,57 @@ impl WebSocketClient { callback(url.to_string()); } + // Clone sender before callbacks move it + let sender_for_callback = sender.clone(); + let sender_for_heartbeat = sender.clone(); + // Call on_first_connect callback (only on first connection, not on reconnect) if is_first { if let Some(ref callback) = on_first_connect { - callback(url.to_string(), sender).await; + callback(url.to_string(), sender_for_callback).await; } } else { // Call on_reconnected callback (after successful reconnection) if let Some(ref callback) = on_reconnected { - callback(url.to_string(), sender).await; + callback(url.to_string(), sender_for_callback).await; } } // Start heartbeat task if enabled + let ping_sender = sender_for_heartbeat; let heartbeat_handle = if config.heartbeat_interval_ms > 0 { let interval_duration = Duration::from_millis(config.heartbeat_interval_ms); + let ping_last_ping_time = last_ping_time.clone(); + let ping_debug = config.debug_mode; Some(tokio::spawn(async move { let mut ticker = tokio::time::interval(interval_duration); + // Wait for first tick immediately + ticker.tick().await; loop { + // Record ping timestamp + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + if let Ok(mut guard) = ping_last_ping_time.try_lock() { + *guard = Some(now_ms); + } + + // Send ping message + let timestamp = format!("{}", chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f")); + let ping_msg = format!(r#"{{"Type":"ping","Data":{{"timestamp":"{}"}}}}"#, timestamp); + + if ping_debug { + debug!("[Heartbeat] Sending ping: {}", timestamp); + } + + if let Ok(guard) = ping_sender.try_lock() { + if let Some(ref tx) = *guard { + let _ = tx.send(OutgoingMessage::Text(ping_msg)).await; + } + } + ticker.tick().await; - // Heartbeat will be sent via the receiver } })) } else { @@ -565,6 +627,60 @@ impl WebSocketClient { .unwrap_or("unknown") .to_string(); + // Auto-reply to ping from server + if msg_type == "ping" { + let timestamp = parsed.get("Data") + .and_then(|d| d.get("timestamp")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let pong_msg = format!(r#"{{"Type":"pong","Data":{{"timestamp":"{}"}}}}"#, timestamp); + if config.debug_mode { + debug!("[Heartbeat] Received ping, sending pong"); + } + if let Ok(guard) = sender.try_lock() { + if let Some(ref tx) = *guard { + let _ = tx.try_send(OutgoingMessage::Text(pong_msg)); + } + } + // Don't pass ping to application callback + continue; + } + + // Handle pong response from server + if msg_type == "pong" { + let timestamp = parsed.get("Data") + .and_then(|d| d.get("timestamp")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + + // Calculate latency + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0); + let last_ping = if let Ok(guard) = last_ping_time.try_lock() { + *guard + } else { + None + }; + let latency_ms = last_ping + .map(|ping_time| now_ms.saturating_sub(ping_time)) + .unwrap_or(0); + + if config.debug_mode { + debug!("[Heartbeat] Received pong, latency: {}ms", latency_ms); + } + + // Trigger heartbeat ack callback + if let Some(ref callback) = on_heartbeat_ack { + callback(latency_ms, timestamp); + } + // Don't pass pong to application callback + continue; + } + if let Some(ref callback) = on_message { callback(msg_type, parsed, message_sender.clone()); } diff --git a/Windows/Rust/CubeLib/src/websocket/mod.rs b/Windows/Rust/CubeLib/src/websocket/mod.rs index 2d4aafb..86e4c2d 100644 --- a/Windows/Rust/CubeLib/src/websocket/mod.rs +++ b/Windows/Rust/CubeLib/src/websocket/mod.rs @@ -12,7 +12,7 @@ mod config; mod message; mod error; -pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, ReconnectedCallback, OutgoingMessage, MessageSender}; +pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, ReconnectedCallback, HeartbeatAckCallback, OutgoingMessage, MessageSender}; pub use config::WebSocketConfig; pub use message::{WebSocketMessage, ClientType}; pub use error::WebSocketError;