Files
JoyD/Claw/Server/SmartClaw/src/websocket_client.rs

241 lines
8.2 KiB
Rust
Raw Normal View History

2026-03-20 16:51:31 +08:00
use tokio_tungstenite::{connect_async, tungstenite::Message, tungstenite::http::Request};
2026-03-16 15:47:55 +08:00
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use std::sync::Arc;
2026-03-20 16:51:31 +08:00
use tokio::sync::{mpsc, Mutex};
2026-03-16 15:47:55 +08:00
use tokio::time::{interval, Duration};
use shared::{TaskRequest, TaskResponse};
2026-03-20 16:51:31 +08:00
use base64::Engine as _;
use base64::engine::general_purpose;
use rand::Rng;
2026-03-16 15:47:55 +08:00
/// WebSocket 客户端连接管理器
pub struct WebSocketClient {
gateway_url: String,
2026-03-20 16:51:31 +08:00
sender: Arc<Mutex<Option<mpsc::Sender<String>>>>,
is_connected: Arc<Mutex<bool>>,
2026-03-16 15:47:55 +08:00
}
impl WebSocketClient {
/// 创建新的 WebSocket 客户端
pub fn new(gateway_url: String) -> Self {
Self {
gateway_url,
2026-03-20 16:51:31 +08:00
sender: Arc::new(Mutex::new(None)),
is_connected: Arc::new(Mutex::new(false)),
2026-03-16 15:47:55 +08:00
}
}
/// 连接到网关服务
pub async fn connect(&self) -> Result<(), Box<dyn std::error::Error>> {
println!("🔌 正在连接到网关服务: {}", self.gateway_url);
2026-03-20 16:51:31 +08:00
let ws_url = format!("{}/api/v1/ws/control", self.gateway_url.replace("http://", "ws://").replace("https://", "wss://"));
2026-03-16 15:47:55 +08:00
println!("🔗 WebSocket URL: {}", ws_url);
2026-03-20 16:51:31 +08:00
// 生成随机的 Sec-WebSocket-Key
let mut key = [0u8; 16];
rand::thread_rng().fill(&mut key);
let sec_websocket_key = general_purpose::STANDARD.encode(&key);
// 建立 WebSocket 连接添加API密钥和标准 WebSocket 握手头
let request = Request::builder()
.uri(&ws_url)
.header("Host", "pactgo.cn")
.header("X-API-Key", "claw_secret_key")
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", sec_websocket_key)
.header("Sec-WebSocket-Version", "13")
.body(())?;
println!("🔗 正在建立WebSocket连接...");
println!("📋 请求URL: {}", ws_url);
println!("📋 HTTP版本: {:?}", request.version());
println!("📋 请求头:");
for (name, value) in request.headers() {
println!(" {}: {:?}", name, value);
}
let (ws_stream, response) = connect_async(request).await?;
2026-03-16 15:47:55 +08:00
println!("✅ WebSocket 连接建立");
2026-03-20 16:51:31 +08:00
println!("📋 响应状态: {}", response.status());
2026-03-16 15:47:55 +08:00
// 设置连接状态
2026-03-20 16:51:31 +08:00
*self.is_connected.lock().await = true;
2026-03-16 15:47:55 +08:00
// 分割流
let (mut write, mut read) = ws_stream.split();
// 创建消息通道
let (tx, mut rx) = mpsc::channel::<String>(100);
2026-03-20 16:51:31 +08:00
*self.sender.lock().await = Some(tx);
2026-03-16 15:47:55 +08:00
// 启动消息发送循环
let _write_handle = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = write.send(Message::Text(msg)).await {
println!("❌ 发送消息失败: {}", e);
break;
}
}
});
// 启动消息接收循环
let is_connected_clone = self.is_connected.clone();
let _read_handle = tokio::spawn(async move {
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
println!("📨 收到消息: {}", text);
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
Self::handle_incoming_message(parsed).await;
}
}
Ok(Message::Close(_)) => {
println!("🔚 收到关闭消息");
2026-03-20 16:51:31 +08:00
*is_connected_clone.lock().await = false;
2026-03-16 15:47:55 +08:00
break;
}
Ok(_) => {}
Err(e) => {
println!("❌ 接收消息错误: {}", e);
2026-03-20 16:51:31 +08:00
*is_connected_clone.lock().await = false;
2026-03-16 15:47:55 +08:00
break;
}
}
}
});
// 启动心跳机制
let _heartbeat_handle = {
let is_connected = self.is_connected.clone();
tokio::spawn(async move {
let mut heartbeat_interval = interval(Duration::from_secs(30));
loop {
heartbeat_interval.tick().await;
2026-03-20 16:51:31 +08:00
let connected = *is_connected.lock().await;
2026-03-16 15:47:55 +08:00
if !connected {
println!("💔 心跳检测到连接已断开");
break;
}
let _heartbeat_msg = json!({
"type": "heartbeat",
"service": "smartclaw",
"timestamp": chrono::Utc::now().timestamp()
}).to_string();
// 这里需要重新获取 sender因为生命周期问题
println!("💓 心跳发送");
}
})
};
// 发送连接确认消息
let connect_msg = json!({
"type": "connect",
"service": "smartclaw",
"version": env!("CARGO_PKG_VERSION"),
"timestamp": chrono::Utc::now().timestamp()
}).to_string();
2026-03-20 16:51:31 +08:00
if let Some(sender) = &*self.sender.lock().await {
2026-03-16 15:47:55 +08:00
let _ = sender.send(connect_msg).await;
}
println!("🚀 WebSocket 客户端已启动");
Ok(())
}
/// Dispatches one parsed incoming message according to its `"type"` field.
///
/// * `"task"` — the whole message is deserialized into a `TaskRequest` and
///   printed; a payload that does not deserialize is reported instead of
///   being dropped silently.
/// * `"heartbeat"` / `"ack"` — a short log line is printed.
/// * any other string, or a missing / non-string `"type"` — reported as
///   unknown or untyped.
async fn handle_incoming_message(message: serde_json::Value) {
    match message.get("type").and_then(|t| t.as_str()) {
        Some("task") => match serde_json::from_value::<TaskRequest>(message) {
            Ok(task_request) => {
                println!("📝 收到任务请求: {:?}", task_request);
                // Task handling logic can be invoked from here.
            }
            Err(e) => {
                println!("❌ 任务请求解析失败: {}", e);
            }
        },
        Some("heartbeat") => {
            println!("💓 收到心跳响应");
        }
        Some("ack") => {
            println!("✅ 收到确认消息");
        }
        Some(msg_type) => {
            println!("❓ 收到未知消息类型: {}", msg_type);
        }
        None => {
            println!("❓ 收到无类型消息");
        }
    }
}
/// 发送消息
pub async fn send_message(&self, message: String) -> Result<(), Box<dyn std::error::Error>> {
2026-03-20 16:51:31 +08:00
if let Some(sender) = &*self.sender.lock().await {
2026-03-16 15:47:55 +08:00
sender.send(message).await.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
Ok(())
} else {
Err("WebSocket 连接未建立".into())
}
}
/// Wraps a task response in a `task_response` envelope and queues it.
///
/// The envelope carries the response's `task_id`, the serialized response
/// under `data`, and the current UTC Unix timestamp. Delivery and error
/// behaviour are those of `send_message`.
pub async fn send_task_response(&self, response: TaskResponse) -> Result<(), Box<dyn std::error::Error>> {
    let envelope = json!({
        "type": "task_response",
        "task_id": response.task_id,
        "data": response,
        "timestamp": chrono::Utc::now().timestamp()
    });
    let serialized = envelope.to_string();

    self.send_message(serialized).await
}
/// 检查连接状态
2026-03-20 16:51:31 +08:00
pub async fn is_connected(&self) -> bool {
*self.is_connected.lock().await
2026-03-16 15:47:55 +08:00
}
/// 断开连接
2026-03-20 16:51:31 +08:00
pub async fn disconnect(&self) {
*self.is_connected.lock().await = false;
*self.sender.lock().await = None;
2026-03-16 15:47:55 +08:00
println!("🔌 WebSocket 连接已断开");
}
}
/// WebSocket client manager.
///
/// A cloneable handle around one `WebSocketClient`; clones share the same
/// client because it is stored behind an `Arc`.
#[derive(Clone)]
pub struct WebSocketClientManager {
/// The shared client instance handed out by `get_client`.
client: Arc<WebSocketClient>,
}
impl WebSocketClientManager {
    /// Creates a manager wrapping a new, not-yet-connected client for `gateway_url`.
    pub fn new(gateway_url: String) -> Self {
        let client = Arc::new(WebSocketClient::new(gateway_url));
        Self { client }
    }

    /// Starts the client by connecting it to the gateway.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `WebSocketClient::connect`.
    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
        WebSocketClient::connect(&self.client).await
    }

    /// Returns another shared handle to the managed client.
    pub fn get_client(&self) -> Arc<WebSocketClient> {
        Arc::clone(&self.client)
    }

    /// Stops the client by disconnecting it.
    pub async fn stop(&self) {
        WebSocketClient::disconnect(&self.client).await;
    }
}