feat(XCamera): 实现异步抓图功能并优化图像处理

fix(nginx): 调整企业微信回调代理路径

feat(gateway): 添加企业微信消息处理功能

docs: 更新项目规划文档和企业微信配置详情

refactor(XCamera): 重构LED检测和图像处理逻辑

test: 添加ONVIF抓图测试功能
This commit is contained in:
zqm
2026-03-31 11:04:43 +08:00
parent 1f5d05e6d6
commit cdf64fa31f
2605 changed files with 530175 additions and 1081 deletions

View File

@@ -0,0 +1,32 @@
{
"server": {
"port": "3001",
"host": "0.0.0.0"
},
"gateway": {
"url": "https://pactgo.cn",
"ws_path": "/api/v1/ws/control",
"api_key": "claw_secret_key"
},
"websocket": {
"heartbeat_interval": 30,
"connection_timeout": 60,
"reconnect_interval": 5,
"max_reconnect_attempts": 0
},
"task": {
"workers": 1,
"default_timeout": 60,
"queue_size": 100
},
"lmstudio": {
"enabled": false,
"url": "http://localhost:1234"
},
"debug": {
"log_level": "info",
"config": false,
"websocket": false,
"task": false
}
}

View File

@@ -0,0 +1,418 @@
````lms_hstack
`POST /api/v1/chat`
**Request body**
```lms_params
- name: model
type: string
optional: false
description: Unique identifier for the model to use.
- name: input
type: string | array<object>
optional: false
description: Message to send to the model.
children:
- name: Input text
unstyledName: true
type: string
description: Text content of the message.
- name: Input object
unstyledName: true
type: object
description: Object representing a message with additional metadata.
children:
- name: Text Input
type: object
optional: true
description: Text input to provide user messages
children:
- name: type
type: '"message"'
optional: false
description: Type of input item.
- name: content
type: string
description: Text content of the message.
optional: false
- name: Image Input
type: object
optional: true
description: Image input to provide user messages
children:
- name: type
type: '"image"'
optional: false
description: Type of input item.
- name: data_url
type: string
description: Image data as a base64-encoded data URL.
optional: false
- name: system_prompt
type: string
optional: true
description: System message that sets model behavior or instructions.
- name: integrations
type: array<string | object>
optional: true
description: List of integrations (plugins, ephemeral MCP servers, etc...) to enable for this request.
children:
- name: Plugin id
unstyledName: true
type: string
description: Unique identifier of a plugin to use. Plugins contain `mcp.json` installed MCP servers (id `mcp/<server_label>`). Shorthand for plugin object with no custom configuration.
- name: Plugin
unstyledName: true
type: object
description: Specification of a plugin to use. Plugins contain `mcp.json` installed MCP servers (id `mcp/<server_label>`).
children:
- name: type
type: '"plugin"'
optional: false
description: Type of integration.
- name: id
type: string
optional: false
description: Unique identifier of the plugin.
- name: allowed_tools
type: array<string>
optional: true
description: List of tool names the model can call from this plugin. If not provided, all tools from the plugin are allowed.
- name: Ephemeral MCP server specification
unstyledName: true
type: object
description: Specification of an ephemeral MCP server. Allows defining MCP servers on-the-fly without needing to pre-configure them in your `mcp.json`.
children:
- name: type
type: '"ephemeral_mcp"'
optional: false
description: Type of integration.
- name: server_label
type: string
optional: false
description: Label to identify the MCP server.
- name: server_url
type: string
optional: false
description: URL of the MCP server.
- name: allowed_tools
type: array<string>
optional: true
description: List of tool names the model can call from this server. If not provided, all tools from the server are allowed.
- name: headers
type: object
optional: true
description: Custom HTTP headers to send with requests to the server.
- name: stream
type: boolean
optional: true
description: Whether to stream partial outputs via SSE. Default `false`. See [streaming events](/docs/developer/rest/streaming-events) for more information.
- name: temperature
type: number
optional: true
description: Randomness in token selection. 0 is deterministic, higher values increase creativity [0,1].
- name: top_p
type: number
optional: true
description: Minimum cumulative probability for the possible next tokens [0,1].
- name: top_k
type: integer
optional: true
description: Limits next token selection to top-k most probable tokens.
- name: min_p
type: number
optional: true
description: Minimum base probability for a token to be selected for output [0,1].
- name: repeat_penalty
type: number
optional: true
description: Penalty for repeating token sequences. 1 is no penalty, higher values discourage repetition.
- name: max_output_tokens
type: integer
optional: true
description: Maximum number of tokens to generate.
- name: reasoning
type: '"off" | "low" | "medium" | "high" | "on"'
optional: true
description: Reasoning setting. Will error if the model being used does not support the reasoning setting using. Defaults to the automatically chosen setting for the model.
- name: context_length
type: integer
optional: true
description: Number of tokens to consider as context. Higher values recommended for MCP usage.
- name: store
type: boolean
optional: true
description: Whether to store the chat. If set, response will return a `"response_id"` field. Default `true`.
- name: previous_response_id
type: string
optional: true
description: Identifier of existing response to append to. Must start with `"resp_"`.
```
:::split:::
```lms_code_snippet
variants:
Request with MCP:
language: bash
code: |
curl http://localhost:1234/api/v1/chat \
-H "Authorization: Bearer $LM_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"model": "ibm/granite-4-micro",
"input": "Tell me the top trending model on hugging face and navigate to https://lmstudio.ai",
"integrations": [
{
"type": "ephemeral_mcp",
"server_label": "huggingface",
"server_url": "https://huggingface.co/mcp",
"allowed_tools": [
"model_search"
]
},
{
"type": "plugin",
"id": "mcp/playwright",
"allowed_tools": [
"browser_navigate"
]
}
],
"context_length": 8000,
"temperature": 0
}'
Request with Images:
language: bash
code: |
# Image is a small red square encoded as a base64 data URL
curl http://localhost:1234/api/v1/chat \
-H "Authorization: Bearer $LM_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"model": "qwen/qwen3-vl-4b",
"input": [
{
"type": "text",
"content": "Describe this image in two sentences"
},
{
"type": "image",
"data_url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAoAAAAKCAYAAACNMs+9AAAAFUlEQVR42mP8z8BQz0AEYBxVSF+FABJADveWkH6oAAAAAElFTkSuQmCC"
}
],
"context_length": 2048,
"temperature": 0
}'
```
````
---
````lms_hstack
**Response fields**
```lms_params
- name: model_instance_id
type: string
description: Unique identifier for the loaded model instance that generated the response.
- name: output
type: array<object>
description: Array of output items generated. Each item can be one of three types.
children:
- name: Message
unstyledName: true
type: object
description: A text message from the model.
children:
- name: type
type: '"message"'
description: Type of output item.
- name: content
type: string
description: Text content of the message.
- name: Tool call
unstyledName: true
type: object
description: A tool call made by the model.
children:
- name: type
type: '"tool_call"'
description: Type of output item.
- name: tool
type: string
description: Name of the tool called.
- name: arguments
type: object
description: Arguments passed to the tool. Can have any keys/values depending on the tool definition.
- name: output
type: string
description: Result returned from the tool.
- name: provider_info
type: object
description: Information about the tool provider.
children:
- name: type
type: '"plugin" | "ephemeral_mcp"'
description: Provider type.
- name: plugin_id
type: string
optional: true
description: Identifier of the plugin (when `type` is `"plugin"`).
- name: server_label
type: string
optional: true
description: Label of the MCP server (when `type` is `"ephemeral_mcp"`).
- name: Reasoning
unstyledName: true
type: object
description: Reasoning content from the model.
children:
- name: type
type: '"reasoning"'
description: Type of output item.
- name: content
type: string
description: Text content of the reasoning.
- name: Invalid tool call
unstyledName: true
type: object
description: An invalid tool call made by the model - due to invalid tool name or tool arguments.
children:
- name: type
type: '"invalid_tool_call"'
description: Type of output item.
- name: reason
type: string
description: Reason why the tool call was invalid.
- name: metadata
type: object
description: Metadata about the invalid tool call.
children:
- name: type
type: '"invalid_name" | "invalid_arguments"'
description: Type of error that occurred.
- name: tool_name
type: string
description: Name of the tool that was attempted to be called.
- name: arguments
type: object
optional: true
description: Arguments that were passed to the tool (only present for `invalid_arguments` errors).
- name: provider_info
type: object
optional: true
description: Information about the tool provider (only present for `invalid_arguments` errors).
children:
- name: type
type: '"plugin" | "ephemeral_mcp"'
description: Provider type.
- name: plugin_id
type: string
optional: true
description: Identifier of the plugin (when `type` is `"plugin"`).
- name: server_label
type: string
optional: true
description: Label of the MCP server (when `type` is `"ephemeral_mcp"`).
- name: stats
type: object
description: Token usage and performance metrics.
children:
- name: input_tokens
type: number
description: Number of input tokens. Includes formatting, tool definitions, and prior messages in the chat.
- name: total_output_tokens
type: number
description: Total number of output tokens generated.
- name: reasoning_output_tokens
type: number
description: Number of tokens used for reasoning.
- name: tokens_per_second
type: number
description: Generation speed in tokens per second.
- name: time_to_first_token_seconds
type: number
description: Time in seconds to generate the first token.
- name: model_load_time_seconds
type: number
optional: true
description: Time taken to load the model for this request in seconds. Present only if the model was not already loaded.
- name: response_id
type: string
optional: true
description: Identifier of the response for subsequent requests. Starts with `"resp_"`. Present when `store` is `true`.
```
:::split:::
```lms_code_snippet
variants:
Request with MCP:
language: json
code: |
{
"model_instance_id": "ibm/granite-4-micro",
"output": [
{
"type": "tool_call",
"tool": "model_search",
"arguments": {
"sort": "trendingScore",
"query": "",
"limit": 1
},
"output": "...",
"provider_info": {
"server_label": "huggingface",
"type": "ephemeral_mcp"
}
},
{
"type": "message",
"content": "..."
},
{
"type": "tool_call",
"tool": "browser_navigate",
"arguments": {
"url": "https://lmstudio.ai"
},
"output": "...",
"provider_info": {
"plugin_id": "mcp/playwright",
"type": "plugin"
}
},
{
"type": "message",
"content": "**Top Trending Model on HuggingFace** ... Below is a quick snapshot of whats on the landing page ... more details on the model or LMStudio itself!"
}
],
"stats": {
"input_tokens": 646,
"total_output_tokens": 586,
"reasoning_output_tokens": 0,
"tokens_per_second": 29.753900615398926,
"time_to_first_token_seconds": 1.088,
"model_load_time_seconds": 2.656
},
"response_id": "resp_4ef013eba0def1ed23f19dde72b67974c579113f544086de"
}
Request with Images:
language: json
code: |
{
"model_instance_id": "qwen/qwen3-vl-4b",
"output": [
{
"type": "message",
"content": "This image is a solid, vibrant red square that fills the entire frame, with no discernible texture, pattern, or other elements. It presents a minimalist, uniform visual field of pure red, evoking a sense of boldness or urgency."
}
],
"stats": {
"input_tokens": 17,
"total_output_tokens": 50,
"reasoning_output_tokens": 0,
"tokens_per_second": 51.03762685242662,
"time_to_first_token_seconds": 0.814
},
"response_id": "resp_0182bd7c479d7451f9a35471f9c26b34de87a7255856b9a4"
}
```
````

View File

