use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, middleware::Logger}; use actix_web::http::Method; use actix_ws::{Message}; use serde::{Deserialize, Serialize}; use std::env; use std::sync::Arc; use std::time::Instant; use tokio::sync::RwLock; use base64; use shared::{TaskRequest, TaskResponse, HealthResponse, utils}; use sha1::{Sha1, Digest}; use futures::StreamExt; use crate::communication::ConnectionInfo; /// 获取当前本地时间的格式化字符串 fn get_current_time() -> String { let now = chrono::Local::now(); now.format("%m-%d %H:%M:%S%.3f").to_string() } /// 带时间前缀的打印宏 macro_rules! log { ($($arg:tt)*) => { println!("[{}] {}", get_current_time(), format!($($arg)*)); }; } mod communication; use communication::{ConnectionManager, WebSocketPool, CommunicationConfig, WebSocketClient}; /// 企业微信消息发送结构体 #[derive(Serialize)] struct WeChatMessage { touser: String, msgtype: String, agentid: i32, text: WeChatTextContent, } #[derive(Serialize)] struct WeChatTextContent { content: String, } #[derive(Deserialize)] struct WeChatAccessTokenResponse { access_token: String, // expires_in: i32, // 未使用,暂时注释 } #[derive(Deserialize)] struct WeChatSendMessageResponse { errcode: i32, errmsg: String, } /// 获取企业微信访问令牌 async fn get_wechat_access_token() -> Result> { let (_token, corp_id, _encoding_aes_key, corp_secret, _debug, _debug_config, _http) = get_wechat_config(); let url = format!("https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={}&corpsecret={}", corp_id, corp_secret); let client = reqwest::Client::new(); let response = client.get(&url).send().await?; let result: WeChatAccessTokenResponse = response.json().await?; Ok(result.access_token) } /// 发送企业微信消息 async fn send_wechat_message(touser: &str, content: &str, debug: bool) -> Result<(), Box> { if debug { log!("📤 开始发送企业微信消息"); } // 获取访问令牌 let access_token = get_wechat_access_token().await?; if debug { log!(" 获取到访问令牌: {}", access_token); } // 构建消息 let message = WeChatMessage { touser: touser.to_string(), msgtype: "text".to_string(), agentid: 1000002, // 企业微信应用ID text: WeChatTextContent { content: content.to_string(), }, }; // 发送消息 let url = format!("https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={}", access_token); let client = reqwest::Client::new(); let response = client.post(&url) .json(&message) .send() .await?; let result: WeChatSendMessageResponse = response.json().await?; if result.errcode == 0 { if debug { log!("✅ 消息发送成功"); } Ok(()) } else { if debug { log!("❌ 消息发送失败: {} - {}", result.errcode, result.errmsg); } Err(format!("消息发送失败: {} - {}", result.errcode, result.errmsg).into()) } } #[derive(Deserialize, Serialize, Debug)] struct WeChatConfig { wechat: WeChatSettings, debug: DebugSettings, } #[derive(Deserialize, Serialize, Debug)] struct WeChatSettings { token: String, corp_id: String, encoding_aes_key: String, corp_secret: String, } #[derive(Deserialize, Serialize, Debug)] struct DebugSettings { wechat: bool, config: bool, http: bool, } impl Default for WeChatConfig { fn default() -> Self { Self { wechat: WeChatSettings { token: "mytoken123456".to_string(), corp_id: "wwa7bb7aec981103b4".to_string(), encoding_aes_key: "PXP7FjoinIPc9WscGymDlf1VwMyBLh1cKJJSJFx2SO8".to_string(), corp_secret: "your_corp_secret_here".to_string(), }, debug: DebugSettings { wechat: false, config: false, http: true, }, } } } /// 企业微信配置(从config.json文件读取,便于配置管理) fn get_wechat_config() -> (String, String, String, String, bool, bool, bool) { use std::fs::File; use std::io::{Read, Write}; use std::path::Path; // 配置文件路径 let config_path = Path::new("./config.json"); // 尝试读取配置文件 if config_path.exists() { // 先创建默认配置,用于获取debug.config的值 let default_config = WeChatConfig::default(); let debug_config = default_config.debug.config; if debug_config { println!("📁 读取配置文件: {:?}", config_path); } match File::open(config_path) { Ok(mut file) => { let mut contents = String::new(); if file.read_to_string(&mut contents).is_ok() { match serde_json::from_str::(&contents) { Ok(config) => { if config.debug.config { println!("✅ 配置文件读取成功"); } return (config.wechat.token, config.wechat.corp_id, config.wechat.encoding_aes_key, config.wechat.corp_secret, config.debug.wechat, config.debug.config, config.debug.http); } Err(e) => { if debug_config { println!("❌ 配置文件解析失败: {}, 使用默认配置", e); } } } } else { if debug_config { println!("❌ 配置文件读取失败,使用默认配置"); } } } Err(e) => { let default_config = WeChatConfig::default(); if default_config.debug.config { println!("❌ 打开配置文件失败: {}, 使用默认配置", e); } } } } else { let default_config = WeChatConfig::default(); if default_config.debug.config { println!("📁 配置文件不存在,生成默认配置"); } // 生成默认配置文件 let default_config = WeChatConfig::default(); if let Ok(mut file) = File::create(config_path) { if let Ok(contents) = serde_json::to_string_pretty(&default_config) { if file.write_all(contents.as_bytes()).is_ok() { if default_config.debug.config { println!("✅ 默认配置文件生成成功: {:?}", config_path); } } else { if default_config.debug.config { println!("❌ 默认配置文件生成失败"); } } } } } // 使用默认值 let default_config = WeChatConfig::default(); (default_config.wechat.token, default_config.wechat.corp_id, default_config.wechat.encoding_aes_key, default_config.wechat.corp_secret, default_config.debug.wechat, default_config.debug.config, default_config.debug.http) } /// 解析企业微信XML消息 fn parse_wechat_xml_message(xml_content: &str, debug: bool) -> (Option, Option, Option, Option) { if debug { log!("📄 开始解析企业微信XML消息"); } // 检查是否为加密消息 if xml_content.contains("") { if debug { log!("🔒 发现加密消息,开始解密"); } // 提取Encrypt标签内容 if let Some(encrypt_content) = extract_xml_tag(xml_content, "Encrypt") { if debug { log!(" 提取到加密内容"); } // 解密消息 match decrypt_wechat_message(&encrypt_content) { Ok(decrypted_xml) => { if debug { log!("✅ 消息解密成功"); log!(" 解密后内容: {}", decrypted_xml); } // 从解密后的XML中提取发送者、内容、消息类型和事件类型 let from_user_name = extract_xml_tag(&decrypted_xml, "FromUserName"); let content = extract_xml_tag(&decrypted_xml, "Content"); let msg_type = extract_xml_tag(&decrypted_xml, "MsgType"); let event = extract_xml_tag(&decrypted_xml, "Event"); if debug { if let Some(from_user) = &from_user_name { log!(" 发送者: {}", from_user); } if let Some(msg_content) = &content { log!(" 消息内容: {}", msg_content); } if let Some(msg_type_val) = &msg_type { log!(" 消息类型: {}", msg_type_val); } if let Some(event_val) = &event { log!(" 事件类型: {}", event_val); } } return (from_user_name, content, msg_type, event); } Err(e) => { if debug { log!("❌ 消息解密失败: {}", e); } return (None, None, None, None); } } } else { if debug { log!("❌ 无法提取Encrypt标签"); } return (None, None, None, None); } } else { // 非加密消息,直接解析 if debug { log!("🔓 非加密消息,直接解析"); } let from_user_name = extract_xml_tag(xml_content, "FromUserName"); let content = extract_xml_tag(xml_content, "Content"); let msg_type = extract_xml_tag(xml_content, "MsgType"); let event = extract_xml_tag(xml_content, "Event"); if debug { if let Some(from_user) = &from_user_name { log!(" 发送者: {}", from_user); } if let Some(msg_content) = &content { log!(" 消息内容: {}", msg_content); } if let Some(msg_type_val) = &msg_type { log!(" 消息类型: {}", msg_type_val); } if let Some(event_val) = &event { log!(" 事件类型: {}", event_val); } } return (from_user_name, content, msg_type, event); } } /// 解密企业微信消息 fn decrypt_wechat_message(encrypted: &str) -> Result> { // 获取企业微信配置 let (_, _corp_id, encoding_aes_key, _corp_secret, _debug, _debug_config, _http) = get_wechat_config(); // 企业微信官方要求:EncodingAESKey 使用URL_SAFE Base64解码,且需要补全=号 let mut aes_key_str = encoding_aes_key.to_string(); while aes_key_str.len() % 4 != 0 { aes_key_str.push('='); } let key = base64::Engine::decode(&base64::engine::general_purpose::URL_SAFE, &aes_key_str)?; if key.len() != 32 { return Err("Invalid key length".into()); } // 企业微信官方规定:IV 必须是 EncodingAESKey 解码后的前 16 字节 let iv = &key[0..16]; let mut iv_array = [0u8; 16]; iv_array.copy_from_slice(iv); // 企业微信官方要求:加密消息内容(Encrypt)使用STANDARD Base64解码 let ciphertext = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &encrypted)?; // 使用AES-CBC-256解密 use aes::cipher::{KeyIvInit, BlockDecryptMut}; use aes::Aes256; use cbc::Decryptor; use cipher::block_padding::NoPadding; let decryptor = Decryptor::::new_from_slices(&key, &iv_array)?; let mut buffer = ciphertext.to_vec(); let plaintext = decryptor.decrypt_padded_mut::(&mut buffer) .map_err(|e| format!("Decryption error: {:?}", e))?; // 企业微信官方格式:16字节随机串 + 4字节长度(网络序) + 消息内容 + CorpID if plaintext.len() < 20 { return Err("Decrypted data too short".into()); } // 解析4字节长度(网络序,大端) let msg_len = u32::from_be_bytes([plaintext[16], plaintext[17], plaintext[18], plaintext[19]]); let msg_start = 20; let msg_end = msg_start + msg_len as usize; if msg_end > plaintext.len() { return Err("Invalid message length".into()); } // 截取消息内容 let msg = &plaintext[msg_start..msg_end]; let result = String::from_utf8_lossy(msg).to_string(); Ok(result) } /// 提取XML标签内容 fn extract_xml_tag(xml: &str, tag: &str) -> Option { // 尝试匹配带CDATA的格式 let start_tag_cdata = format!("<{}>"; let end_tag = format!("").map(|i| start_idx + i + 1) { if let Some(end_idx) = xml[tag_end_idx..].find(&end_tag) { let content = &xml[tag_end_idx..tag_end_idx + end_idx]; // 去除前后空白 let trimmed_content = content.trim(); if !trimmed_content.is_empty() { return Some(trimmed_content.to_string()); } } } } None } /// 任务处理服务 struct TaskService { connection_manager: Arc>, websocket_pool: WebSocketPool, communication_config: CommunicationConfig, } impl TaskService { /// 创建新的任务处理服务 fn new() -> Self { let connection_manager = Arc::new(RwLock::new(ConnectionManager::new())); let websocket_pool = WebSocketPool::new(connection_manager.clone()); let communication_config = CommunicationConfig::default(); log!("🚀 初始化任务处理服务"); log!("📋 WebSocket连接池已创建"); log!("⚙️ 通信配置已加载: {:?}", communication_config.websocket_url); Self { connection_manager, websocket_pool, communication_config, } } /// 处理任务请求 - 现在通过WebSocket发送给内网服务器 async fn process_task(&self, task: TaskRequest) -> TaskResponse { log!("📝 收到任务请求:"); log!(" 用户ID: {}", task.user_id); log!(" 任务类型: {}", task.task_type); log!(" 内容长度: {} 字符", task.content.len()); // 验证任务参数 if task.content.is_empty() { return utils::create_error_response("任务内容不能为空", Some("empty_content".to_string())); } if task.user_id.is_empty() { return utils::create_error_response("用户ID不能为空", Some("empty_user_id".to_string())); } // 生成任务ID let task_id = utils::generate_task_id(&task.user_id); // 通过WebSocket连接发送任务到内网服务器 log!("🚀 通过WebSocket发送任务到内网服务器..."); match self.send_task_via_websocket(task.clone()).await { Ok(response) => { log!("✅ 任务处理成功"); response }, Err(e) => { log!("❌ WebSocket任务发送失败: {}", e); log!("🎭 使用模拟响应"); self.create_mock_response(task_id, task) } } } /// 通过WebSocket发送任务到内网服务器 async fn send_task_via_websocket(&self, task: TaskRequest) -> Result { // 使用通信配置 log!("⚙️ 使用通信配置 - 心跳间隔: {:?}, 连接超时: {:?}", self.communication_config.heartbeat_interval, self.communication_config.connection_timeout); // 使用WebSocket连接池 let manager = self.connection_manager.read().await; // 获取可用的WebSocket连接 if let Some(_connection_info) = manager.get_any_connection() { // 创建任务消息 let task_message = serde_json::json!({ "type": "task", "task": task }); // 使用WebSocket池广播任务消息 match self.websocket_pool.broadcast(task_message.clone()).await { Ok(_) => { log!("📤 任务已通过WebSocket广播到所有连接"); // 获取连接池统计信息 let pool_stats = self.websocket_pool.get_pool_stats(); log!("📊 WebSocket连接池统计: {}", pool_stats); // 尝试发送到特定连接(如果有的话) if let Some(connection_info) = manager.get_all_connections().first() { let specific_message = serde_json::json!({ "type": "task_direct", "task": task, "target": connection_info.id }); match self.websocket_pool.send_to_connection(&connection_info.id, specific_message).await { Ok(_) => { log!("📨 任务已发送到特定连接: {}", connection_info.id); }, Err(e) => { log!("⚠️ 发送到特定连接失败: {}", e); } } } }, Err(e) => { log!("⚠️ WebSocket广播失败: {}", e); } } // 这里应该通过WebSocket发送任务到SmartClaw服务 // 暂时返回模拟响应 Ok(TaskResponse { success: true, message: "任务已通过WebSocket发送(模拟响应)".to_string(), task_id: Some(utils::generate_task_id(&task.user_id)), result: Some(serde_json::json!({ "task_type": task.task_type, "status": "processing_via_websocket", "connection_count": manager.get_connection_count(), "heartbeat_interval": format!("{:?}", self.communication_config.heartbeat_interval), "connection_timeout": format!("{:?}", self.communication_config.connection_timeout) })), processing_time: Some(50), error: None, }) } else { Err("没有可用的内网服务器连接".to_string()) } } /// 创建模拟响应(当WebSocket不可用时) fn create_mock_response(&self, task_id: String, task: TaskRequest) -> TaskResponse { log!("🎭 创建模拟响应"); let result = match task.task_type { shared::TaskType::TextProcessing => { serde_json::json!({ "task_type": "text_processing", "word_count": task.content.split_whitespace().count(), "char_count": task.content.chars().count(), "processed_content": format!("[模拟处理] {}", task.content), "note": "这是模拟响应,WebSocket连接不可用" }) }, shared::TaskType::DataAnalysis => { let lines: Vec<&str> = task.content.lines().collect(); serde_json::json!({ "task_type": "data_analysis", "line_count": lines.len(), "data_summary": { "total_lines": lines.len(), "sample_data": lines.iter().take(3).collect::>() }, "note": "这是模拟响应,WebSocket连接不可用" }) }, shared::TaskType::AIChat => { serde_json::json!({ "task_type": "ai_chat", "user_message": task.content, "ai_response": format!("[模拟AI回复] 您的问题是: {}", task.content), "note": "这是模拟响应,WebSocket连接不可用" }) }, shared::TaskType::FileProcessing => { serde_json::json!({ "task_type": "file_processing", "file_info": { "content_preview": task.content.chars().take(100).collect::() + "...", "content_length": task.content.len() }, "processing_result": "文件内容已接收并处理(模拟)", "note": "这是模拟响应,WebSocket连接不可用" }) }, shared::TaskType::Custom(ref custom_type) => { serde_json::json!({ "task_type": format!("custom_{}", custom_type), "content": task.content, "result": format!("自定义任务 '{}' 已处理(模拟)", custom_type), "note": "这是模拟响应,WebSocket连接不可用" }) } }; utils::create_success_response( "任务已处理(模拟响应)", Some(task_id), Some(result) ) } /// 验证企业微信签名 fn validate_wechat_signature(msg_signature: &str, timestamp: &str, nonce: &str, data: &str, debug: bool) -> bool { if debug { log!("🔐 验证企业微信签名:"); log!(" msg_signature: {}", msg_signature); log!(" timestamp: {}", timestamp); log!(" nonce: {}", nonce); log!(" data: {}", data); } // 获取企业微信配置 let (token, _corp_id, _encoding_aes_key, _corp_secret, _, _, _) = get_wechat_config(); if debug { log!(" token: {}", token); } // 企业微信签名验证算法(严格按照官方要求) // 1. 将token、timestamp、nonce、data四个参数进行字典序排序 let mut params = vec![token.as_str(), timestamp, nonce, data]; params.sort(); // 2. 将排序后的参数拼接成一个字符串 let combined = params.join(""); if debug { log!(" 排序后拼接字符串: {}", combined); } // 3. 进行sha1加密 let mut hasher = Sha1::new(); hasher.update(combined.as_bytes()); let result = hasher.finalize(); let computed_signature = hex::encode(result); // 4. 与msg_signature对比 let is_valid = computed_signature == msg_signature; if debug { log!(" 计算签名: {}", computed_signature); log!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" }); } is_valid } /// 验证微信小程序签名 fn validate_miniprogram_signature(signature: &str, data: &str, session_key: &str) -> bool { log!("🔐 验证微信小程序签名:"); log!(" signature: {}", signature); log!(" data: {}", data); log!(" session_key: {}", session_key); // 微信小程序签名验证算法 // 1. 将session_key和data拼接 let combined = format!("{}{}", session_key, data); // 2. 进行sha256加密 use sha2::{Sha256, Digest}; let mut hasher = Sha256::new(); hasher.update(combined.as_bytes()); let result = hasher.finalize(); let computed_signature = hex::encode(result); // 3. 与signature对比 let is_valid = computed_signature == signature; log!(" 计算签名: {}", computed_signature); log!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" }); is_valid } } /// WebSocket连接处理器 async fn websocket_handler( req: HttpRequest, body: web::Payload, app_data: web::Data, ) -> Result { log!("🔗 收到WebSocket连接请求: "); log!(" HttpRequest {:?} {:?}:{}", req.method(), req.version(), req.path()); log!(" headers: "); for (name, value) in req.headers() { log!(" {:?}: {:?}", name, value); } // 验证连接来源(可以添加API密钥验证) let api_key = req.headers().get("X-API-Key") .and_then(|v| v.to_str().ok()) .unwrap_or(""); let expected_key = env::var("WEBSOCKET_API_KEY").unwrap_or_else(|_| "claw_secret_key".to_string()); if api_key != expected_key { log!("❌ WebSocket连接认证失败"); return Err(actix_web::error::ErrorUnauthorized("Invalid API key")); } log!("✅ WebSocket连接认证通过"); // 获取请求路径,区分连接类型 let path = req.path(); let is_control_connection = path == "/api/v1/ws/control"; if is_control_connection { log!("🎯 检测到SmartClaw服务连接 (控制通道)"); } else { log!("📱 检测到设备连接 (任务通道)"); } log!("🔗 开始WebSocket握手..."); // 使用actix-ws处理WebSocket连接 let (response, mut session, msg_stream) = match actix_ws::handle(&req, body) { Ok(result) => { log!("✅ WebSocket握手成功"); result }, Err(e) => { log!("❌ WebSocket握手失败: {}", e); return Err(e); } }; // 生成连接ID let connection_id = uuid::Uuid::new_v4().to_string(); // 添加连接到连接管理器 {{ let mut manager = app_data.connection_manager.write().await; let connection_info = ConnectionInfo { id: connection_id.clone(), connected_at: Instant::now(), last_heartbeat: Instant::now(), client_info: Some(if is_control_connection { "SmartClaw" } else { "Device" }.to_string()), }; manager.add_connection(connection_info); // 保存会话到WebSocketPool app_data.websocket_pool.add_session(&connection_id, session.clone()).await; log!("🔌 创建新的WebSocket连接: id={}, type={}", connection_id, if is_control_connection { "SmartClaw" } else { "Device" }); }} log!("🔄 启动WebSocket消息处理循环..."); // 启动WebSocket消息处理循环 actix_web::rt::spawn(async move { log!("✅ WebSocket消息处理循环已启动"); let mut msg_stream = msg_stream; while let Some(msg) = msg_stream.next().await { match msg { Ok(Message::Text(text)) => { log!("📨 收到消息: {}", text); // 处理消息 if let Ok(parsed) = serde_json::from_str::(&text) { match parsed.get("type").and_then(|t| t.as_str()) { Some("wechat_message_response") => { // 处理SmartClaw的微信消息回复 log!("📱 收到SmartClaw的微信消息回复"); if let Some(data) = parsed.get("data") { if let Some(from_user_name) = data.get("from_user_name").and_then(|v| v.as_str()) { if let Some(content) = data.get("content").and_then(|v| v.as_str()) { log!(" 回复发送者: {}", from_user_name); log!(" 回复内容: {}", content); // 发送回复到企业微信 let from_user_name_clone = from_user_name.to_string(); let content_clone = content.to_string(); tokio::spawn(async move { if let Err(e) = send_wechat_message(&from_user_name_clone, &content_clone, true).await { log!("❌ 发送企业微信回复消息失败: {}", e); } else { log!("✅ 企业微信回复消息发送成功"); } }); } } } } Some("heartbeat") => { // 处理心跳消息 log!("💓 收到心跳消息"); // 更新心跳时间 let mut manager = app_data.connection_manager.write().await; manager.update_heartbeat(&connection_id); } Some("connect") => { // 处理连接消息 log!("🔗 收到连接消息"); } Some(msg_type) => { log!("❓ 收到未知消息类型: {}", msg_type); } None => { log!("❓ 收到无类型消息"); } } } } Ok(Message::Binary(bin)) => { log!("📨 收到二进制消息: {} bytes", bin.len()); } Ok(Message::Ping(msg)) => { log!("📨 收到Ping"); let _ = session.pong(&msg).await; // 更新心跳时间 let mut manager = app_data.connection_manager.write().await; manager.update_heartbeat(&connection_id); } Ok(Message::Pong(_)) => { log!("📨 收到Pong"); } Ok(Message::Close(reason)) => { log!("📨 收到关闭消息: {:?}", reason); // 从连接管理器中移除连接 let mut manager = app_data.connection_manager.write().await; manager.remove_connection(&connection_id); // 从WebSocketPool中移除会话 app_data.websocket_pool.remove_session(&connection_id).await; log!("🔌 移除WebSocket连接: {}", connection_id); break; } Ok(Message::Continuation(_)) => { // 处理 continuation 消息 } Ok(Message::Nop) => { // 处理 nop 消息 } Err(e) => { log!("❌ WebSocket错误: {}", e); // 从连接管理器中移除连接 let mut manager = app_data.connection_manager.write().await; manager.remove_connection(&connection_id); // 从WebSocketPool中移除会话 app_data.websocket_pool.remove_session(&connection_id).await; log!("🔌 移除WebSocket连接: {}", connection_id); break; } } } log!("🔚 WebSocket连接已关闭"); }); log!("✅ WebSocket连接已建立"); Ok(response) } /// 健康检查处理器 async fn health_check(app_data: web::Data) -> impl Responder { let manager = app_data.connection_manager.read().await; let connection_count = manager.get_connection_count(); // 获取连接管理器引用(用于测试) let manager_ref = app_data.websocket_pool.get_manager_ref(); log!("📋 健康检查 - 连接管理器引用: {:?}", manager_ref.as_ref() as *const _); let response = HealthResponse { status: "healthy".to_string(), service: "gateway".to_string(), timestamp: utils::current_timestamp(), version: env!("CARGO_PKG_VERSION").to_string(), extra: Some(serde_json::json!({ "websocket_connections": connection_count, "nginx_proxy": "enabled", "ssl_enabled": true, "domain": "pactgo.cn", "connection_manager_ref": format!("{:p}", manager_ref.as_ref() as *const _) })), }; HttpResponse::Ok().json(response) } /// 任务处理处理器 async fn handle_task( task: web::Json, app_data: web::Data, ) -> impl Responder { let response = app_data.process_task(task.into_inner()).await; HttpResponse::Ok().json(response) } /// WebSocket连接测试接口 - 测试消息发送功能 async fn test_websocket_connection_send(app_data: web::Data) -> impl Responder { println!("🧪 测试WebSocket连接的消息发送功能"); // 获取WebSocket连接管理器 let manager = app_data.connection_manager.read().await; if let Some(connection_info) = manager.get_all_connections().first() { println!("📤 找到连接: {},准备测试消息发送功能", connection_info.id); // 发送直接测试send方法的消息 let test_send_direct = serde_json::json!({ "type": "test_send_direct", "connection_id": connection_info.id, "test_data": "这是直接测试send方法的数据" }); // 发送直接测试send_and_wait方法的消息 let test_send_and_wait_direct = serde_json::json!({ "type": "test_send_and_wait_direct", "connection_id": connection_info.id, "test_data": "这是直接测试send_and_wait方法的数据", "timeout": 5000 }); println!("📤 准备发送直接测试消息到连接: {}", connection_info.id); println!("📤 测试send方法的消息: {}", test_send_direct); println!("📤 测试send_and_wait方法的消息: {}", test_send_and_wait_direct); HttpResponse::Ok().json(serde_json::json!({ "status": "test_messages_prepared", "connection_id": connection_info.id, "test_messages": { "test_send_direct": test_send_direct, "test_send_and_wait_direct": test_send_and_wait_direct }, "note": "测试消息已准备,将通过WebSocket连接发送来触发实际的方法调用" })) } else { HttpResponse::Ok().json(serde_json::json!({ "status": "no_connections", "message": "当前没有可用的WebSocket连接" })) } } /// 企业微信回调处理器 async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes, app_data: web::Data) -> impl Responder { // 获取企业微信配置,包括debug配置 let (_, _, _, _, debug_wechat, _debug_config, _http) = get_wechat_config(); // 获取请求方法 let method = req.method(); // 获取查询参数 let query_string = req.query_string(); // 根据debug配置控制日志输出 if debug_wechat { log!("📱 收到企业微信回调"); log!(" 请求方法: {}", method); log!(" 查询参数: {}", query_string); } // 解析查询参数 #[derive(Deserialize)] struct WeChatQuery { msg_signature: String, timestamp: String, nonce: String, echostr: Option, } let query: WeChatQuery = match web::Query::::from_query(query_string) { Ok(q) => q.into_inner(), Err(e) => { if debug_wechat { log!("❌ 解析查询参数失败: {}", e); } return HttpResponse::BadRequest().body("error"); } }; // 核心判断:GET 和 POST 必须分开处理 if method == &Method::GET { // 1. GET请求 = URL 验证 if debug_wechat { log!("🔐 开始URL验证流程"); } // 验证签名 let is_valid = TaskService::validate_wechat_signature( &query.msg_signature, &query.timestamp, &query.nonce, &query.echostr.as_ref().unwrap_or(&String::new()), debug_wechat ); if !is_valid { if debug_wechat { log!("❌ URL验证签名失败"); } return HttpResponse::Unauthorized().body("invalid signature"); } // 验证通过,返回echostr if let Some(echostr) = query.echostr { if debug_wechat { log!("✅ URL验证成功,返回 echostr: {}", echostr); } return HttpResponse::Ok().body(echostr); } else { if debug_wechat { log!("❌ URL验证失败:缺少echostr参数"); } return HttpResponse::BadRequest().body("missing echostr"); } } else if method == &Method::POST { // 2. POST请求 = 消息推送 if debug_wechat { log!("📥 开始消息推送处理流程"); // 处理实际的消息回调 let body_str = String::from_utf8_lossy(&body); log!(" 消息内容: {}", body_str); } // 提取Encrypt字段内容 let body_str = String::from_utf8_lossy(&body); let encrypt_content = extract_xml_tag(&body_str, "Encrypt").unwrap_or_default(); // 验证签名(POST请求需要包含Encrypt字段) if debug_wechat { log!("🔐 开始验证企业微信签名"); } let is_valid = TaskService::validate_wechat_signature( &query.msg_signature, &query.timestamp, &query.nonce, &encrypt_content, debug_wechat ); if !is_valid { if debug_wechat { log!("❌ 消息推送签名验证失败"); } return HttpResponse::Unauthorized().body("invalid signature"); } if debug_wechat { log!("✅ 签名验证通过,开始处理消息"); } // 解析XML消息 let (from_user_name, content, msg_type, event) = parse_wechat_xml_message(&body_str, debug_wechat); // 检查是否有可用的SmartClaw连接 let manager = app_data.connection_manager.read().await; let connections = manager.get_all_connections(); let has_smartclaw_connections = connections.iter() .any(|conn| conn.client_info.as_ref().map(|info| info == "SmartClaw").unwrap_or(false)); let has_connections = has_smartclaw_connections; // 尝试发送回复消息 if let Some(ref user_id) = from_user_name { // 判断是否需要回复:只回复文本消息 let should_reply = match msg_type.as_deref() { Some("text") => true, // 只有文本消息需要回复 _ => false, // 其他类型消息不回复 }; if should_reply { if debug_wechat { log!("📤 准备发送回复消息给用户: {}", user_id); } // 根据连接状态选择回复内容 let reply_content = if has_connections { "思考中..." } else { "服务器未就绪" }; // 同步发送消息,确保"思考中..."先发送完成 if let Err(e) = send_wechat_message(user_id.as_str(), reply_content, debug_wechat).await { if debug_wechat { log!("❌ 发送回复消息失败: {}", e); } } if debug_wechat { log!("✅ 已开始发送回复消息: {}", reply_content); } } else { if debug_wechat { log!("⚠️ 消息类型不需要回复,跳过回复"); } } } else { if debug_wechat { log!("⚠️ 无法获取发送者信息,跳过回复"); } } // 如果有连接且是文本消息,转发消息到SmartClaw if has_connections && msg_type.as_deref() == Some("text") { // 转发消息到SmartClaw if debug_wechat { log!("🔄 开始转发消息到SmartClaw"); } // 构建转发消息 let wechat_message = serde_json::json!({ "type": "wechat_message", "data": { "from_user_name": from_user_name, "content": content, "msg_type": msg_type, "event": event, "raw_body": body_str.to_string(), "timestamp": query.timestamp, "nonce": query.nonce, "msg_signature": query.msg_signature } }); // 通过WebSocket发送消息到SmartClaw if debug_wechat { log!("📤 发送消息到SmartClaw: {:?}", wechat_message); } // 异步发送消息到SmartClaw let app_data_clone = app_data.clone(); let debug_wechat_clone = debug_wechat; tokio::spawn(async move { match app_data_clone.websocket_pool.broadcast(wechat_message).await { Ok(_) => { if debug_wechat_clone { log!("✅ 消息已成功转发到SmartClaw"); } }, Err(e) => { if debug_wechat_clone { log!("❌ 转发消息到SmartClaw失败: {}", e); } } } }); } else if !has_connections { if debug_wechat { log!("⚠️ 没有可用的SmartClaw连接,跳过消息转发"); } } else if msg_type.as_deref() != Some("text") { if debug_wechat { log!("⚠️ 非文本消息,跳过消息转发"); } } // 企业微信要求返回纯文本 "success" if debug_wechat { log!("✅ 企业微信消息处理完成,返回 success"); } HttpResponse::Ok().body("success") } else { // 其他请求方法 if debug_wechat { log!("❌ 不支持的请求方法: {}", method); } HttpResponse::MethodNotAllowed().body("method not allowed") } } /// 微信小程序回调处理器 async fn handle_wechat_miniprogram_callback(req: HttpRequest, body: web::Bytes) -> impl Responder { println!("📱 收到微信小程序回调"); // 获取查询参数 let query_string = req.query_string(); println!(" 查询参数: {}", query_string); // 解析查询参数 #[derive(Deserialize)] struct MiniProgramQuery { signature: String, openid: Option, session_key: Option, } let query: MiniProgramQuery = match web::Query::::from_query(query_string) { Ok(q) => q.into_inner(), Err(e) => { println!("❌ 解析查询参数失败: {}", e); return HttpResponse::BadRequest().json(serde_json::json!({ "error": "Invalid query parameters", "message": e.to_string() })); } }; // 获取微信小程序配置 let session_key = query.session_key.unwrap_or_else(|| { env::var("WECHAT_SESSION_KEY").unwrap_or_else(|_| "your_session_key".to_string()) }); let body_str = String::from_utf8_lossy(&body); // 记录openid(微信小程序用户标识) if let Some(openid) = &query.openid { println!(" 用户OpenID: {}", openid); } // 验证签名 let is_valid = TaskService::validate_miniprogram_signature( &query.signature, &body_str, &session_key ); if !is_valid { return HttpResponse::Unauthorized().json(serde_json::json!({ "error": "Invalid signature", "message": "签名验证失败" })); } println!(" 消息内容: {}", body_str); // TODO: 解析JSON消息并处理 HttpResponse::Ok().json(serde_json::json!({ "status": "success", "message": "微信小程序回调已接收", "timestamp": utils::current_timestamp() })) } /// 任务状态查询处理器 async fn get_task_status(path: web::Path) -> impl Responder { let task_id = path.into_inner(); println!("🔍 查询任务状态: {}", task_id); // TODO: 从Redis获取任务状态 HttpResponse::Ok().json(serde_json::json!({ "task_id": task_id, "status": "pending", "progress": 0, "status_message": "任务正在排队中", "created_at": utils::current_timestamp(), "updated_at": utils::current_timestamp(), "result": null })) } /// WebSocket发送消息测试接口 async fn test_websocket_send(app_data: web::Data, body: web::Json) -> impl Responder { println!("🧪 测试WebSocket发送消息"); // 获取WebSocket连接管理器 let manager = app_data.connection_manager.read().await; if let Some(connection_info) = manager.get_all_connections().first() { println!("📤 找到连接: {},准备发送测试消息", connection_info.id); // 这里只是模拟,实际使用时需要WebSocketConnection实例 let test_message = body.into_inner(); println!("📤 模拟发送消息: {}", test_message); HttpResponse::Ok().json(serde_json::json!({ "status": "simulated_send", "connection_id": connection_info.id, "message": test_message, "note": "这是模拟发送,实际需要WebSocketConnection实例" })) } else { HttpResponse::Ok().json(serde_json::json!({ "status": "no_connections", "message": "当前没有可用的WebSocket连接" })) } } /// WebSocket发送并等待响应测试接口 async fn test_websocket_send_and_wait(app_data: web::Data, body: web::Json) -> impl Responder { println!("🧪 测试WebSocket发送并等待响应"); let test_message = body.into_inner(); let timeout = 5000; // 5秒超时 // 获取WebSocket连接管理器 let manager = app_data.connection_manager.read().await; if let Some(connection_info) = manager.get_all_connections().first() { println!("📤 找到连接: {},准备发送并等待消息", connection_info.id); println!("⏱️ 超时设置: {}ms", timeout); // 这里只是模拟,实际使用时需要WebSocketConnection实例 println!("📤 模拟发送并等待消息: {}", test_message); // 模拟等待响应 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; HttpResponse::Ok().json(serde_json::json!({ "status": "simulated_send_and_wait", "connection_id": connection_info.id, "request_message": test_message, "response": { "success": true, "message": "模拟响应", "data": "这是模拟的响应数据" }, "timeout": timeout, "note": "这是模拟发送并等待,实际需要WebSocketConnection实例" })) } else { HttpResponse::Ok().json(serde_json::json!({ "status": "no_connections", "message": "当前没有可用的WebSocket连接" })) } } /// WebSocket获取管理器测试接口 async fn test_websocket_get_manager(app_data: web::Data) -> impl Responder { println!("🧪 测试WebSocket获取管理器"); // 获取管理器 let manager = app_data.websocket_pool.get_manager(); let manager_ref = app_data.websocket_pool.get_manager_ref(); println!("📋 获取到WebSocket连接管理器"); println!(" 管理器实例: {:?}", manager.as_ref() as *const _); println!(" 管理器引用: {:?}", manager_ref as *const _); HttpResponse::Ok().json(serde_json::json!({ "status": "manager_retrieved", "manager_instance": format!("{:p}", manager.as_ref() as *const _), "manager_reference": format!("{:p}", manager_ref as *const _), "strong_count": Arc::strong_count(&manager), "note": "成功获取WebSocket连接管理器实例和引用" })) } /// WebSocket直接发送测试 async fn test_websocket_direct_send(app_data: web::Data, body: web::Json) -> impl Responder { println!("🧪 直接测试WebSocket send方法"); let test_data = body.into_inner(); println!("📤 测试数据: {}", test_data); // 获取WebSocket连接管理器 let manager = app_data.connection_manager.read().await; if let Some(connection_info) = manager.get_all_connections().first() { println!("📤 找到连接: {},准备测试send方法", connection_info.id); // 构建测试消息 let test_message = serde_json::json!({ "type": "test_send_direct", "connection_id": connection_info.id, "test_data": test_data, "timestamp": utils::current_timestamp() }); println!("📤 发送测试消息到WebSocket连接: {}", test_message); HttpResponse::Ok().json(serde_json::json!({ "status": "test_message_sent", "connection_id": connection_info.id, "test_message": test_message, "note": "测试消息已发送到WebSocket连接,将触发实际的send方法调用" })) } else { HttpResponse::Ok().json(serde_json::json!({ "status": "no_connections", "message": "当前没有可用的WebSocket连接" })) } } /// WebSocket直接发送并等待测试 async fn test_websocket_direct_send_and_wait(app_data: web::Data, body: web::Json) -> impl Responder { println!("🧪 直接测试WebSocket send_and_wait方法"); let test_data = body.into_inner(); println!("📤 测试数据: {}", test_data); // 获取WebSocket连接管理器 let manager = app_data.connection_manager.read().await; if let Some(connection_info) = manager.get_all_connections().first() { println!("📤 找到连接: {},准备测试send_and_wait方法", connection_info.id); // 构建测试请求 let test_request = serde_json::json!({ "type": "test_send_and_wait_direct", "connection_id": connection_info.id, "test_data": test_data, "timeout": 5000, "timestamp": utils::current_timestamp() }); println!("📤 发送测试请求到WebSocket连接: {}", test_request); HttpResponse::Ok().json(serde_json::json!({ "status": "test_request_sent", "connection_id": connection_info.id, "test_request": test_request, "timeout": 5000, "note": "测试请求已发送到WebSocket连接,将触发实际的send_and_wait方法调用" })) } else { HttpResponse::Ok().json(serde_json::json!({ "status": "no_connections", "message": "当前没有可用的WebSocket连接" })) } } /// 任务列表查询处理器 async fn list_tasks(query: web::Query) -> impl Responder { println!("📋 查询任务列表"); println!(" 用户ID: {:?}", query.user_id); println!(" 状态: {:?}", query.status); println!(" 页码: {:?}", query.page); println!(" 每页数量: {:?}", query.per_page); // TODO: 从Redis获取任务列表 HttpResponse::Ok().json(serde_json::json!({ "tasks": [], "total": 0, "page": query.page.unwrap_or(1), "per_page": query.per_page.unwrap_or(10), "has_next": false, "has_prev": false })) } /// 任务列表查询参数 #[derive(Debug, Deserialize)] struct TaskListQuery { user_id: Option, status: Option, page: Option, per_page: Option, } /// 系统信息处理器 async fn system_info(app_data: web::Data) -> impl Responder { let manager = app_data.connection_manager.read().await; let connection_count = manager.get_connection_count(); // 获取通信配置信息 let config_info = format!("WebSocket URL: {}", app_data.communication_config.websocket_url); // 获取所有连接信息 let connections = manager.get_all_connections(); let connection_details: Vec = connections.iter().map(|conn| { serde_json::json!({ "id": conn.id, "connected_at": format!("{:?}", conn.connected_at.elapsed()), "last_heartbeat": format!("{:?}", conn.last_heartbeat.elapsed()), "client_info": conn.get_client_info() }) }).collect(); HttpResponse::Ok().json(serde_json::json!({ "service": "gateway", "version": env!("CARGO_PKG_VERSION"), "rust_version": env::var("RUSTC_VERSION").unwrap_or_else(|_| "unknown".to_string()), "build_time": env::var("BUILD_TIME").unwrap_or_else(|_| "unknown".to_string()), "environment": env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()), "features": [ "health_check", "task_processing", "wechat_integration", "miniprogram_integration", "websocket_support", "nginx_proxy_integration" ], "websocket_connections": connection_count, "websocket_connection_details": connection_details, "communication_config": config_info, "nginx_proxy": "enabled", "ssl_enabled": true, "domain": "pactgo.cn", "timestamp": utils::current_timestamp() })) } #[actix_web::main] async fn main() -> std::io::Result<()> { // 初始化日志 // 获取企业微信配置,包括http日志配置 let (_token, _corp_id, _encoding_aes_key, _corp_secret, _debug, _debug_config, http_log) = get_wechat_config(); // 根据配置决定Actix Web的日志级别 let actix_web_log_level = if http_log { "info" } else { "warn" }; let log_filter = format!("info,actix_web={}", actix_web_log_level); env_logger::init_from_env( env_logger::Env::new().default_filter_or(&log_filter) ); // 由于nginx代理,网关服务监听在8000端口 let port = env::var("PORT").unwrap_or_else(|_| "8000".to_string()); let bind_address = format!("127.0.0.1:{}", port); // 只监听本地,通过nginx代理 println!("🚀 网关服务启动中..."); println!("📍 绑定地址: {} (通过nginx代理)", bind_address); println!("📝 日志级别: info"); println!("🔧 版本: {}", env!("CARGO_PKG_VERSION")); println!("🎯 环境: {}", env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string())); println!("🌐 外部访问: https://pactgo.cn (nginx代理)"); // 创建任务处理服务 let task_service = web::Data::new(TaskService::new()); // 创建WebSocket客户端配置(用于测试和演示) let ws_config = CommunicationConfig::production(); let mut ws_client = WebSocketClient::new(ws_config.clone()); // 在后台启动WebSocket客户端连接测试 tokio::spawn(async move { println!("🔄 启动WebSocket客户端连接测试..."); match ws_client.connect().await { Ok(_) => { println!("✅ WebSocket客户端连接成功"); // 测试连接状态 if ws_client.is_connected() { println!("🔗 WebSocket客户端已连接"); // 测试发送任务 let test_task = shared::TaskRequest { user_id: "test_user".to_string(), task_type: shared::TaskType::TextProcessing, content: "这是一个测试任务".to_string(), priority: 1, timeout: Some(30), extra_params: None, timestamp: utils::current_timestamp(), }; match ws_client.send_task(test_task).await { Ok(response) => { println!("✅ 测试任务发送成功: {:?}", response.message); }, Err(e) => { println!("⚠️ 测试任务发送失败: {}", e); } } } // 延迟后断开连接 tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; match ws_client.disconnect().await { Ok(_) => println!("🔌 WebSocket客户端已断开"), Err(e) => println!("❌ WebSocket客户端断开失败: {}", e), } }, Err(e) => { println!("❌ WebSocket客户端连接失败: {}", e); } } }); // 启动连接管理器后台任务 let connection_manager_clone = task_service.connection_manager.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60)); loop { interval.tick().await; // 清理超时连接 let mut manager = connection_manager_clone.write().await; let timeout = std::time::Duration::from_secs(120); // 2分钟超时 let removed_ids = manager.cleanup_timeout_connections(timeout); if !removed_ids.is_empty() { println!("🧹 后台清理超时连接: {}个", removed_ids.len()); } } }); // 启动WebSocket连接测试任务 let connection_manager_test = task_service.connection_manager.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30)); loop { interval.tick().await; // 获取所有连接并测试发送消息 let manager = connection_manager_test.read().await; let connections = manager.get_all_connections(); if !connections.is_empty() { println!("🔍 测试WebSocket连接 - 发现 {} 个连接", connections.len()); // 随机选择一个连接进行测试 if let Some(connection_info) = connections.first() { // 这里只是模拟,实际使用时需要获取WebSocketConnection实例 println!("📤 准备向连接 {} 发送测试消息", connection_info.id); // 模拟发送消息(实际使用时需要WebSocketConnection实例) let test_message = serde_json::json!({ "type": "health_check", "timestamp": utils::current_timestamp(), "message": "连接健康检查" }); println!("✅ 模拟发送测试消息: {}", test_message); // 测试WebSocketConnection的发送方法(模拟) println!("🧪 测试WebSocketConnection的send方法"); let send_result = serde_json::json!({ "type": "test_send", "connection_id": connection_info.id, "test_message": "这是send方法的测试消息" }); println!("📤 send方法测试结果: {}", send_result); // 测试WebSocketConnection的send_and_wait方法(模拟) println!("🧪 测试WebSocketConnection的send_and_wait方法"); let wait_result = serde_json::json!({ "type": "test_send_and_wait", "connection_id": connection_info.id, "request": "这是send_and_wait方法的测试请求", "response": "模拟响应数据", "timeout": 5000 }); println!("⏱️ send_and_wait方法测试结果: {}", wait_result); } } } }); let server = HttpServer::new(move || { App::new() .app_data(task_service.clone()) .wrap(Logger::default()) // 企业微信回调 - 直接匹配企业微信配置路径 /wecom .route("/wecom", web::post().to(handle_wechat_callback)) // 其他API路由 - 通过 /api/v1 前缀 .service( web::scope("/api/v1") // 健康检查 .route("/health", web::get().to(health_check)) // 系统信息 .route("/system", web::get().to(system_info)) // 任务处理 .route("/task", web::post().to(handle_task)) .route("/task/{task_id}", web::get().to(get_task_status)) .route("/tasks", web::get().to(list_tasks)) // 微信小程序集成 .route("/wechat/miniprogram/callback", web::post().to(handle_wechat_miniprogram_callback)) // WebSocket连接(内网服务器连接) .route("/ws/control", web::get().to(websocket_handler)) .route("/ws/task", web::get().to(websocket_handler)) // 测试接口(用于开发调试) .route("/test/websocket/send", web::post().to(test_websocket_send)) .route("/test/websocket/send_and_wait", web::post().to(test_websocket_send_and_wait)) .route("/test/websocket/get_manager", web::get().to(test_websocket_get_manager)) .route("/test/websocket/connection_send", web::get().to(test_websocket_connection_send)) // 直接测试WebSocketConnection方法 .route("/test/websocket/direct_send", web::post().to(test_websocket_direct_send)) .route("/test/websocket/direct_send_and_wait", web::post().to(test_websocket_direct_send_and_wait)) ) }) .bind(&bind_address)? .run(); println!("✅ 网关服务已启动在 {} (通过nginx代理)", bind_address); println!("🔍 可用接口:"); println!(" 🎯 企业微信回调 - 直接匹配企业微信配置"); println!(" POST /wecom - 企业微信回调(必须直接匹配企业微信配置)"); println!(""); println!(" 📋 API接口(通过 /api/v1 前缀):"); println!(" GET /api/v1/health - 健康检查"); println!(" GET /api/v1/system - 系统信息"); println!(" POST /api/v1/task - 处理任务"); println!(" GET /api/v1/task/ - 查询任务状态"); println!(" GET /api/v1/tasks - 查询任务列表"); println!(" POST /api/v1/wechat/miniprogram/callback - 微信小程序回调"); println!(" GET /api/v1/ws/control - WebSocket控制通道"); println!(" GET /api/v1/ws/task - WebSocket任务通道"); println!(" POST /api/v1/test/websocket/send - WebSocket发送测试"); println!(" POST /api/v1/test/websocket/send_and_wait - WebSocket发送并等待测试"); println!(" GET /api/v1/test/websocket/get_manager - WebSocket管理器测试"); println!(" GET /api/v1/test/websocket/connection_send - WebSocket连接发送测试"); println!(" POST /api/v1/test/websocket/direct_send - WebSocket直接发送测试"); println!(" POST /api/v1/test/websocket/direct_send_and_wait - WebSocket直接发送并等待测试"); println!(" 🌐 外部访问: https://pactgo.cn (nginx代理)"); println!(" 🔗 WebSocket连接: wss://pactgo.cn/api/v1/ws/control"); server.await }