From e4390842869ffba2dc215dc518a7f5601f995f7b Mon Sep 17 00:00:00 2001 From: zqm Date: Wed, 8 Apr 2026 10:50:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20MessageSender=20=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=EF=BC=88on=5Fmessage=20=E5=9B=9E=E8=B0=83=E7=9A=84?= =?UTF-8?q?=E7=AC=AC=E4=B8=89=E4=B8=AA=E5=8F=82=E6=95=B0=EF=BC=89=EF=BC=8C?= =?UTF-8?q?=E5=8F=AF=E5=9C=A8=E5=9B=9E=E8=B0=83=E4=B8=AD=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=8F=91=E9=80=81=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Windows/Rust/CubeLib/src/lib.rs | 2 +- Windows/Rust/CubeLib/src/websocket/client.rs | 40 +++++++++++++++++--- Windows/Rust/CubeLib/src/websocket/mod.rs | 2 +- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/Windows/Rust/CubeLib/src/lib.rs b/Windows/Rust/CubeLib/src/lib.rs index 7764351..a10c581 100644 --- a/Windows/Rust/CubeLib/src/lib.rs +++ b/Windows/Rust/CubeLib/src/lib.rs @@ -30,4 +30,4 @@ pub mod websocket; -pub use websocket::{WebSocketClient, WebSocketConfig, WebSocketMessage, OutgoingMessage, ConnectionStatus, ReconnectedCallback}; +pub use websocket::{WebSocketClient, WebSocketConfig, WebSocketMessage, OutgoingMessage, ConnectionStatus, ReconnectedCallback, MessageSender}; diff --git a/Windows/Rust/CubeLib/src/websocket/client.rs b/Windows/Rust/CubeLib/src/websocket/client.rs index b71203f..09e0802 100644 --- a/Windows/Rust/CubeLib/src/websocket/client.rs +++ b/Windows/Rust/CubeLib/src/websocket/client.rs @@ -51,7 +51,21 @@ impl std::fmt::Display for ConnectionStatus { /// Event callback types (wrapped in Arc for cloneability) pub type ConnectedCallback = Arc; pub type DisconnectedCallback = Arc; -pub type MessageCallback = Arc; +/// Callback for text messages - args: (msg_type, parsed_data, sender) +/// sender can be used to send messages back synchronously +#[derive(Clone)] +pub struct MessageSender(std::sync::Arc>>>); +impl MessageSender { + /// Send a text message synchronously (from within the sync callback) + pub fn send(&self, text: String) { + if let Ok(guard) = self.0.try_lock() { + if let Some(ref tx) = *guard { + let _ = tx.try_send(OutgoingMessage::Text(text)); + } + } + } +} +pub type MessageCallback = Arc; pub type BinaryCallback = Arc) + Send + Sync>; pub type ErrorCallback = Arc; pub type StatusCallback = Arc; @@ -153,9 +167,11 @@ impl WebSocketClient { } /// Set callback for text message received + /// The callback receives: (msg_type, parsed_data, sender) + /// The sender can be used to send messages back synchronously pub fn on_message(&mut self, callback: F) -> &mut Self where - F: Fn(String, Value) + Send + Sync + 'static, + F: Fn(String, Value, MessageSender) + Send + Sync + 'static, { self.on_message = Some(Arc::new(callback)); self @@ -239,6 +255,17 @@ impl WebSocketClient { *self.url.blocking_lock() = new_url; } + /// Send a text message through the WebSocket (can be called from any context) + /// Returns Ok(()) if the message was sent, Err(msg) if not connected + pub async fn send_text(&self, text: String) -> Result<(), String> { + let sender = self.sender.lock().await; + if let Some(ref tx) = *sender { + tx.send(OutgoingMessage::Text(text)).await.map_err(|e| e.to_string()) + } else { + Err("Not connected".to_string()) + } + } + // ==================== Connection Management ==================== /// Connect to the WebSocket server (uses dynamic URL that can be updated) @@ -354,8 +381,10 @@ impl WebSocketClient { // Clone sender for first_connect callback let sender_clone = Arc::clone(&sender); + // Create MessageSender for on_message callback (same underlying sender) + let message_sender = MessageSender(Arc::clone(&sender)); - match Self::connect_and_handle(¤t_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, &on_reconnected, sender_clone).await { + match Self::connect_and_handle(¤t_url, &config, &mut receiver, &queue, &on_connected, &on_message, &on_binary, &on_error, is_first, &on_first_connect, &on_reconnected, sender_clone, message_sender).await { Ok(_) => { if config.debug_mode { debug!("WebSocket connection closed normally"); @@ -445,6 +474,7 @@ impl WebSocketClient { on_first_connect: &Option, on_reconnected: &Option, sender: Arc>>>, + message_sender: MessageSender, ) -> Result<(), WebSocketError> { use http::header::{HeaderName, HeaderValue}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; @@ -536,10 +566,10 @@ impl WebSocketClient { .to_string(); if let Some(ref callback) = on_message { - callback(msg_type, parsed); + callback(msg_type, parsed, message_sender.clone()); } } else if let Some(ref callback) = on_message { - callback("raw".to_string(), serde_json::json!(text)); + callback("raw".to_string(), serde_json::json!(text), message_sender.clone()); } } Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => { diff --git a/Windows/Rust/CubeLib/src/websocket/mod.rs b/Windows/Rust/CubeLib/src/websocket/mod.rs index 3d6a416..2d4aafb 100644 --- a/Windows/Rust/CubeLib/src/websocket/mod.rs +++ b/Windows/Rust/CubeLib/src/websocket/mod.rs @@ -12,7 +12,7 @@ mod config; mod message; mod error; -pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, ReconnectedCallback, OutgoingMessage}; +pub use client::{WebSocketClient, ConnectionStatus, ReconnectingCallback, ReconnectedCallback, OutgoingMessage, MessageSender}; pub use config::WebSocketConfig; pub use message::{WebSocketMessage, ClientType}; pub use error::WebSocketError;