From 6cbd9e1e30ad52b5c601a0d10f3dc79e840014bd Mon Sep 17 00:00:00 2001 From: zqm Date: Tue, 7 Apr 2026 15:33:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20CubeLib=20=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E5=8F=AF=E8=A7=81=E6=80=A7=E8=AD=A6=E5=91=8A=EF=BC=8C?= =?UTF-8?q?=E5=B0=86=20OutgoingMessage=20=E6=9E=9A=E4=B8=BE=E7=9A=84?= =?UTF-8?q?=E5=8F=AF=E8=A7=81=E6=80=A7=E4=BB=8E=20pub(crate)=20=E6=8F=90?= =?UTF-8?q?=E5=8D=87=E4=B8=BA=20pub?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Windows/Rust/CubeLib/src/websocket/client.rs | 138 +++++++++++++++++-- Windows/Rust/CubeLib/src/websocket/config.rs | 12 ++ Windows/Rust/CubeLib/src/websocket/mod.rs | 2 +- Windows/Rust/CubeLib/target/.rustc_info.json | 2 +- 4 files changed, 142 insertions(+), 12 deletions(-) diff --git a/Windows/Rust/CubeLib/src/websocket/client.rs b/Windows/Rust/CubeLib/src/websocket/client.rs index daf1657..ad8e825 100644 --- a/Windows/Rust/CubeLib/src/websocket/client.rs +++ b/Windows/Rust/CubeLib/src/websocket/client.rs @@ -10,7 +10,7 @@ use serde_json::Value; /// Message to send through the channel #[derive(Debug)] -enum OutgoingMessage { +pub enum OutgoingMessage { /// Text message Text(String), /// Binary message @@ -55,10 +55,18 @@ pub type BinaryCallback = Arc) + Send + Sync>; pub type ErrorCallback = Arc; pub type StatusCallback = Arc; pub type SentCallback = Arc; +/// Callback triggered before reconnecting +/// Arguments: (attempt_number, url_arc) - app can update the URL via url_arc +pub type ReconnectingCallback = Arc>) + Send + Sync>; +/// Callback triggered on first successful connection (before any reconnect) +/// Arguments: (url, send_fn) - send_fn can be called to send messages +pub type FirstConnectCallback = Arc>>>) + Send + Sync>; /// WebSocket client with event-driven architecture pub struct WebSocketClient { config: WebSocketConfig, + /// Dynamic URL (can be updated for reconnecting with new URL) + url: Arc>, status: Arc>, sender: Arc>>>, message_queue: Arc>>, @@ -73,6 +81,10 @@ pub struct WebSocketClient { on_error: Option, on_status_changed: Option, on_message_sent: Option, + /// Callback triggered before reconnecting (attempt number passed as argument) + on_reconnecting: Option, + /// Callback triggered on first successful connection + on_first_connect: Option, // Reconnection state reconnect_attempts: Arc>, @@ -83,7 +95,8 @@ impl WebSocketClient { /// Create a new WebSocket client pub fn new(config: WebSocketConfig) -> Self { Self { - config, + config: config.clone(), + url: Arc::new(Mutex::new(config.ws_url.clone())), status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)), sender: Arc::new(Mutex::new(None)), message_queue: Arc::new(Mutex::new(Vec::new())), @@ -96,8 +109,10 @@ impl WebSocketClient { on_error: None, on_status_changed: None, on_message_sent: None, + on_reconnecting: None, + on_first_connect: None, reconnect_attempts: Arc::new(Mutex::new(0)), - reconnect_delay_ms: Arc::new(Mutex::new(1000)), + reconnect_delay_ms: Arc::new(Mutex::new(config.reconnect_delay_ms)), } } @@ -171,15 +186,48 @@ impl WebSocketClient { self } + /// Set callback for reconnecting (called before each reconnect attempt) + /// The callback receives: (attempt_number, url_arc) + /// Use url_arc to update the URL for the next connection attempt + pub fn on_reconnecting(&mut self, callback: F) -> &mut Self + where + F: Fn(u32, Arc>) + Send + Sync + 'static, + { + self.on_reconnecting = Some(Arc::new(callback)); + self + } + + /// Set callback for first successful connection (before any reconnect) + /// This is called only on the first connection, allowing the app to send initial messages + pub fn on_first_connect(&mut self, callback: F) -> &mut Self + where + F: Fn(String, Arc>>>) + Send + Sync + 'static, + { + self.on_first_connect = Some(Arc::new(callback)); + self + } + + /// Update the URL for future reconnection attempts + /// Call this method when the server URL changes + pub fn update_url(&mut self, new_url: String) { + if self.config.debug_mode { + debug!("Updating URL: {} -> {}", self.config.ws_url, new_url); + } + self.config.ws_url = new_url.clone(); + *self.url.blocking_lock() = new_url; + } + // ==================== Connection Management ==================== - /// Connect to the WebSocket server + /// Connect to the WebSocket server (uses dynamic URL that can be updated) + /// This method blocks until the connection ends completely (or is stopped via disconnect()) pub async fn connect(&mut self) { - let url = self.config.ws_url.clone(); + let url = self.url.lock().await.clone(); self.connect_with_url(&url).await; } /// Connect to a specific URL (overrides config URL) + /// Spawns the WebSocket task and waits for it to complete pub async fn connect_with_url(&mut self, url: &str) { let _old_status = *self.status.lock().await; self.set_status(ConnectionStatus::Connecting).await; @@ -200,6 +248,8 @@ impl WebSocketClient { let is_running = Arc::clone(&self.is_running); let reconnect_attempts = Arc::clone(&self.reconnect_attempts); let reconnect_delay_ms = Arc::clone(&self.reconnect_delay_ms); + let client_url = Arc::clone(&self.url); + let sender = Arc::clone(&self.sender); // Callbacks let on_connected = self.on_connected.clone(); @@ -207,6 +257,8 @@ impl WebSocketClient { let on_message = self.on_message.clone(); let on_binary = self.on_binary.clone(); let on_error = self.on_error.clone(); + let on_reconnecting = self.on_reconnecting.clone(); + let on_first_connect = self.on_first_connect.clone(); // Spawn the WebSocket task *self.is_running.lock().await = true; @@ -221,21 +273,31 @@ impl WebSocketClient { is_running, reconnect_attempts, reconnect_delay_ms, + client_url, on_connected, on_disconnected, on_message, on_binary, on_error, + on_reconnecting, + on_first_connect, + sender, ) .await; }); + // Store handle and wait for task to complete *self.task_handle.lock().await = Some(task_handle); + + // Wait for the task to complete (blocks until websocket_loop ends) + if let Some(handle) = self.task_handle.lock().await.take() { + let _ = handle.await; + } } /// Main WebSocket loop async fn websocket_loop( - url: String, + _url: String, // Initial URL (superseded by client_url for reconnecting) config: WebSocketConfig, mut receiver: mpsc::Receiver, status: Arc>, @@ -243,11 +305,15 @@ impl WebSocketClient { is_running: Arc>, reconnect_attempts: Arc>, reconnect_delay_ms: Arc>, + client_url: Arc>, on_connected: Option, on_disconnected: Option, on_message: Option, on_binary: Option, on_error: Option, + on_reconnecting: Option, + on_first_connect: Option, + sender: Arc>>>, ) { loop { let should_run = *is_running.lock().await; @@ -255,12 +321,46 @@ impl WebSocketClient { break; } - match Self::connect_and_handle(&url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error).await { + // Get current URL (may have been updated) + let current_url = client_url.lock().await.clone(); + + // Check if this is the first connection (not a reconnect) + let is_first = *reconnect_attempts.lock().await == 0; + + // Clone sender for first_connect callback + let sender_clone = Arc::clone(&sender); + + match Self::connect_and_handle(¤t_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, sender_clone).await { Ok(_) => { if config.debug_mode { debug!("WebSocket connection closed normally"); } - break; + // 即使正常关闭,也尝试重连(如果启用) + if config.reconnect && *is_running.lock().await { + let delay = *reconnect_delay_ms.lock().await; + let attempts = *reconnect_attempts.lock().await + 1; + *reconnect_attempts.lock().await = attempts; + + if config.debug_mode { + debug!("Reconnecting in {}ms (attempt {})", delay, attempts); + } + + // Trigger reconnecting callback + if let Some(ref callback) = on_reconnecting { + callback(attempts, client_url.clone()); + } + + *status.lock().await = ConnectionStatus::Reconnecting; + tokio::time::sleep(Duration::from_millis(delay)).await; + + // Exponential backoff + let new_delay = std::cmp::min(delay * 2, config.max_reconnect_delay_ms); + *reconnect_delay_ms.lock().await = new_delay; + + continue; + } else { + break; + } } Err(e) => { if config.debug_mode { @@ -280,6 +380,11 @@ impl WebSocketClient { debug!("Reconnecting in {}ms (attempt {})", delay, attempts); } + // Trigger reconnecting callback (allows app to update URL, reload config, etc.) + if let Some(ref callback) = on_reconnecting { + callback(attempts, client_url.clone()); + } + *status.lock().await = ConnectionStatus::Reconnecting; tokio::time::sleep(Duration::from_millis(delay)).await; @@ -311,6 +416,9 @@ impl WebSocketClient { on_message: &Option, on_binary: &Option, on_error: &Option, + is_first: bool, + on_first_connect: &Option, + sender: Arc>>>, ) -> Result<(), WebSocketError> { use http::header::{HeaderName, HeaderValue}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -357,6 +465,13 @@ impl WebSocketClient { callback(url.to_string()); } + // Call on_first_connect callback (only on first connection, not on reconnect) + if is_first { + if let Some(ref callback) = on_first_connect { + callback(url.to_string(), sender); + } + } + // Start heartbeat task if enabled let heartbeat_handle = if config.heartbeat_interval_ms > 0 { let interval_duration = Duration::from_millis(config.heartbeat_interval_ms); @@ -605,8 +720,11 @@ impl WebSocketClient { impl Drop for WebSocketClient { fn drop(&mut self) { - // Attempt to disconnect synchronously - let _ = tokio::runtime::Handle::current().block_on(self.disconnect()); + // 不在这里调用异步 disconnect(),因为: + // 1. 无法在 Drop 中安全地使用 block_on (会导致 "Cannot start a runtime from within a runtime") + // 2. 调用者应该显式调用 disconnect() 方法 + // 3. Tokio 的 JoinHandle 会在 task 结束时自动清理资源 + // 如果 client 未被显式 disconnect() 就被 drop,资源会通过 JoinHandle 的 abort 自然清理 } } diff --git a/Windows/Rust/CubeLib/src/websocket/config.rs b/Windows/Rust/CubeLib/src/websocket/config.rs index e789944..46992c4 100644 --- a/Windows/Rust/CubeLib/src/websocket/config.rs +++ b/Windows/Rust/CubeLib/src/websocket/config.rs @@ -129,4 +129,16 @@ impl WebSocketConfig { self.heartbeat_interval_ms = interval_ms; self } + + /// Set initial reconnect delay in milliseconds + pub fn with_reconnect_delay(mut self, delay_ms: u64) -> Self { + self.reconnect_delay_ms = delay_ms; + self + } + + /// Set maximum reconnect delay in milliseconds + pub fn with_max_reconnect_delay(mut self, delay_ms: u64) -> Self { + self.max_reconnect_delay_ms = delay_ms; + self + } } diff --git a/Windows/Rust/CubeLib/src/websocket/mod.rs b/Windows/Rust/CubeLib/src/websocket/mod.rs index 8752350..811f17d 100644 --- a/Windows/Rust/CubeLib/src/websocket/mod.rs +++ b/Windows/Rust/CubeLib/src/websocket/mod.rs @@ -12,7 +12,7 @@ mod config; mod message; mod error; -pub use client::WebSocketClient; +pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, OutgoingMessage}; pub use config::WebSocketConfig; pub use message::{WebSocketMessage, ClientType}; pub use error::WebSocketError; diff --git a/Windows/Rust/CubeLib/target/.rustc_info.json b/Windows/Rust/CubeLib/target/.rustc_info.json index d391746..93bee1d 100644 --- a/Windows/Rust/CubeLib/target/.rustc_info.json +++ b/Windows/Rust/CubeLib/target/.rustc_info.json @@ -1 +1 @@ -{"rustc_fingerprint":17656983458485528297,"outputs":{"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.94.1 (e408947bf 2026-03-25)\nbinary: rustc\ncommit-hash: e408947bfd200af42db322daf0fadfe7e26d3bd1\ncommit-date: 2026-03-25\nhost: x86_64-pc-windows-msvc\nrelease: 1.94.1\nLLVM version: 21.1.8\n","stderr":""},"7971740275564407648":{"success":true,"status":"","code":0,"stdout":"___.exe\nlib___.rlib\n___.dll\n___.dll\n___.lib\n___.dll\nC:\\Users\\xyzqm\\.rustup\\toolchains\\stable-x86_64-pc-windows-msvc\npacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"msvc\"\ntarget_family=\"windows\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"windows\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"pc\"\nwindows\n","stderr":""}},"successes":{}} \ No newline at end of file +{"rustc_fingerprint":17656983458485528297,"outputs":{"7971740275564407648":{"success":true,"status":"","code":0,"stdout":"___.exe\nlib___.rlib\n___.dll\n___.dll\n___.lib\n___.dll\nC:\\Users\\xyzqm\\.rustup\\toolchains\\stable-x86_64-pc-windows-msvc\npacked\n___\ndebug_assertions\npanic=\"unwind\"\nproc_macro\ntarget_abi=\"\"\ntarget_arch=\"x86_64\"\ntarget_endian=\"little\"\ntarget_env=\"msvc\"\ntarget_family=\"windows\"\ntarget_feature=\"cmpxchg16b\"\ntarget_feature=\"fxsr\"\ntarget_feature=\"sse\"\ntarget_feature=\"sse2\"\ntarget_feature=\"sse3\"\ntarget_has_atomic=\"128\"\ntarget_has_atomic=\"16\"\ntarget_has_atomic=\"32\"\ntarget_has_atomic=\"64\"\ntarget_has_atomic=\"8\"\ntarget_has_atomic=\"ptr\"\ntarget_os=\"windows\"\ntarget_pointer_width=\"64\"\ntarget_vendor=\"pc\"\nwindows\n","stderr":""},"17747080675513052775":{"success":true,"status":"","code":0,"stdout":"rustc 1.94.1 (e408947bf 2026-03-25)\nbinary: rustc\ncommit-hash: e408947bfd200af42db322daf0fadfe7e26d3bd1\ncommit-date: 2026-03-25\nhost: x86_64-pc-windows-msvc\nrelease: 1.94.1\nLLVM version: 21.1.8\n","stderr":""}},"successes":{}} \ No newline at end of file