@@ -1,10 +1,142 @@
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, middleware::Logger};
use std::env;
use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;
use shared::{TaskRequest, TaskResponse, HealthResponse, utils};
use serde::{Deserialize, Serialize};
mod websocket_client;
use websocket_client::WebSocketClientManager;
#[derive(Deserialize, Serialize, Debug)]
struct Config {
server: ServerConfig,
gateway: GatewayConfig,
websocket: WebSocketConfig,
task: TaskConfig,
lmstudio: LmStudioConfig,
debug: DebugConfig,
}
#[derive(Deserialize, Serialize, Debug)]
struct ServerConfig {
port: String,
host: String,
}
#[derive(Deserialize, Serialize, Debug)]
struct GatewayConfig {
url: String,
ws_path: String,
api_key: String,
}
#[derive(Deserialize, Serialize, Debug)]
struct WebSocketConfig {
heartbeat_interval: u64,
connection_timeout: u64,
reconnect_interval: u64,
max_reconnect_attempts: u32,
}
#[derive(Deserialize, Serialize, Debug)]
struct TaskConfig {
workers: u32,
default_timeout: u64,
queue_size: usize,
}
#[derive(Deserialize, Serialize, Debug)]
struct LmStudioConfig {
enabled: bool,
url: String,
}
#[derive(Deserialize, Serialize, Debug)]
struct DebugConfig {
log_level: String,
config: bool,
websocket: bool,
task: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
server: ServerConfig {
port: "3001".to_string(),
host: "0.0.0.0".to_string(),
},
gateway: GatewayConfig {
url: "https://pactgo.cn".to_string(),
ws_path: "/api/v1/ws/control".to_string(),
api_key: "claw_secret_key".to_string(),
},
websocket: WebSocketConfig {
heartbeat_interval: 30,
connection_timeout: 60,
reconnect_interval: 5,
max_reconnect_attempts: 0,
},
task: TaskConfig {
workers: 1,
default_timeout: 60,
queue_size: 100,
},
lmstudio: LmStudioConfig {
enabled: true,
url: "http://12.0.0.1:1234".to_string(),
},
debug: DebugConfig {
log_level: "info".to_string(),
config: false,
websocket: false,
task: false,
},
}
}
}
fn load_config() -> Config {
let config_path = Path::new("./config.json");
if config_path.exists() {
match File::open(config_path) {
Ok(mut file) => {
let mut contents = String::new();
if file.read_to_string(&mut contents).is_ok() {
match serde_json::from_str::<Config>(&contents) {
Ok(config) => {
if config.debug.config {
println!("✅ 配置文件读取成功");
}
return config;
}
Err(e) => {
println!("❌ 配置文件解析失败: {}, 使用默认配置", e);
}
}
}
}
Err(e) => {
println!("❌ 打开配置文件失败: {}, 使用默认配置", e);
}
}
} else {
println!("📁 配置文件不存在,生成默认配置");
if let Ok(mut file) = File::create(config_path) {
let default_config = Config::default();
if let Ok(contents) = serde_json::to_string_pretty(&default_config) {
let _ = file.write_all(contents.as_bytes());
println!("✅ 默认配置文件生成成功");
}
}
}
Config::default()
}
/// 智能控制核心服务
struct SmartClawService;
@@ -307,13 +439,20 @@ async fn websocket_stop(ws_manager: web::Data<WebSocketClientManager>) -> impl R
/// WebSocket 连接处理器(用于与网关服务通信)
async fn websocket_handler(req: HttpRequest, _stream: web::Payload) -> impl Responder {
// TODO: 实现 WebSocket 通信
// 这将用于接收来自网关服务的实时任务
println!("📡 WebSocket 连接请求: {:?}", req);
// 获取WebSocket客户端实例用于测试和演示
let gateway_url = env::var("GATEWAY_URL").unwrap_or_else(|_| "http://localhost:8000".to_string());
let ws_manager = WebSocketClientManager::new(gateway_url);
let config = load_config();
let ws_manager = WebSocketClientManager::new(
config.gateway.url.clone(),
config.gateway.ws_path.clone(),
config.gateway.api_key.clone(),
config.websocket.heartbeat_interval,
config.websocket.reconnect_interval,
config.websocket.max_reconnect_attempts,
config.debug.websocket,
config.lmstudio.url.clone(),
config.lmstudio.enabled,
);
// 获取客户端实例(用于测试)
let client = ws_manager.get_client();
@@ -389,38 +528,54 @@ async fn queue_status() -> impl Responder {
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// 初始化日志
let config = load_config();
let log_filter = format!("{},actix_web={}", config.debug.log_level, config.debug.log_level);
env_logger::init_from_env(
env_logger::Env::new().default_filter_or("info,actix_web=info")
env_logger::Env::new().default_filter_or(&log_filter)
);
let port = env::var("PORT").unwrap_or_else(|_| "3001".to_string());
let bind_address = format!("0.0.0.0:{}", port);
// 获取网关服务地址
let gateway_url = env::var("GATEWAY_URL").unwrap_or_else(|_| "https://pactgo.cn".to_string());
let bind_address = format!("{}:{}", config.server.host, config.server.port);
println!("🚀 SmartClaw 服务启动中...");
println!("📍 绑定地址: {}", bind_address);
println!("🔗 网关服务: {}", gateway_url);
println!("📝 日志级别: info");
println!("🔗 网关服务: {}", config.gateway.url);
println!("📝 日志级别: {}", config.debug.log_level);
println!("🔧 版本: {}", env!("CARGO_PKG_VERSION"));
println!("🎯 环境: {}", env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()));
// 创建 WebSocket 客户端管理器
let ws_manager = WebSocketClientManager::new(gateway_url.clone());
if config.debug.config {
println!("📋 配置信息: {:?}", config);
}
// 启动 WebSocket 连接(在后台任务中)
let ws_manager = WebSocketClientManager::new(
config.gateway.url.clone(),
config.gateway.ws_path.clone(),
config.gateway.api_key.clone(),
config.websocket.heartbeat_interval,
config.websocket.reconnect_interval,
config.websocket.max_reconnect_attempts,
config.debug.websocket,
config.lmstudio.url.clone(),
config.lmstudio.enabled,
);
// 启动 WebSocket 连接并自动重连
let ws_manager_for_spawn = ws_manager.clone();
let max_reconnect = config.websocket.max_reconnect_attempts;
tokio::spawn(async move {
println!("🔄 正在启动 WebSocket 客户端连接...");
match ws_manager_for_spawn.start().await {
Ok(_) => {
println!("✅ WebSocket 客户端连接成功");
}
Err(e) => {
println!("❌ WebSocket 客户端连接失败: {}", e);
// 这里可以添加重试逻辑
if max_reconnect == 0 {
println!("🔄 正在启动 WebSocket 客户端(自动重连模式)...");
ws_manager_for_spawn.start_with_reconnect().await;
} else {
println!("🔄 正在启动 WebSocket 客户端(重连 {} 次)...", max_reconnect);
match ws_manager_for_spawn.start().await {
Ok(_) => {
println!("✅ WebSocket 客户端连接成功");
}
Err(e) => {
println!("❌ WebSocket 客户端连接失败: {}", e);
}
}
}
});
@@ -450,10 +605,11 @@ async fn main() -> std::io::Result<()> {
.route("/websocket/stop", web::post().to(websocket_stop))
)
})
.workers(1) // 设置为1个worker
.workers(config.task.workers as usize)
.bind(&bind_address)?
.run();
let ws_url = format!("{}{}", config.gateway.url.replace("https://", "wss://").replace("http://", "ws://"), config.gateway.ws_path);
println!("✅ SmartClaw 服务已启动在 {}", bind_address);
println!("🔍 可用接口:");
println!(" GET /api/v1/health - 健康检查");
@@ -464,7 +620,7 @@ async fn main() -> std::io::Result<()> {
println!(" GET /api/v1/websocket - WebSocket 连接");
println!(" POST /api/v1/websocket/disconnect - WebSocket 断开连接(测试)");
println!(" POST /api/v1/websocket/stop - WebSocket 停止管理器(测试)");
println!("💡 WebSocket 客户端已配置,连接到: {}", gateway_url);
println!("💡 WebSocket 客户端已配置,连接到: {}", ws_url);
// 启动优雅关闭处理
let ws_manager_clone = ws_manager.clone();

View File

@@ -8,30 +8,70 @@ use shared::{TaskRequest, TaskResponse};
use base64::Engine as _;
use base64::engine::general_purpose;
use rand::Rng;
use reqwest;
/// 获取当前本地时间的格式化字符串
fn get_current_time() -> String {
let now = chrono::Local::now();
now.format("%m-%d %H:%M:%S%.3f").to_string()
}
/// 带时间前缀的打印宏
macro_rules! log {
($($arg:tt)*) => {
println!("[{}] {}", get_current_time(), format!($($arg)*));
};
}
/// WebSocket 客户端连接管理器
#[derive(Clone)]
pub struct WebSocketClient {
gateway_url: String,
ws_path: String,
api_key: String,
heartbeat_interval: u64,
reconnect_interval: u64,
max_reconnect_attempts: u32,
debug: bool,
lmstudio_url: String,
lmstudio_enabled: bool,
sender: Arc<Mutex<Option<mpsc::Sender<String>>>>,
is_connected: Arc<Mutex<bool>>,
}
impl WebSocketClient {
/// 创建新的 WebSocket 客户端
pub fn new(gateway_url: String) -> Self {
pub fn new(gateway_url: String, ws_path: String, api_key: String, heartbeat_interval: u64, reconnect_interval: u64, max_reconnect_attempts: u32, debug: bool, lmstudio_url: String, lmstudio_enabled: bool) -> Self {
Self {
gateway_url,
ws_path,
api_key,
heartbeat_interval,
reconnect_interval,
max_reconnect_attempts,
debug,
lmstudio_url,
lmstudio_enabled,
sender: Arc::new(Mutex::new(None)),
is_connected: Arc::new(Mutex::new(false)),
}
}
/// 连接到网关服务
pub async fn connect(&self) -> Result<(), Box<dyn std::error::Error>> {
println!("🔌 正在连接到网关服务: {}", self.gateway_url);
pub async fn connect(&self) -> Result<(), String> {
if self.debug {
log!("🔌 正在连接到网关服务: {}", self.gateway_url);
}
let ws_url = format!("{}/api/v1/ws/control", self.gateway_url.replace("http://", "ws://").replace("https://", "wss://"));
println!("🔗 WebSocket URL: {}", ws_url);
let ws_url = format!("{}{}", self.gateway_url.replace("http://", "ws://").replace("https://", "wss://"), self.ws_path);
// 提取host header
let url_without_protocol = self.gateway_url.replace("https://", "").replace("http://", "");
let host = url_without_protocol.split(':').next().unwrap_or("localhost").to_string();
if self.debug {
log!("🔗 WebSocket URL: {}", ws_url);
}
// 生成随机的 Sec-WebSocket-Key
let mut key = [0u8; 16];
@@ -39,27 +79,37 @@ impl WebSocketClient {
let sec_websocket_key = general_purpose::STANDARD.encode(&key);
// 建立 WebSocket 连接添加API密钥和标准 WebSocket 握手头
let request = Request::builder()
let request = match Request::builder()
.uri(&ws_url)
.header("Host", "pactgo.cn")
.header("X-API-Key", "claw_secret_key")
.header("Host", host)
.header("X-API-Key", &self.api_key)
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", sec_websocket_key)
.header("Sec-WebSocket-Version", "13")
.body(())?;
.body(()) {
Ok(req) => req,
Err(e) => return Err(e.to_string()),
};
println!("🔗 正在建立WebSocket连接...");
println!("📋 请求URL: {}", ws_url);
println!("📋 HTTP版本: {:?}", request.version());
println!("📋 请求头:");
for (name, value) in request.headers() {
println!(" {}: {:?}", name, value);
if self.debug {
log!("🔗 正在建立WebSocket连接...");
log!("📋 请求URL: {}", ws_url);
log!("📋 HTTP版本: {:?}", request.version());
log!("📋 请求头:");
for (name, value) in request.headers() {
log!(" {}: {:?}", name, value);
}
}
let (ws_stream, response) = connect_async(request).await?;
println!("✅ WebSocket 连接建立");
println!("📋 响应状态: {}", response.status());
let (ws_stream, response) = match connect_async(request).await {
Ok(result) => result,
Err(e) => return Err(e.to_string()),
};
if self.debug {
log!("✅ WebSocket 连接建立");
log!("📋 响应状态: {}", response.status());
}
// 设置连接状态
*self.is_connected.lock().await = true;
@@ -72,10 +122,13 @@ impl WebSocketClient {
*self.sender.lock().await = Some(tx);
// 启动消息发送循环
let debug1 = self.debug;
let _write_handle = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let Err(e) = write.send(Message::Text(msg)).await {
println!("❌ 发送消息失败: {}", e);
if debug1 {
log!("❌ 发送消息失败: {}", e);
}
break;
}
}
@@ -83,23 +136,34 @@ impl WebSocketClient {
// 启动消息接收循环
let is_connected_clone = self.is_connected.clone();
let debug2 = self.debug;
let client_clone = self.clone();
let _read_handle = tokio::spawn(async move {
while let Some(msg) = read.next().await {
match msg {
Ok(Message::Text(text)) => {
println!("📨 收到消息: {}", text);
if debug2 {
log!("📨 收到消息: {}", text);
}
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
Self::handle_incoming_message(parsed).await;
let client_clone = client_clone.clone();
tokio::spawn(async move {
client_clone.handle_incoming_message(parsed).await;
});
}
}
Ok(Message::Close(_)) => {
println!("🔚 收到关闭消息");
if debug2 {
log!("🔚 收到关闭消息");
}
*is_connected_clone.lock().await = false;
break;
}
Ok(_) => {}
Err(e) => {
println!("❌ 接收消息错误: {}", e);
if debug2 {
log!("❌ 接收消息错误: {}", e);
}
*is_connected_clone.lock().await = false;
break;
}
@@ -108,28 +172,47 @@ impl WebSocketClient {
});
// 启动心跳机制
let heartbeat_interval_cfg = self.heartbeat_interval;
let is_connected_heartbeat = self.is_connected.clone();
let sender_clone = self.sender.clone();
let debug3 = self.debug;
let _heartbeat_handle = {
let is_connected = self.is_connected.clone();
tokio::spawn(async move {
let mut heartbeat_interval = interval(Duration::from_secs(30));
let mut heartbeat_interval = interval(Duration::from_secs(heartbeat_interval_cfg));
loop {
heartbeat_interval.tick().await;
let connected = *is_connected.lock().await;
let connected = *is_connected_heartbeat.lock().await;
if !connected {
println!("💔 心跳检测到连接已断开");
if debug3 {
log!("💔 心跳检测到连接已断开");
}
break;
}
let _heartbeat_msg = json!({
// 创建心跳消息
let heartbeat_msg = json!({
"type": "heartbeat",
"service": "smartclaw",
"timestamp": chrono::Utc::now().timestamp()
}).to_string();
// 这里需要重新获取 sender因为生命周期问题
println!("💓 心跳发送");
// 实际发送心跳消息
if let Some(sender) = &*sender_clone.lock().await {
if let Err(e) = sender.send(heartbeat_msg).await {
if debug3 {
log!("❌ 发送心跳失败: {}", e);
log!("💔 心跳发送失败,连接可能已断开");
}
*is_connected_heartbeat.lock().await = false;
break;
}
}
if debug3 {
log!("💓 心跳发送");
}
}
})
};
@@ -146,31 +229,170 @@ impl WebSocketClient {
let _ = sender.send(connect_msg).await;
}
println!("🚀 WebSocket 客户端已启动");
if self.debug {
log!("🚀 WebSocket 客户端已启动");
}
Ok(())
}
/// 启动客户端并自动重连
pub async fn start_with_reconnect(&self) {
let mut attempt = 0u32;
loop {
// 检查是否超过最大重连次数0表示无限重连
if self.max_reconnect_attempts > 0 && attempt >= self.max_reconnect_attempts {
if self.debug {
log!("❌ 达到最大重连次数 {},停止重连", self.max_reconnect_attempts);
}
break;
}
if attempt > 0 {
if self.debug {
log!("⏳ 等待 {} 秒后进行第 {} 次重连...", self.reconnect_interval, attempt);
}
tokio::time::sleep(Duration::from_secs(self.reconnect_interval)).await;
}
attempt += 1;
if self.debug {
log!("🔄 尝试连接 (第 {} 次)...", attempt);
}
match self.connect().await {
Ok(_) => {
if self.debug {
log!("✅ 连接成功");
}
attempt = 0; // 连接成功后重置计数
// 等待连接断开
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
if !*self.is_connected.lock().await {
if self.debug {
log!("🔌 检测到连接已断开,准备重连...");
}
break;
}
}
}
Err(e) => {
let error_msg = e.to_string();
if self.debug {
log!("❌ 连接失败: {}", error_msg);
}
*self.is_connected.lock().await = false;
}
}
}
}
/// 处理接收到的消息
async fn handle_incoming_message(message: serde_json::Value) {
async fn handle_incoming_message(&self, message: serde_json::Value) {
let debug = self.debug;
match message.get("type").and_then(|t| t.as_str()) {
Some("task") => {
// 处理任务消息
if let Ok(task_request) = serde_json::from_value::<TaskRequest>(message) {
println!("📝 收到任务请求: {:?}", task_request);
// 这里可以调用任务处理逻辑
if debug {
log!("📝 收到任务请求: {:?}", task_request);
}
}
}
Some("wechat_message") => {
// 处理企业微信消息
if debug {
log!("📱 收到企业微信消息");
log!(" 消息内容: {:?}", message);
}
// 提取消息数据
let from_user_name = message.get("data").and_then(|d| d.get("from_user_name").and_then(|v| v.as_str())).unwrap_or("");
let content = message.get("data").and_then(|d| d.get("content").and_then(|v| v.as_str())).unwrap_or("");
if debug {
log!(" 发送者: {}", from_user_name);
log!(" 消息内容: {}", content);
}
// 调用AI处理消息
if !content.is_empty() {
if debug {
log!("🤖 开始处理AI对话任务");
}
// 调用LMStudio API处理消息
let ai_response = match self.call_lmstudio(content).await {
Ok(response) => response,
Err(error) => {
if debug {
log!("❌ LMStudio处理失败: {}", error);
}
// 如果LMStudio调用失败使用默认回复
format!("处理消息时出错: {}", error)
}
};
if debug {
log!("✅ AI处理完成生成回复: {}", ai_response);
}
// 构建回复消息
let reply_message = serde_json::json! ({
"type": "wechat_message_response",
"data": {
"from_user_name": from_user_name,
"content": ai_response,
"msg_type": "text",
"timestamp": chrono::Utc::now().timestamp()
}
});
if debug {
log!("📤 准备回复企业微信消息: {:?}", reply_message);
}
// 通过WebSocket发送回复消息到网关
if let Some(sender) = &*self.sender.lock().await {
let reply_str = reply_message.to_string();
if let Err(e) = sender.send(reply_str).await {
if debug {
log!("❌ 发送回复消息失败: {}", e);
}
} else {
if debug {
log!("✅ 回复消息已发送到网关");
}
}
} else {
if debug {
log!("⚠️ 无法发送回复消息WebSocket连接未建立");
}
}
}
}
Some("heartbeat") => {
println!("💓 收到心跳响应");
if debug {
log!("💓 收到心跳响应");
}
}
Some("ack") => {
println!("✅ 收到确认消息");
if debug {
log!("✅ 收到确认消息");
}
}
Some(msg_type) => {
println!("❓ 收到未知消息类型: {}", msg_type);
if debug {
log!("❓ 收到未知消息类型: {}", msg_type);
}
}
None => {
println!("❓ 收到无类型消息");
if debug {
log!("❓ 收到无类型消息");
}
}
}
}
@@ -206,7 +428,172 @@ impl WebSocketClient {
pub async fn disconnect(&self) {
*self.is_connected.lock().await = false;
*self.sender.lock().await = None;
println!("🔌 WebSocket 连接已断开");
log!("🔌 WebSocket 连接已断开");
}
/// 读取或创建system prompt文件
async fn load_system_prompt(&self) -> String {
use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;
// 定义system prompt文件路径
let prompts_dir = Path::new("./prompts");
let system_prompt_file = prompts_dir.join("system.txt");
// 确保prompts目录存在
if !prompts_dir.exists() {
if let Err(e) = std::fs::create_dir_all(prompts_dir) {
if self.debug {
log!("❌ 创建prompts目录失败: {}", e);
}
return self.get_default_system_prompt();
}
}
// 尝试读取system prompt文件
if system_prompt_file.exists() {
match File::open(&system_prompt_file) {
Ok(mut file) => {
let mut content = String::new();
if let Ok(_) = file.read_to_string(&mut content) {
if self.debug {
log!("✅ 成功加载system prompt文件");
}
return content;
} else {
if self.debug {
log!("❌ 读取system prompt文件失败使用默认值");
}
return self.get_default_system_prompt();
}
}
Err(e) => {
if self.debug {
log!("❌ 打开system prompt文件失败: {}, 使用默认值", e);
}
return self.get_default_system_prompt();
}
}
} else {
// 创建system prompt文件
let default_prompt = self.get_default_system_prompt();
match File::create(&system_prompt_file) {
Ok(mut file) => {
if let Ok(_) = file.write_all(default_prompt.as_bytes()) {
if self.debug {
log!("✅ 创建system prompt文件成功");
}
} else {
if self.debug {
log!("❌ 创建system prompt文件失败");
}
}
}
Err(e) => {
if self.debug {
log!("❌ 创建system prompt文件失败: {}", e);
}
}
}
return default_prompt;
}
}
/// 获取默认的system prompt
fn get_default_system_prompt(&self) -> String {
"<|im_start|>system
你现在的名字是小宝,在任何回答中都要记住自己叫小宝。
你是一个友好、简洁、专业的智能助手。
<|im_end|>".to_string()
}
/// 调用LMStudio API处理消息
async fn call_lmstudio(&self, message: &str) -> Result<String, String> {
if !self.lmstudio_enabled {
return Err("LMStudio is not enabled".to_string());
}
if self.debug {
log!("🌐 调用LMStudio API: {}", self.lmstudio_url);
log!("📝 发送消息: {}", message);
}
// 读取或创建system prompt文件
let system_prompt = self.load_system_prompt().await;
// 构建LMStudio API请求
let payload = json!({
"model": "qwen-3-panda-agi-v2", // 使用当前可用模型
"input": message,
"system_prompt": system_prompt,
"temperature": 0.7,
"max_output_tokens": 500
});
// 发送请求到LMStudio
let client = reqwest::Client::new();
let response = match client.post(&format!("{}/api/v1/chat", self.lmstudio_url))
.header("Content-Type", "application/json")
.json(&payload)
.send()
.await {
Ok(resp) => resp,
Err(e) => {
if self.debug {
log!("❌ LMStudio API请求失败: {}", e);
}
return Err(format!("LMStudio API request failed: {}", e));
}
};
// 解析响应
let response_text = match response.text().await {
Ok(text) => text,
Err(e) => {
if self.debug {
log!("❌ 读取LMStudio响应失败: {}", e);
}
return Err(format!("Failed to read LMStudio response: {}", e));
}
};
if self.debug {
log!("📨 LMStudio响应: {}", response_text);
}
// 解析JSON响应
let response_json: serde_json::Value = match serde_json::from_str(&response_text) {
Ok(json) => json,
Err(e) => {
if self.debug {
log!("❌ 解析LMStudio响应失败: {}", e);
}
return Err(format!("Failed to parse LMStudio response: {}", e));
}
};
// 提取AI回复
let ai_response = match response_json
.get("output")
.and_then(|o| o.as_array())
.and_then(|o| o.iter().find(|item| item.get("type").and_then(|t| t.as_str()) == Some("message")))
.and_then(|item| item.get("content"))
.and_then(|c| c.as_str()) {
Some(content) => content.to_string(),
None => {
if self.debug {
log!("❌ 无法提取LMStudio回复内容");
}
return Err("Failed to extract AI response from LMStudio".to_string());
}
};
if self.debug {
log!("✅ LMStudio处理完成生成回复: {}", ai_response);
}
Ok(ai_response)
}
}
@@ -218,17 +605,22 @@ pub struct WebSocketClientManager {
impl WebSocketClientManager {
/// 创建新的管理器
pub fn new(gateway_url: String) -> Self {
pub fn new(gateway_url: String, ws_path: String, api_key: String, heartbeat_interval: u64, reconnect_interval: u64, max_reconnect_attempts: u32, debug: bool, lmstudio_url: String, lmstudio_enabled: bool) -> Self {
Self {
client: Arc::new(WebSocketClient::new(gateway_url)),
client: Arc::new(WebSocketClient::new(gateway_url, ws_path, api_key, heartbeat_interval, reconnect_interval, max_reconnect_attempts, debug, lmstudio_url, lmstudio_enabled)),
}
}
/// 启动客户端连接
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
pub async fn start(&self) -> Result<(), String> {
self.client.connect().await
}
/// 启动客户端并自动重连
pub async fn start_with_reconnect(&self) {
self.client.start_with_reconnect().await;
}
/// 获取客户端实例
pub fn get_client(&self) -> Arc<WebSocketClient> {
self.client.clone()

View File

@@ -3,6 +3,20 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use serde_json::json;
use uuid::Uuid;
use actix_ws::Session;
/// 获取当前本地时间的格式化字符串
fn get_current_time() -> String {
let now = chrono::Local::now();
now.format("%m-%d %H:%M:%S%.3f").to_string()
}
/// 带时间前缀的打印宏
macro_rules! log {
($($arg:tt)*) => {
println!("[{}] {}", get_current_time(), format!($($arg)*));
};
}
/// 连接信息
#[derive(Debug, Clone)]
@@ -32,13 +46,39 @@ impl ConnectionManager {
}
}
/// 添加连接
pub fn add_connection(&mut self, info: ConnectionInfo) {
let id = info.id.clone();
self.connections.insert(id.clone(), info);
log!("🔌 添加新连接: {}", id);
}
/// 移除连接
pub fn remove_connection(&mut self, id: &str) -> Option<ConnectionInfo> {
let removed = self.connections.remove(id);
if removed.is_some() {
log!("🔌 移除连接: {}", id);
}
removed
}
/// 更新连接心跳时间
pub fn update_heartbeat(&mut self, id: &str) -> bool {
if let Some(info) = self.connections.get_mut(id) {
info.last_heartbeat = Instant::now();
true
} else {
false
}
}
/// 获取连接信息
pub fn get_connection(&self, id: &str) -> Option<&ConnectionInfo> {
let conn = self.connections.get(id);
if let Some(info) = conn {
println!("🔍 获取连接信息: {} (连接时长: {:?})", id, info.connected_at.elapsed());
log!("🔍 获取连接信息: {} (连接时长: {:?})", id, info.connected_at.elapsed());
} else {
println!("⚠️ 连接不存在: {}", id);
log!("⚠️ 连接不存在: {}", id);
}
conn
}
@@ -56,9 +96,9 @@ impl ConnectionManager {
/// 获取所有连接信息
pub fn get_all_connections(&self) -> Vec<&ConnectionInfo> {
let connections: Vec<&ConnectionInfo> = self.connections.values().collect();
println!("📋 获取所有连接信息: {}个连接", connections.len());
log!("📋 获取所有连接信息: {}个连接", connections.len());
for conn in &connections {
println!(" - 连接: {} (连接时长: {:?})", conn.id, conn.connected_at.elapsed());
log!(" - 连接: {} (连接时长: {:?})", conn.id, conn.connected_at.elapsed());
}
connections
}
@@ -71,7 +111,7 @@ impl ConnectionManager {
self.connections.retain(|id, info| {
let elapsed = now.duration_since(info.last_heartbeat);
if elapsed > timeout {
println!("🧹 清理超时连接: {} (最后心跳: {:?}前)", id, elapsed);
log!("🧹 清理超时连接: {} (最后心跳: {:?}前)", id, elapsed);
removed_ids.push(id.clone());
false
} else {
@@ -80,7 +120,7 @@ impl ConnectionManager {
});
if !removed_ids.is_empty() {
println!("🧹 总共清理超时连接: {}个 (剩余: {}个)", removed_ids.len(), self.connections.len());
log!("🧹 总共清理超时连接: {}个 (剩余: {}个)", removed_ids.len(), self.connections.len());
}
removed_ids
@@ -90,19 +130,21 @@ impl ConnectionManager {
/// WebSocket连接池
pub struct WebSocketPool {
manager: Arc<RwLock<ConnectionManager>>,
sessions: Arc<RwLock<std::collections::HashMap<String, Arc<RwLock<Session>>>>>,
}
impl WebSocketPool {
pub fn new() -> Self {
println!("🚀 创建WebSocket连接池");
pub fn new(manager: Arc<RwLock<ConnectionManager>>) -> Self {
log!("🚀 创建WebSocket连接池");
Self {
manager: Arc::new(RwLock::new(ConnectionManager::new())),
manager,
sessions: Arc::new(RwLock::new(std::collections::HashMap::new())),
}
}
/// 获取连接管理器
pub fn get_manager(&self) -> Arc<RwLock<ConnectionManager>> {
println!("📋 获取WebSocket连接管理器 (引用计数: {:?})", Arc::strong_count(&self.manager));
log!("📋 获取WebSocket连接管理器 (引用计数: {:?})", Arc::strong_count(&self.manager));
self.manager.clone()
}
@@ -111,34 +153,85 @@ impl WebSocketPool {
&self.manager
}
/// 广播消息到所有连接
pub async fn broadcast(&self, _message: serde_json::Value) -> Result<(), String> {
/// 添加WebSocket会话
pub async fn add_session(&self, connection_id: &str, session: Session) {
let mut sessions = self.sessions.write().await;
sessions.insert(connection_id.to_string(), Arc::new(RwLock::new(session)));
log!("🔌 添加WebSocket会话: {}", connection_id);
}
/// 移除WebSocket会话
pub async fn remove_session(&self, connection_id: &str) {
let mut sessions = self.sessions.write().await;
if sessions.remove(connection_id).is_some() {
log!("🔌 移除WebSocket会话: {}", connection_id);
}
}
/// 广播消息到所有控制通道连接SmartClaw
pub async fn broadcast_to_control(&self, message: serde_json::Value) -> Result<(), String> {
let manager = self.manager.read().await;
let connections = manager.get_all_connections();
if connections.is_empty() {
return Err("没有可用的WebSocket连接".to_string());
// 过滤出控制通道连接SmartClaw
let control_connections: Vec<&ConnectionInfo> = connections.into_iter()
.filter(|conn| conn.client_info.as_ref().map(|info| info == "SmartClaw").unwrap_or(false))
.collect();
if control_connections.is_empty() {
return Err("没有可用的SmartClaw连接".to_string());
}
println!("📢 WebSocket连接池广播消息到 {}连接", connections.len());
log!("📢 WebSocket连接池广播消息到 {} 个SmartClaw连接", control_connections.len());
// 实现具体的消息发送逻辑
let sessions = self.sessions.read().await;
let message_str = message.to_string();
for conn in &control_connections {
if let Some(session) = sessions.get(&conn.id) {
let mut session = session.write().await;
if let Err(e) = session.text(message_str.clone()).await {
log!("❌ 发送消息到SmartClaw连接 {} 失败: {}", conn.id, e);
} else {
log!("✅ 发送消息到SmartClaw连接: {}", conn.id);
}
}
}
// 这里需要实现具体的消息发送逻辑
// 暂时返回成功
Ok(())
}
/// 广播消息到所有连接(兼容旧接口)
pub async fn broadcast(&self, message: serde_json::Value) -> Result<(), String> {
self.broadcast_to_control(message).await
}
/// 发送消息到指定连接(用于测试和调试)
pub async fn send_to_connection(&self, connection_id: &str, _message: serde_json::Value) -> Result<(), String> {
pub async fn send_to_connection(&self, connection_id: &str, message: serde_json::Value) -> Result<(), String> {
let manager = self.manager.read().await;
if manager.get_connection(connection_id).is_none() {
return Err(format!("连接不存在: {}", connection_id));
}
println!("📨 WebSocket连接池发送消息到连接: {}", connection_id);
log!("📨 WebSocket连接池发送消息到连接: {}", connection_id);
// 实现具体的消息发送逻辑
let sessions = self.sessions.read().await;
let message_str = message.to_string();
if let Some(session) = sessions.get(connection_id) {
let mut session = session.write().await;
if let Err(e) = session.text(message_str).await {
return Err(format!("发送消息失败: {}", e));
} else {
log!("✅ 发送消息到连接: {}", connection_id);
}
} else {
return Err(format!("会话不存在: {}", connection_id));
}
// 这里需要实现具体的消息发送逻辑
// 暂时返回成功
Ok(())
}
@@ -173,7 +266,7 @@ pub struct CommunicationConfig {
impl Default for CommunicationConfig {
fn default() -> Self {
println!("⚙️ 创建默认通信配置");
log!("⚙️ 创建默认通信配置");
Self {
websocket_url: "ws://localhost:8000/api/v1/ws/control".to_string(),
api_key: "claw_secret_key".to_string(),
@@ -187,7 +280,7 @@ impl Default for CommunicationConfig {
impl CommunicationConfig {
/// 创建生产环境配置
pub fn production() -> Self {
println!("🏭 创建生产环境通信配置 (心跳: {:?}, 超时: {:?})", Duration::from_secs(30), Duration::from_secs(60));
log!("🏭 创建生产环境通信配置 (心跳: {:?}, 超时: {:?})", Duration::from_secs(30), Duration::from_secs(60));
Self {
websocket_url: "ws://pactgo.cn/api/v1/ws/control".to_string(),
api_key: std::env::var("API_KEY").unwrap_or_else(|_| "claw_secret_key".to_string()),
@@ -199,7 +292,7 @@ impl CommunicationConfig {
/// 验证配置
pub fn validate(&self) -> Result<(), String> {
println!("🔍 验证通信配置...");
log!("🔍 验证通信配置...");
if self.websocket_url.is_empty() {
return Err("WebSocket URL不能为空".to_string());
}
@@ -209,7 +302,7 @@ impl CommunicationConfig {
if self.max_connections == 0 {
return Err("最大连接数不能为0".to_string());
}
println!("✅ 通信配置验证通过 (URL: {}, 最大连接数: {})", self.websocket_url, self.max_connections);
log!("✅ 通信配置验证通过 (URL: {}, 最大连接数: {})", self.websocket_url, self.max_connections);
Ok(())
}
}
@@ -222,7 +315,7 @@ pub struct WebSocketClient {
impl WebSocketClient {
pub fn new(config: CommunicationConfig) -> Self {
println!("🚀 创建WebSocket客户端配置URL: {}", config.websocket_url);
log!("🚀 创建WebSocket客户端配置URL: {}", config.websocket_url);
Self {
config,
connected: false,
@@ -231,7 +324,7 @@ impl WebSocketClient {
/// 连接到网关服务
pub async fn connect(&mut self) -> Result<(), String> {
println!("🔗 正在连接到网关WebSocket: {}", self.config.websocket_url);
log!("🔗 正在连接到网关WebSocket: {}", self.config.websocket_url);
// 验证配置
if let Err(e) = self.config.validate() {
@@ -240,7 +333,7 @@ impl WebSocketClient {
// 这里需要实现具体的WebSocket连接逻辑
// 暂时返回模拟连接成功
println!("✅ WebSocket连接成功 (模拟)");
log!("✅ WebSocket连接成功 (模拟)");
self.connected = true;
Ok(())
}
@@ -248,10 +341,10 @@ impl WebSocketClient {
/// 断开连接
pub async fn disconnect(&mut self) -> Result<(), String> {
if self.connected {
println!("🔌 断开WebSocket连接: {}", self.config.websocket_url);
log!("🔌 断开WebSocket连接: {}", self.config.websocket_url);
self.connected = false;
} else {
println!("⚠️ 没有活动的WebSocket连接需要断开");
log!("⚠️ 没有活动的WebSocket连接需要断开");
}
Ok(())
}
@@ -263,7 +356,7 @@ impl WebSocketClient {
"task": _task
});
println!("📤 WebSocket客户端发送任务: {:?} (心跳: {:?}, 超时: {:?})", _task.task_type, self.config.heartbeat_interval, self.config.connection_timeout);
log!("📤 WebSocket客户端发送任务: {:?} (心跳: {:?}, 超时: {:?})", _task.task_type, self.config.heartbeat_interval, self.config.connection_timeout);
// 这里需要实现具体的发送逻辑
// 暂时返回模拟响应
@@ -287,7 +380,7 @@ impl WebSocketClient {
/// 检查连接状态
pub fn is_connected(&self) -> bool {
println!("🔗 WebSocket客户端连接状态: {}", if self.connected { "已连接" } else { "未连接" });
log!("🔗 WebSocket客户端连接状态: {}", if self.connected { "已连接" } else { "未连接" });
self.connected
}
}

View File

@@ -0,0 +1,140 @@
C:\Disk\Gateway>gateway.exe
🚀 网关服务启动中...
📍 绑定地址: 127.0.0.1:8000 (通过nginx代理)
📝 日志级别: info
🔧 版本: 0.1.0
🎯 环境: development
🌐 外部访问: https://pactgo.cn (nginx代理)
[03-23 13:08:18.414] 🚀 创建WebSocket连接池
[03-23 13:08:18.415] ⚙️ 创建默认通信配置
[03-23 13:08:18.415] 🚀 初始化任务处理服务
[03-23 13:08:18.415] 📋 WebSocket连接池已创建
[03-23 13:08:18.415] ⚙️ 通信配置已加载: "ws://localhost:8000/api/v1/ws/control"
[03-23 13:08:18.415] 🏭 创建生产环境通信配置 (心跳: 30s, 超时: 60s)
[03-23 13:08:18.415] 🚀 创建WebSocket客户端配置URL: ws://pactgo.cn/api/v1/ws/control
[2026-03-23T05:08:18Z INFO actix_server::builder] starting 2 workers
✅ 网关服务已启动在 127.0.0.1:8000 (通过nginx代理)
🔍 可用接口:
🎯 企业微信回调 - 直接匹配企业微信配置
POST /wecom - 企业微信回调(必须直接匹配企业微信配置)
📋 API接口通过 /api/v1 前缀):
GET /api/v1/health - 健康检查
GET /api/v1/system - 系统信息
POST /api/v1/task - 处理任务
GET /api/v1/task/<task_id> - 查询任务状态
GET /api/v1/tasks - 查询任务列表
POST /api/v1/wechat/miniprogram/callback - 微信小程序回调
GET /api/v1/ws/control - WebSocket控制通道
GET /api/v1/ws/task - WebSocket任务通道
POST /api/v1/test/websocket/send - WebSocket发送测试
POST /api/v1/test/websocket/send_and_wait - WebSocket发送并等待测试
GET /api/v1/test/websocket/get_manager - WebSocket管理器测试
GET /api/v1/test/websocket/connection_send - WebSocket连接发送测试
POST /api/v1/test/websocket/direct_send - WebSocket直接发送测试
POST /api/v1/test/websocket/direct_send_and_wait - WebSocket直接发送并等待测试
🌐 外部访问: https://pactgo.cn (nginx代理)
🔗 WebSocket连接: wss://pactgo.cn/api/v1/ws/control
[2026-03-23T05:08:18Z INFO actix_server::server] Actix runtime found; starting in Actix runtime
[2026-03-23T05:08:18Z INFO actix_server::server] starting service: "actix-web-service-127.0.0.1:8000", workers: 2, listening on: 127.0.0.1:8000
🔄 启动WebSocket客户端连接测试...
[03-23 13:08:18.425] 🔗 正在连接到网关WebSocket: ws://pactgo.cn/api/v1/ws/control
[03-23 13:08:18.425] 🔍 验证通信配置...
[03-23 13:08:18.425] ✅ 通信配置验证通过 (URL: ws://pactgo.cn/api/v1/ws/control, 最大连接数: 100)
[03-23 13:08:18.425] ✅ WebSocket连接成功 (模拟)
✅ WebSocket客户端连接成功
[03-23 13:08:18.426] 🔗 WebSocket客户端连接状态: 已连接
🔗 WebSocket客户端已连接
[03-23 13:08:18.426] 📤 WebSocket客户端发送任务: TextProcessing (心跳: 30s, 超时: 60s)
✅ 测试任务发送成功: "任务处理成功WebSocket模拟"
[03-23 13:08:18.428] 📋 获取所有连接信息: 0个连接
[03-23 13:08:18.556] 🔗 收到WebSocket连接请求:
[03-23 13:08:18.556] HttpRequest GET HTTP/1.1:/api/v1/ws/control
[03-23 13:08:18.557] headers:
[03-23 13:08:18.557] "upgrade": "websocket"
[03-23 13:08:18.557] "x-api-key": "claw_secret_key"
[03-23 13:08:18.557] "sec-websocket-version": "13"
[03-23 13:08:18.558] "x-real-ip": "222.211.215.207"
[03-23 13:08:18.558] "connection": "upgrade"
[03-23 13:08:18.559] "host": "pactgo.cn"
[03-23 13:08:18.560] "x-forwarded-for": "222.211.215.207"
[03-23 13:08:18.560] "x-forwarded-proto": "https"
[03-23 13:08:18.560] "sec-websocket-key": "xV8B16meVWRnAZI8FHb1ig=="
[03-23 13:08:18.560] ✅ WebSocket连接认证通过
[03-23 13:08:18.560] 🎯 检测到SmartClaw服务连接 (控制通道)
[03-23 13:08:18.560] 🔗 开始WebSocket握手...
[03-23 13:08:18.560] ✅ WebSocket握手成功
[03-23 13:08:18.561] 🔌 添加新连接: 26dd6cb5-c305-421d-a8a3-753d0455b1c6
[03-23 13:08:18.561] 🔌 添加WebSocket会话: 26dd6cb5-c305-421d-a8a3-753d0455b1c6
[03-23 13:08:18.561] 🔌 创建新的WebSocket连接: id=26dd6cb5-c305-421d-a8a3-753d0455b1c6, type=SmartClaw
[03-23 13:08:18.561] 🔄 启动WebSocket消息处理循环...
[03-23 13:08:18.561] ✅ WebSocket连接已建立
[03-23 13:08:18.561] ✅ WebSocket消息处理循环已启动
[03-23 13:08:18.570] 📨 收到消息: {"service":"smartclaw","timestamp":1774242498,"type":"connect","version":"0.1.0"}
[03-23 13:08:18.570] 🔗 收到连接消息
[03-23 13:08:18.594] 📨 收到消息: {"service":"smartclaw","timestamp":1774242498,"type":"heartbeat"}
[03-23 13:08:18.594] 💓 收到心跳消息
[03-23 13:08:22.292] 📨 收到消息: {"service":"smartclaw","timestamp":1774242502,"type":"heartbeat"}
[03-23 13:08:22.293] 💓 收到心跳消息
[03-23 13:08:23.435] 🔌 断开WebSocket连接: ws://pactgo.cn/api/v1/ws/control
🔌 WebSocket客户端已断开
[03-23 13:08:37.412] 📱 收到企业微信回调
[03-23 13:08:37.412] 请求方法: POST
[03-23 13:08:37.414] 查询参数: msg_signature=d3c4aaed9b7bfdb3b7ac17119762e639bcaa3e2a&timestamp=1774242517&nonce=1774420742
[03-23 13:08:37.414] 📥 开始消息推送处理流程
[03-23 13:08:37.415] 消息内容: <xml><ToUserName><![CDATA[wwa7bb7aec981103b4]]></ToUserName><Encrypt><![CDATA[sTK12sXLTGbDIHop8a/0+8lehw6CBjCdfXBdR1DwpyFWmt8fttKjChwIzocwgWXaPDPaB68Cs7+MHVg6VINthu4bLHS/lNgKGjYYsQHe5vrdEQ2OGUBOWIQqNNX3bb0e7Ls/zBecAw7blSQNKi4akxGF1anvMVuVfv8ZspdkkongM3jXFiQ0HJJJDv68ejWRpW9iY3XR5F3FsQgYB0UbJguKtO+JuBdxlLQ30PdI0VrNtWL6XbMZN1xwXWTqxwy/A7aXFv/7nc2w/HnPJZW21/2AX0K/1MIdQpOamtZ3LaV5/tu/bSqUaSEF8n2PHN3tpmynYQKtcq0+KZWEnUXmxqvTWotLkV8f6MPlkPrQ5cEmXZ+EcV8jS4v2y9gZvhQ7IUYJ3H78eoqKm0uGlKCnRM9Ac22F961lRKuT9Y+ggsCtlKwU3eDuKLNJ2rdAQ0oO4OxcvPVguuy9c5m6CepL2g==]]></Encrypt><AgentID><![CDATA[1000002]]></AgentID></xml>
[03-23 13:08:37.415] 🔐 开始验证企业微信签名
[03-23 13:08:37.415] 🔐 验证企业微信签名:
[03-23 13:08:37.415] msg_signature: d3c4aaed9b7bfdb3b7ac17119762e639bcaa3e2a
[03-23 13:08:37.415] timestamp: 1774242517
[03-23 13:08:37.415] nonce: 1774420742
[03-23 13:08:37.415] data: sTK12sXLTGbDIHop8a/0+8lehw6CBjCdfXBdR1DwpyFWmt8fttKjChwIzocwgWXaPDPaB68Cs7+MHVg6VINthu4bLHS/lNgKGjYYsQHe5vrdEQ2OGUBOWIQqNNX3bb0e7Ls/zBecAw7blSQNKi4akxGF1anvMVuVfv8ZspdkkongM3jXFiQ0HJJJDv68ejWRpW9iY3XR5F3FsQgYB0UbJguKtO+JuBdxlLQ30PdI0VrNtWL6XbMZN1xwXWTqxwy/A7aXFv/7nc2w/HnPJZW21/2AX0K/1MIdQpOamtZ3LaV5/tu/bSqUaSEF8n2PHN3tpmynYQKtcq0+KZWEnUXmxqvTWotLkV8f6MPlkPrQ5cEmXZ+EcV8jS4v2y9gZvhQ7IUYJ3H78eoqKm0uGlKCnRM9Ac22F961lRKuT9Y+ggsCtlKwU3eDuKLNJ2rdAQ0oO4OxcvPVguuy9c5m6CepL2g==
[03-23 13:08:37.416] token: mytoken123456
[03-23 13:08:37.416] 排序后拼接字符串: 17742425171774420742mytoken123456sTK12sXLTGbDIHop8a/0+8lehw6CBjCdfXBdR1DwpyFWmt8fttKjChwIzocwgWXaPDPaB68Cs7+MHVg6VINthu4bLHS/lNgKGjYYsQHe5vrdEQ2OGUBOWIQqNNX3bb0e7Ls/zBecAw7blSQNKi4akxGF1anvMVuVfv8ZspdkkongM3jXFiQ0HJJJDv68ejWRpW9iY3XR5F3FsQgYB0UbJguKtO+JuBdxlLQ30PdI0VrNtWL6XbMZN1xwXWTqxwy/A7aXFv/7nc2w/HnPJZW21/2AX0K/1MIdQpOamtZ3LaV5/tu/bSqUaSEF8n2PHN3tpmynYQKtcq0+KZWEnUXmxqvTWotLkV8f6MPlkPrQ5cEmXZ+EcV8jS4v2y9gZvhQ7IUYJ3H78eoqKm0uGlKCnRM9Ac22F961lRKuT9Y+ggsCtlKwU3eDuKLNJ2rdAQ0oO4OxcvPVguuy9c5m6CepL2g==
[03-23 13:08:37.416] 计算签名: d3c4aaed9b7bfdb3b7ac17119762e639bcaa3e2a
[03-23 13:08:37.416] 验证结果: ✅ 通过
[03-23 13:08:37.416] ✅ 签名验证通过,开始处理消息
[03-23 13:08:37.416] 📄 开始解析企业微信XML消息
[03-23 13:08:37.417] 🔒 发现加密消息,开始解密
[03-23 13:08:37.417] 提取到加密内容
[03-23 13:08:37.417] ✅ 消息解密成功
[03-23 13:08:37.417] 解密后内容: <xml><ToUserName><![CDATA[wwa7bb7aec981103b4]]></ToUserName><FromUserName><![CDATA[ZengQingMing]]></FromUserName><CreateTime>1774242517</CreateTime><MsgType><![CDATA[text]]></MsgType><Content><![CDATA[好 像通了]]></Content><MsgId>7620313586460205140</MsgId><AgentID>1000002</AgentID></xml>
[03-23 13:08:37.417] 发送者: ZengQingMing
[03-23 13:08:37.417] 消息内容: 好像通了
[03-23 13:08:37.417] 消息类型: text
[03-23 13:08:37.418] 📋 获取所有连接信息: 1个连接
[03-23 13:08:37.418] - 连接: 26dd6cb5-c305-421d-a8a3-753d0455b1c6 (连接时长: 18.8562652s)
[03-23 13:08:37.418] 📤 准备发送回复消息给用户: ZengQingMing
[03-23 13:08:37.418] ✅ 已开始发送回复消息: 思考中...
[03-23 13:08:37.418] 🔄 开始转发消息到SmartClaw
[03-23 13:08:37.418] 📤 发送消息到SmartClaw: Object {"data": Object {"content": String("好像通了"), "event": Null, "from_user_name": String("ZengQingMing"), "msg_signature": String("d3c4aaed9b7bfdb3b7ac17119762e639bcaa3e2a"), "msg_type": String("text"), "nonce": String("1774420742"), "raw_body": String("<xml><ToUserName><![CDATA[wwa7bb7aec981103b4]]></ToUserName><Encrypt><![CDATA[sTK12sXLTGbDIHop8a/0+8lehw6CBjCdfXBdR1DwpyFWmt8fttKjChwIzocwgWXaPDPaB68Cs7+MHVg6VINthu4bLHS/lNgKGjYYsQHe5vrdEQ2OGUBOWIQqNNX3bb0e7Ls/zBecAw7blSQNKi4akxGF1anvMVuVfv8ZspdkkongM3jXFiQ0HJJJDv68ejWRpW9iY3XR5F3FsQgYB0UbJguKtO+JuBdxlLQ30PdI0VrNtWL6XbMZN1xwXWTqxwy/A7aXFv/7nc2w/HnPJZW21/2AX0K/1MIdQpOamtZ3LaV5/tu/bSqUaSEF8n2PHN3tpmynYQKtcq0+KZWEnUXmxqvTWotLkV8f6MPlkPrQ5cEmXZ+EcV8jS4v2y9gZvhQ7IUYJ3H78eoqKm0uGlKCnRM9Ac22F961lRKuT9Y+ggsCtlKwU3eDuKLNJ2rdAQ0oO4OxcvPVguuy9c5m6CepL2g==]]></Encrypt><AgentID><![CDATA[1000002]]></AgentID></xml>"), "timestamp": String("1774242517")}, "type": String("wechat_message")}
[03-23 13:08:37.418] ✅ 企业微信消息处理完成,返回 success
[03-23 13:08:37.419] 📤 开始发送企业微信消息
[03-23 13:08:37.420] 📋 获取所有连接信息: 1个连接
[03-23 13:08:37.420] - 连接: 26dd6cb5-c305-421d-a8a3-753d0455b1c6 (连接时长: 18.858536s)
[03-23 13:08:37.420] 📢 WebSocket连接池广播消息到 1 个SmartClaw连接
[03-23 13:08:37.420] ✅ 发送消息到SmartClaw连接: 26dd6cb5-c305-421d-a8a3-753d0455b1c6
[03-23 13:08:37.421] ✅ 消息已成功转发到SmartClaw
[03-23 13:08:37.432] 📨 收到消息: {"data":{"content":"我收到了你的消息,正在处理...","from_user_name":"ZengQingMing","msg_type":"text","timestamp":1774242517},"type":"wechat_message_response"}
[03-23 13:08:37.438] 📱 收到SmartClaw的微信消息回复
[03-23 13:08:37.438] 回复发送者: ZengQingMing
[03-23 13:08:37.438] 回复内容: 我收到了你的消息,正在处理...
[03-23 13:08:37.438] 📤 开始发送企业微信消息
[03-23 13:08:37.756] 获取到访问令牌: mUca6bQOJQ3fPnLdoZU9__QOLXUnBU9amY9yl-y1QJCb3oumKzRhKS_poGcXkOzaA58q221dGtVGISkQANsamvXYpM-q5bCQs-ok_WpzqZwBCiBuiVkZpzyQoBxPE_OKUCLvMKQW0Rx381LOBMK8G34ngBvUodnWo9hlvosPHe48qLpunLdQJjXtekpJe0HHOaAAoWfWWJLY1G60IMPIvA
[03-23 13:08:37.811] 获取到访问令牌: mUca6bQOJQ3fPnLdoZU9__QOLXUnBU9amY9yl-y1QJCb3oumKzRhKS_poGcXkOzaA58q221dGtVGISkQANsamvXYpM-q5bCQs-ok_WpzqZwBCiBuiVkZpzyQoBxPE_OKUCLvMKQW0Rx381LOBMK8G34ngBvUodnWo9hlvosPHe48qLpunLdQJjXtekpJe0HHOaAAoWfWWJLY1G60IMPIvA
[03-23 13:08:38.187] ✅ 消息发送成功
[03-23 13:08:38.211] ✅ 消息发送成功
[03-23 13:08:38.211] ✅ 企业微信回复消息发送成功
[03-23 13:08:48.436] 📋 获取所有连接信息: 1个连接
[03-23 13:08:48.436] - 连接: 26dd6cb5-c305-421d-a8a3-753d0455b1c6 (连接时长: 29.8742601s)
🔍 测试WebSocket连接 - 发现 1 个连接
📤 准备向连接 26dd6cb5-c305-421d-a8a3-753d0455b1c6 发送测试消息
✅ 模拟发送测试消息: {"message":"连接健康检查","timestamp":1774242528,"type":"health_check"}
🧪 测试WebSocketConnection的send方法
📤 send方法测试结果: {"connection_id":"26dd6cb5-c305-421d-a8a3-753d0455b1c6","test_message":"这是send方法的测试消息","type":"test_send"}
🧪 测试WebSocketConnection的send_and_wait方法
⏱️ send_and_wait方法测试结果: {"connection_id":"26dd6cb5-c305-421d-a8a3-753d0455b1c6","request":"这是send_and_wait方法的测试请求","response":"模拟响应数据","timeout":5000,"type":"test_send_and_wait"}
[03-23 13:08:48.572] 📨 收到消息: {"service":"smartclaw","timestamp":1774242528,"type":"heartbeat"}
[03-23 13:08:48.572] 💓 收到心跳消息
[03-23 13:08:52.293] 📨 收到消息: {"service":"smartclaw","timestamp":1774242532,"type":"heartbeat"}
[03-23 13:08:52.293] 💓 收到心跳消息

View File

@@ -4,11 +4,26 @@ use actix_ws::{Message};
use serde::{Deserialize, Serialize};
use std::env;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use base64;
use shared::{TaskRequest, TaskResponse, HealthResponse, utils};
use sha1::{Sha1, Digest};
use futures::StreamExt;
use crate::communication::ConnectionInfo;
/// 获取当前本地时间的格式化字符串
fn get_current_time() -> String {
let now = chrono::Local::now();
now.format("%m-%d %H:%M:%S%.3f").to_string()
}
/// 带时间前缀的打印宏
macro_rules! log {
($($arg:tt)*) => {
println!("[{}] {}", get_current_time(), format!($($arg)*));
};
}
mod communication;
@@ -56,13 +71,13 @@ async fn get_wechat_access_token() -> Result<String, Box<dyn std::error::Error>>
/// 发送企业微信消息
async fn send_wechat_message(touser: &str, content: &str, debug: bool) -> Result<(), Box<dyn std::error::Error>> {
if debug {
println!("📤 开始发送企业微信消息");
log!("📤 开始发送企业微信消息");
}
// 获取访问令牌
let access_token = get_wechat_access_token().await?;
if debug {
println!(" 获取到访问令牌: {}", access_token);
log!(" 获取到访问令牌: {}", access_token);
}
// 构建消息
@@ -88,12 +103,12 @@ async fn send_wechat_message(touser: &str, content: &str, debug: bool) -> Result
if result.errcode == 0 {
if debug {
println!("✅ 消息发送成功");
log!("✅ 消息发送成功");
}
Ok(())
} else {
if debug {
println!("❌ 消息发送失败: {} - {}", result.errcode, result.errmsg);
log!("❌ 消息发送失败: {} - {}", result.errcode, result.errmsg);
}
Err(format!("消息发送失败: {} - {}", result.errcode, result.errmsg).into())
}
@@ -217,27 +232,27 @@ fn get_wechat_config() -> (String, String, String, String, bool, bool, bool) {
/// 解析企业微信XML消息
fn parse_wechat_xml_message(xml_content: &str, debug: bool) -> (Option<String>, Option<String>, Option<String>, Option<String>) {
if debug {
println!("📄 开始解析企业微信XML消息");
log!("📄 开始解析企业微信XML消息");
}
// 检查是否为加密消息
if xml_content.contains("<Encrypt>") {
if debug {
println!("🔒 发现加密消息,开始解密");
log!("🔒 发现加密消息,开始解密");
}
// 提取Encrypt标签内容
if let Some(encrypt_content) = extract_xml_tag(xml_content, "Encrypt") {
if debug {
println!(" 提取到加密内容");
log!(" 提取到加密内容");
}
// 解密消息
match decrypt_wechat_message(&encrypt_content) {
Ok(decrypted_xml) => {
if debug {
println!("✅ 消息解密成功");
println!(" 解密后内容: {}", decrypted_xml);
log!("✅ 消息解密成功");
log!(" 解密后内容: {}", decrypted_xml);
}
// 从解密后的XML中提取发送者、内容、消息类型和事件类型
@@ -248,16 +263,16 @@ fn parse_wechat_xml_message(xml_content: &str, debug: bool) -> (Option<String>,
if debug {
if let Some(from_user) = &from_user_name {
println!(" 发送者: {}", from_user);
log!(" 发送者: {}", from_user);
}
if let Some(msg_content) = &content {
println!(" 消息内容: {}", msg_content);
log!(" 消息内容: {}", msg_content);
}
if let Some(msg_type_val) = &msg_type {
println!(" 消息类型: {}", msg_type_val);
log!(" 消息类型: {}", msg_type_val);
}
if let Some(event_val) = &event {
println!(" 事件类型: {}", event_val);
log!(" 事件类型: {}", event_val);
}
}
@@ -265,21 +280,21 @@ fn parse_wechat_xml_message(xml_content: &str, debug: bool) -> (Option<String>,
}
Err(e) => {
if debug {
println!("❌ 消息解密失败: {}", e);
log!("❌ 消息解密失败: {}", e);
}
return (None, None, None, None);
}
}
} else {
if debug {
println!("❌ 无法提取Encrypt标签");
log!("❌ 无法提取Encrypt标签");
}
return (None, None, None, None);
}
} else {
// 非加密消息,直接解析
if debug {
println!("🔓 非加密消息,直接解析");
log!("🔓 非加密消息,直接解析");
}
let from_user_name = extract_xml_tag(xml_content, "FromUserName");
let content = extract_xml_tag(xml_content, "Content");
@@ -288,16 +303,16 @@ fn parse_wechat_xml_message(xml_content: &str, debug: bool) -> (Option<String>,
if debug {
if let Some(from_user) = &from_user_name {
println!(" 发送者: {}", from_user);
log!(" 发送者: {}", from_user);
}
if let Some(msg_content) = &content {
println!(" 消息内容: {}", msg_content);
log!(" 消息内容: {}", msg_content);
}
if let Some(msg_type_val) = &msg_type {
println!(" 消息类型: {}", msg_type_val);
log!(" 消息类型: {}", msg_type_val);
}
if let Some(event_val) = &event {
println!(" 事件类型: {}", event_val);
log!(" 事件类型: {}", event_val);
}
}
@@ -408,12 +423,12 @@ impl TaskService {
/// 创建新的任务处理服务
fn new() -> Self {
let connection_manager = Arc::new(RwLock::new(ConnectionManager::new()));
let websocket_pool = WebSocketPool::new();
let websocket_pool = WebSocketPool::new(connection_manager.clone());
let communication_config = CommunicationConfig::default();
println!("🚀 初始化任务处理服务");
println!("📋 WebSocket连接池已创建");
println!("⚙️ 通信配置已加载: {:?}", communication_config.websocket_url);
log!("🚀 初始化任务处理服务");
log!("📋 WebSocket连接池已创建");
log!("⚙️ 通信配置已加载: {:?}", communication_config.websocket_url);
Self {
connection_manager,
@@ -424,10 +439,10 @@ impl TaskService {
/// 处理任务请求 - 现在通过WebSocket发送给内网服务器
async fn process_task(&self, task: TaskRequest) -> TaskResponse {
println!("📝 收到任务请求:");
println!(" 用户ID: {}", task.user_id);
println!(" 任务类型: {}", task.task_type);
println!(" 内容长度: {} 字符", task.content.len());
log!("📝 收到任务请求:");
log!(" 用户ID: {}", task.user_id);
log!(" 任务类型: {}", task.task_type);
log!(" 内容长度: {} 字符", task.content.len());
// 验证任务参数
if task.content.is_empty() {
@@ -442,15 +457,15 @@ impl TaskService {
let task_id = utils::generate_task_id(&task.user_id);
// 通过WebSocket连接发送任务到内网服务器
println!("🚀 通过WebSocket发送任务到内网服务器...");
log!("🚀 通过WebSocket发送任务到内网服务器...");
match self.send_task_via_websocket(task.clone()).await {
Ok(response) => {
println!("✅ 任务处理成功");
log!("✅ 任务处理成功");
response
},
Err(e) => {
println!("❌ WebSocket任务发送失败: {}", e);
println!("🎭 使用模拟响应");
log!("❌ WebSocket任务发送失败: {}", e);
log!("🎭 使用模拟响应");
self.create_mock_response(task_id, task)
}
}
@@ -459,7 +474,7 @@ impl TaskService {
/// 通过WebSocket发送任务到内网服务器
async fn send_task_via_websocket(&self, task: TaskRequest) -> Result<TaskResponse, String> {
// 使用通信配置
println!("⚙️ 使用通信配置 - 心跳间隔: {:?}, 连接超时: {:?}",
log!("⚙️ 使用通信配置 - 心跳间隔: {:?}, 连接超时: {:?}",
self.communication_config.heartbeat_interval,
self.communication_config.connection_timeout);
@@ -477,11 +492,11 @@ impl TaskService {
// 使用WebSocket池广播任务消息
match self.websocket_pool.broadcast(task_message.clone()).await {
Ok(_) => {
println!("📤 任务已通过WebSocket广播到所有连接");
log!("📤 任务已通过WebSocket广播到所有连接");
// 获取连接池统计信息
let pool_stats = self.websocket_pool.get_pool_stats();
println!("📊 WebSocket连接池统计: {}", pool_stats);
log!("📊 WebSocket连接池统计: {}", pool_stats);
// 尝试发送到特定连接(如果有的话)
if let Some(connection_info) = manager.get_all_connections().first() {
@@ -493,16 +508,16 @@ impl TaskService {
match self.websocket_pool.send_to_connection(&connection_info.id, specific_message).await {
Ok(_) => {
println!("📨 任务已发送到特定连接: {}", connection_info.id);
log!("📨 任务已发送到特定连接: {}", connection_info.id);
},
Err(e) => {
println!("⚠️ 发送到特定连接失败: {}", e);
log!("⚠️ 发送到特定连接失败: {}", e);
}
}
}
},
Err(e) => {
println!("⚠️ WebSocket广播失败: {}", e);
log!("⚠️ WebSocket广播失败: {}", e);
}
}
@@ -529,7 +544,7 @@ impl TaskService {
/// 创建模拟响应当WebSocket不可用时
fn create_mock_response(&self, task_id: String, task: TaskRequest) -> TaskResponse {
println!("🎭 创建模拟响应");
log!("🎭 创建模拟响应");
let result = match task.task_type {
shared::TaskType::TextProcessing => {
@@ -592,17 +607,17 @@ impl TaskService {
/// 验证企业微信签名
fn validate_wechat_signature(msg_signature: &str, timestamp: &str, nonce: &str, data: &str, debug: bool) -> bool {
if debug {
println!("🔐 验证企业微信签名:");
println!(" msg_signature: {}", msg_signature);
println!(" timestamp: {}", timestamp);
println!(" nonce: {}", nonce);
println!(" data: {}", data);
log!("🔐 验证企业微信签名:");
log!(" msg_signature: {}", msg_signature);
log!(" timestamp: {}", timestamp);
log!(" nonce: {}", nonce);
log!(" data: {}", data);
}
// 获取企业微信配置
let (token, _corp_id, _encoding_aes_key, _corp_secret, _, _, _) = get_wechat_config();
if debug {
println!(" token: {}", token);
log!(" token: {}", token);
}
// 企业微信签名验证算法(严格按照官方要求)
@@ -613,7 +628,7 @@ impl TaskService {
// 2. 将排序后的参数拼接成一个字符串
let combined = params.join("");
if debug {
println!(" 排序后拼接字符串: {}", combined);
log!(" 排序后拼接字符串: {}", combined);
}
// 3. 进行sha1加密
@@ -625,8 +640,8 @@ impl TaskService {
// 4. 与msg_signature对比
let is_valid = computed_signature == msg_signature;
if debug {
println!(" 计算签名: {}", computed_signature);
println!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" });
log!(" 计算签名: {}", computed_signature);
log!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" });
}
is_valid
@@ -634,10 +649,10 @@ impl TaskService {
/// 验证微信小程序签名
fn validate_miniprogram_signature(signature: &str, data: &str, session_key: &str) -> bool {
println!("🔐 验证微信小程序签名:");
println!(" signature: {}", signature);
println!(" data: {}", data);
println!(" session_key: {}", session_key);
log!("🔐 验证微信小程序签名:");
log!(" signature: {}", signature);
log!(" data: {}", data);
log!(" session_key: {}", session_key);
// 微信小程序签名验证算法
// 1. 将session_key和data拼接
@@ -652,8 +667,8 @@ impl TaskService {
// 3. 与signature对比
let is_valid = computed_signature == signature;
println!(" 计算签名: {}", computed_signature);
println!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" });
log!(" 计算签名: {}", computed_signature);
log!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" });
is_valid
}
@@ -663,13 +678,13 @@ impl TaskService {
async fn websocket_handler(
req: HttpRequest,
body: web::Payload,
_app_data: web::Data<TaskService>,
app_data: web::Data<TaskService>,
) -> Result<HttpResponse, actix_web::Error> {
println!("🔗 收到WebSocket连接请求: ");
println!(" HttpRequest {:?} {:?}:{}", req.method(), req.version(), req.path());
println!(" headers: ");
log!("🔗 收到WebSocket连接请求: ");
log!(" HttpRequest {:?} {:?}:{}", req.method(), req.version(), req.path());
log!(" headers: ");
for (name, value) in req.headers() {
println!(" {:?}: {:?}", name, value);
log!(" {:?}: {:?}", name, value);
}
// 验证连接来源可以添加API密钥验证
@@ -680,58 +695,129 @@ async fn websocket_handler(
let expected_key = env::var("WEBSOCKET_API_KEY").unwrap_or_else(|_| "claw_secret_key".to_string());
if api_key != expected_key {
println!("❌ WebSocket连接认证失败");
log!("❌ WebSocket连接认证失败");
return Err(actix_web::error::ErrorUnauthorized("Invalid API key"));
}
println!("✅ WebSocket连接认证通过");
log!("✅ WebSocket连接认证通过");
// 获取请求路径,区分连接类型
let path = req.path();
let is_control_connection = path == "/api/v1/ws/control";
if is_control_connection {
println!("🎯 检测到SmartClaw服务连接 (控制通道)");
log!("🎯 检测到SmartClaw服务连接 (控制通道)");
} else {
println!("📱 检测到设备连接 (任务通道)");
log!("📱 检测到设备连接 (任务通道)");
}
println!("🔗 开始WebSocket握手...");
log!("🔗 开始WebSocket握手...");
// 使用actix-ws处理WebSocket连接
let (response, mut session, msg_stream) = match actix_ws::handle(&req, body) {
Ok(result) => {
println!("✅ WebSocket握手成功");
log!("✅ WebSocket握手成功");
result
},
Err(e) => {
println!("❌ WebSocket握手失败: {}", e);
log!("❌ WebSocket握手失败: {}", e);
return Err(e);
}
};
println!("🔄 启动WebSocket消息处理循环...");
// 生成连接ID
let connection_id = uuid::Uuid::new_v4().to_string();
// 添加连接到连接管理器
{{
let mut manager = app_data.connection_manager.write().await;
let connection_info = ConnectionInfo {
id: connection_id.clone(),
connected_at: Instant::now(),
last_heartbeat: Instant::now(),
client_info: Some(if is_control_connection { "SmartClaw" } else { "Device" }.to_string()),
};
manager.add_connection(connection_info);
// 保存会话到WebSocketPool
app_data.websocket_pool.add_session(&connection_id, session.clone()).await;
log!("🔌 创建新的WebSocket连接: id={}, type={}", connection_id, if is_control_connection { "SmartClaw" } else { "Device" });
}}
log!("🔄 启动WebSocket消息处理循环...");
// 启动WebSocket消息处理循环
actix_web::rt::spawn(async move {
println!("✅ WebSocket消息处理循环已启动");
log!("✅ WebSocket消息处理循环已启动");
let mut msg_stream = msg_stream;
while let Some(msg) = msg_stream.next().await {
match msg {
Ok(Message::Text(text)) => {
println!("📨 收到消息: {}", text);
log!("📨 收到消息: {}", text);
// 处理消息
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&text) {
match parsed.get("type").and_then(|t| t.as_str()) {
Some("wechat_message_response") => {
// 处理SmartClaw的微信消息回复
log!("📱 收到SmartClaw的微信消息回复");
if let Some(data) = parsed.get("data") {
if let Some(from_user_name) = data.get("from_user_name").and_then(|v| v.as_str()) {
if let Some(content) = data.get("content").and_then(|v| v.as_str()) {
log!(" 回复发送者: {}", from_user_name);
log!(" 回复内容: {}", content);
// 发送回复到企业微信
let from_user_name_clone = from_user_name.to_string();
let content_clone = content.to_string();
tokio::spawn(async move {
if let Err(e) = send_wechat_message(&from_user_name_clone, &content_clone, true).await {
log!("❌ 发送企业微信回复消息失败: {}", e);
} else {
log!("✅ 企业微信回复消息发送成功");
}
});
}
}
}
}
Some("heartbeat") => {
// 处理心跳消息
log!("💓 收到心跳消息");
// 更新心跳时间
let mut manager = app_data.connection_manager.write().await;
manager.update_heartbeat(&connection_id);
}
Some("connect") => {
// 处理连接消息
log!("🔗 收到连接消息");
}
Some(msg_type) => {
log!("❓ 收到未知消息类型: {}", msg_type);
}
None => {
log!("❓ 收到无类型消息");
}
}
}
}
Ok(Message::Binary(bin)) => {
println!("📨 收到二进制消息: {} bytes", bin.len());
log!("📨 收到二进制消息: {} bytes", bin.len());
}
Ok(Message::Ping(msg)) => {
println!("📨 收到Ping");
log!("📨 收到Ping");
let _ = session.pong(&msg).await;
// 更新心跳时间
let mut manager = app_data.connection_manager.write().await;
manager.update_heartbeat(&connection_id);
}
Ok(Message::Pong(_)) => {
println!("📨 收到Pong");
log!("📨 收到Pong");
}
Ok(Message::Close(reason)) => {
println!("📨 收到关闭消息: {:?}", reason);
log!("📨 收到关闭消息: {:?}", reason);
// 从连接管理器中移除连接
let mut manager = app_data.connection_manager.write().await;
manager.remove_connection(&connection_id);
// 从WebSocketPool中移除会话
app_data.websocket_pool.remove_session(&connection_id).await;
log!("🔌 移除WebSocket连接: {}", connection_id);
break;
}
Ok(Message::Continuation(_)) => {
@@ -741,16 +827,22 @@ async fn websocket_handler(
// 处理 nop 消息
}
Err(e) => {
println!("❌ WebSocket错误: {}", e);
log!("❌ WebSocket错误: {}", e);
// 从连接管理器中移除连接
let mut manager = app_data.connection_manager.write().await;
manager.remove_connection(&connection_id);
// 从WebSocketPool中移除会话
app_data.websocket_pool.remove_session(&connection_id).await;
log!("🔌 移除WebSocket连接: {}", connection_id);
break;
}
}
}
println!("🔚 WebSocket连接已关闭");
log!("🔚 WebSocket连接已关闭");
});
println!("✅ WebSocket连接已建立");
log!("✅ WebSocket连接已建立");
Ok(response)
}
@@ -761,7 +853,7 @@ async fn health_check(app_data: web::Data<TaskService>) -> impl Responder {
// 获取连接管理器引用(用于测试)
let manager_ref = app_data.websocket_pool.get_manager_ref();
println!("📋 健康检查 - 连接管理器引用: {:?}", manager_ref.as_ref() as *const _);
log!("📋 健康检查 - 连接管理器引用: {:?}", manager_ref.as_ref() as *const _);
let response = HealthResponse {
status: "healthy".to_string(),
@@ -836,7 +928,7 @@ async fn test_websocket_connection_send(app_data: web::Data<TaskService>) -> imp
}
/// 企业微信回调处理器
async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Responder {
async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes, app_data: web::Data<TaskService>) -> impl Responder {
// 获取企业微信配置包括debug配置
let (_, _, _, _, debug_wechat, _debug_config, _http) = get_wechat_config();
@@ -847,11 +939,11 @@ async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Resp
let query_string = req.query_string();
// 根据debug配置控制日志输出
if debug_wechat {
println!("📱 收到企业微信回调");
println!(" 请求方法: {}", method);
println!(" 查询参数: {}", query_string);
}
if debug_wechat {
log!("📱 收到企业微信回调");
log!(" 请求方法: {}", method);
log!(" 查询参数: {}", query_string);
}
// 解析查询参数
#[derive(Deserialize)]
@@ -865,18 +957,18 @@ async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Resp
let query: WeChatQuery = match web::Query::<WeChatQuery>::from_query(query_string) {
Ok(q) => q.into_inner(),
Err(e) => {
if debug_wechat {
println!("❌ 解析查询参数失败: {}", e);
if debug_wechat {
log!("❌ 解析查询参数失败: {}", e);
}
return HttpResponse::BadRequest().body("error");
}
return HttpResponse::BadRequest().body("error");
}
};
// 核心判断GET 和 POST 必须分开处理
if method == &Method::GET {
// 1. GET请求 = URL 验证
if debug_wechat {
println!("🔐 开始URL验证流程");
log!("🔐 开始URL验证流程");
}
// 验证签名
@@ -889,32 +981,32 @@ async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Resp
);
if !is_valid {
if debug_wechat {
println!("❌ URL验证签名失败");
if debug_wechat {
log!("❌ URL验证签名失败");
}
return HttpResponse::Unauthorized().body("invalid signature");
}
return HttpResponse::Unauthorized().body("invalid signature");
}
// 验证通过返回echostr
if let Some(echostr) = query.echostr {
if debug_wechat {
println!("✅ URL验证成功返回 echostr: {}", echostr);
log!("✅ URL验证成功返回 echostr: {}", echostr);
}
return HttpResponse::Ok().body(echostr);
} else {
if debug_wechat {
println!("❌ URL验证失败缺少echostr参数");
log!("❌ URL验证失败缺少echostr参数");
}
return HttpResponse::BadRequest().body("missing echostr");
}
} else if method == &Method::POST {
// 2. POST请求 = 消息推送
if debug_wechat {
println!("📥 开始消息推送处理流程");
log!("📥 开始消息推送处理流程");
// 处理实际的消息回调
let body_str = String::from_utf8_lossy(&body);
println!(" 消息内容: {}", body_str);
log!(" 消息内容: {}", body_str);
}
// 提取Encrypt字段内容
@@ -923,7 +1015,7 @@ async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Resp
// 验证签名POST请求需要包含Encrypt字段
if debug_wechat {
println!("🔐 开始验证企业微信签名");
log!("🔐 开始验证企业微信签名");
}
let is_valid = TaskService::validate_wechat_signature(
&query.msg_signature,
@@ -934,78 +1026,131 @@ async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Resp
);
if !is_valid {
if debug_wechat {
println!("❌ 消息推送签名验证失败");
if debug_wechat {
log!("❌ 消息推送签名验证失败");
}
return HttpResponse::Unauthorized().body("invalid signature");
}
return HttpResponse::Unauthorized().body("invalid signature");
}
if debug_wechat {
println!("✅ 签名验证通过,开始处理消息");
log!("✅ 签名验证通过,开始处理消息");
}
// 解析XML消息
let (from_user_name, content, msg_type, event) = parse_wechat_xml_message(&body_str, debug_wechat);
// 检查是否有可用的SmartClaw连接
let manager = app_data.connection_manager.read().await;
let connections = manager.get_all_connections();
let has_smartclaw_connections = connections.iter()
.any(|conn| conn.client_info.as_ref().map(|info| info == "SmartClaw").unwrap_or(false));
let has_connections = has_smartclaw_connections;
// 尝试发送回复消息
if let Some(user_id) = from_user_name {
// 判断是否需要回复:只回复文本消息和非位置上报事件
if let Some(ref user_id) = from_user_name {
// 判断是否需要回复:只回复文本消息
let should_reply = match msg_type.as_deref() {
Some("text") => true, // 文本消息需要回复
Some("event") => {
// 事件消息中,除了位置上报事件外都回复
event.as_deref() != Some("LOCATION")
}
Some("text") => true, // 只有文本消息需要回复
_ => false, // 其他类型消息不回复
};
if should_reply {
if debug_wechat {
println!("📤 准备发送回复消息给用户: {}", user_id);
}
if debug_wechat {
log!("📤 准备发送回复消息给用户: {}", user_id);
}
let reply_content = if let Some(msg_content) = content {
format!("我收到你的消息啦!你说: {}", msg_content)
// 根据连接状态选择回复内容
let reply_content = if has_connections {
"思考中..."
} else {
"我收到你的消息啦!".to_string()
"服务器未就绪"
};
// 步发送消息(不阻塞主线程)
let user_id_clone = user_id.clone();
let reply_content_clone = reply_content.clone();
let debug_wechat_clone = debug_wechat;
tokio::spawn(async move {
if let Err(e) = send_wechat_message(&user_id_clone, &reply_content_clone, debug_wechat_clone).await {
if debug_wechat_clone {
println!("❌ 发送回复消息失败: {}", e);
}
// 步发送消息,确保"思考中..."先发送完成
if let Err(e) = send_wechat_message(user_id.as_str(), reply_content, debug_wechat).await {
if debug_wechat {
log!("❌ 发送回复消息失败: {}", e);
}
});
}
if debug_wechat {
println!("✅ 已开始发送回复消息");
}
log!("✅ 已开始发送回复消息: {}", reply_content);
}
} else {
if debug_wechat {
println!("⚠️ 消息类型不需要回复,跳过回复");
log!("⚠️ 消息类型不需要回复,跳过回复");
}
}
} else {
if debug_wechat {
println!("⚠️ 无法获取发送者信息,跳过回复");
log!("⚠️ 无法获取发送者信息,跳过回复");
}
}
// 如果有连接且是文本消息转发消息到SmartClaw
if has_connections && msg_type.as_deref() == Some("text") {
// 转发消息到SmartClaw
if debug_wechat {
log!("🔄 开始转发消息到SmartClaw");
}
// 构建转发消息
let wechat_message = serde_json::json!({
"type": "wechat_message",
"data": {
"from_user_name": from_user_name,
"content": content,
"msg_type": msg_type,
"event": event,
"raw_body": body_str.to_string(),
"timestamp": query.timestamp,
"nonce": query.nonce,
"msg_signature": query.msg_signature
}
});
// 通过WebSocket发送消息到SmartClaw
if debug_wechat {
log!("📤 发送消息到SmartClaw: {:?}", wechat_message);
}
// 异步发送消息到SmartClaw
let app_data_clone = app_data.clone();
let debug_wechat_clone = debug_wechat;
tokio::spawn(async move {
match app_data_clone.websocket_pool.broadcast(wechat_message).await {
Ok(_) => {
if debug_wechat_clone {
log!("✅ 消息已成功转发到SmartClaw");
}
},
Err(e) => {
if debug_wechat_clone {
log!("❌ 转发消息到SmartClaw失败: {}", e);
}
}
}
});
} else if !has_connections {
if debug_wechat {
log!("⚠️ 没有可用的SmartClaw连接跳过消息转发");
}
} else if msg_type.as_deref() != Some("text") {
if debug_wechat {
log!("⚠️ 非文本消息,跳过消息转发");
}
}
// 企业微信要求返回纯文本 "success"
if debug_wechat {
println!("✅ 企业微信消息处理完成,返回 success");
log!("✅ 企业微信消息处理完成,返回 success");
}
HttpResponse::Ok().body("success")
} else {
// 其他请求方法
if debug_wechat {
println!("❌ 不支持的请求方法: {}", method);
log!("❌ 不支持的请求方法: {}", method);
}
HttpResponse::MethodNotAllowed().body("method not allowed")
}