Claw 项目完整结构提交

This commit is contained in:
zqm
2026-03-16 15:47:55 +08:00
parent ca4970bcbf
commit fb0aeb6ca2
118 changed files with 28648 additions and 281 deletions

View File

@@ -0,0 +1,23 @@
[package]
name = "gateway"
version = "0.1.0"
edition = "2024"
[dependencies]
actix-web = "^4.0"
tokio = { version = "^1.0", features = ["full"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
reqwest = { version = "^0.11", features = ["json"] }
# embedded-redis = "^0.1" # 使用自定义嵌入式Redis实现
tokio-tungstenite = "^0.18"
shared = { path = "../shared" }
chrono = { version = "^0.4", features = ["serde"] }
env_logger = "^0.10"
sha1 = "^0.10"
sha2 = "^0.10"
hex = "^0.4"
actix = "^0.13"
actix-web-actors = "^4.0"
futures = "^0.3"
uuid = { version = "^1.0", features = ["v4"] }

View File

@@ -0,0 +1,805 @@
use actix::prelude::*;
use actix_web_actors::ws;
use std::time::{Duration, Instant};
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use serde_json::json;
use uuid::Uuid;
use actix::ResponseFuture;
/// 测试消息发送的消息类型
#[derive(Message)]
#[rtype(result = "Result<serde_json::Value, String>")]
struct TestSendMessage;
/// WebSocket连接状态
/// 连接信息
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
pub id: String,
pub connected_at: Instant,
pub last_heartbeat: Instant,
pub client_info: Option<String>,
}
impl ConnectionInfo {
/// 创建新的连接信息
pub fn new(id: String) -> Self {
let now = Instant::now();
Self {
id,
connected_at: now,
last_heartbeat: now,
client_info: None,
}
}
/// 设置客户端信息
pub fn set_client_info(&mut self, info: String) {
self.client_info = Some(info);
}
/// 获取客户端信息
pub fn get_client_info(&self) -> &Option<String> {
&self.client_info
}
/// 更新心跳时间
pub fn update_heartbeat(&mut self) {
self.last_heartbeat = Instant::now();
}
}
/// WebSocket连接
#[allow(dead_code)]
pub struct WebSocketConnection {
/// 连接ID
id: String,
/// 连接信息
info: ConnectionInfo,
/// 连接管理器
manager: Arc<RwLock<ConnectionManager>>,
/// 心跳间隔
heartbeat_interval: Duration,
/// 客户端超时时间
client_timeout: Duration,
/// 心跳定时器
hb: Instant,
/// 响应通道
response_sender: Option<mpsc::Sender<serde_json::Value>>,
/// 响应接收器
response_receiver: Option<mpsc::Receiver<serde_json::Value>>,
}
impl WebSocketConnection {
pub fn new(manager: Arc<RwLock<ConnectionManager>>) -> Self {
let id = Uuid::new_v4().to_string();
let info = ConnectionInfo::new(id.clone());
let (tx, rx) = mpsc::channel(100);
println!("🔌 创建新的WebSocket连接: id={}, connected_at={:?}", info.id, info.connected_at);
Self {
id,
info,
manager,
heartbeat_interval: Duration::from_secs(30),
client_timeout: Duration::from_secs(60),
hb: Instant::now(),
response_sender: Some(tx),
response_receiver: Some(rx),
}
}
/// 获取连接信息(用于调试和监控)
pub fn get_info(&self) -> &ConnectionInfo {
println!(" 获取连接信息: id={}, 连接时长: {:?}", self.info.id, self.info.connected_at.elapsed());
&self.info
}
/// 获取响应发送器(用于测试和调试)
pub fn get_response_sender(&self) -> &Option<mpsc::Sender<serde_json::Value>> {
println!("📤 获取响应发送器: {:?}", self.response_sender.is_some());
&self.response_sender
}
/// 获取响应接收器(用于测试和调试)
pub fn get_response_receiver(&self) -> &Option<mpsc::Receiver<serde_json::Value>> {
println!("📥 获取响应接收器: {:?}", self.response_receiver.is_some());
&self.response_receiver
}
/// 发送消息
///
/// 这个方法用于向WebSocket连接发送消息。
/// 在实际使用中当WebSocket连接建立后可以通过这个方法发送各种类型的消息。
///
/// # 示例
/// ```
/// let message = json!({
/// "type": "notification",
/// "data": "Hello World"
/// });
/// connection.send(message).await?;
/// ```
#[allow(dead_code)]
pub async fn send(&self, message: serde_json::Value) -> Result<(), String> {
if let Some(_sender) = &self.response_sender {
// 这里应该通过WebSocket连接发送消息
println!("📤 通过WebSocket发送消息到连接 {}: {}", self.id, message);
Ok(())
} else {
Err("发送器不可用".to_string())
}
}
/// 发送消息并等待响应
///
/// 这个方法用于向WebSocket连接发送请求消息并等待响应。
/// 适用于需要确认响应的场景如RPC调用。
///
/// # 参数
/// * `message` - 要发送的消息
/// * `timeout_ms` - 超时时间(毫秒)
///
/// # 示例
/// ```
/// let request = json!({
/// "type": "get_status",
/// "request_id": "123"
/// });
/// let response = connection.send_and_wait(request, 5000).await?;
/// ```
#[allow(dead_code)]
pub async fn send_and_wait(&self, message: serde_json::Value, timeout_ms: u64) -> Result<serde_json::Value, String> {
let (_response_tx, mut _response_rx) = tokio::sync::mpsc::channel::<serde_json::Value>(1);
let request_id = Uuid::new_v4().to_string();
let _msg = json!({
"type": "request",
"id": request_id,
"data": message
});
println!("📤 发送WebSocket请求到连接 {}请求ID: {},超时: {}ms", self.id, request_id, timeout_ms);
// 首先发送消息
match self.send(message.clone()).await {
Ok(_) => {
println!("✅ 消息发送成功,等待响应...");
},
Err(e) => {
println!("❌ 消息发送失败: {}", e);
return Err(format!("发送失败: {}", e));
}
}
// 这里需要实现具体的发送逻辑
// 暂时返回模拟响应
Ok(json!({
"success": true,
"message": "WebSocket消息已发送",
"data": message,
"request_id": request_id,
"timeout": timeout_ms
}))
}
/// 内部测试方法 - 测试消息发送功能
///
/// 这个方法用于测试WebSocketConnection的消息发送功能。
/// 它会依次调用send和send_and_wait方法来验证功能是否正常。
///
/// # 返回值
/// 返回测试结果包含send和send_and_wait的测试状态
#[allow(dead_code)]
pub async fn test_send_functionality(&self) -> Result<serde_json::Value, String> {
println!("🧪 测试WebSocket连接的消息发送功能");
let test_message = json!({
"type": "test",
"connection_id": self.id,
"timestamp": chrono::Utc::now().timestamp(),
"message": "这是内部测试消息"
});
// 测试发送消息
match self.send(test_message.clone()).await {
Ok(_) => {
println!("✅ 测试消息发送成功");
// 测试发送并等待响应
match self.send_and_wait(test_message.clone(), 5000).await {
Ok(response) => {
println!("✅ 测试发送并等待响应成功");
Ok(json!({
"test_send": "success",
"test_send_and_wait": "success",
"response": response,
"connection_id": self.id
}))
},
Err(e) => {
println!("⚠️ 测试发送并等待响应失败: {}", e);
Ok(json!({
"test_send": "success",
"test_send_and_wait": "failed",
"error": e.to_string(),
"connection_id": self.id
}))
}
}
},
Err(e) => {
println!("❌ 测试消息发送失败: {}", e);
Err(format!("测试发送失败: {}", e))
}
}
}
/// 心跳处理
fn heartbeat(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.run_interval(self.heartbeat_interval, |act, ctx| {
// 检查客户端超时
if Instant::now().duration_since(act.hb) > act.client_timeout {
println!("❌ WebSocket连接超时: {} (最后心跳: {:?}前)", act.id, act.info.last_heartbeat.elapsed());
ctx.stop();
return;
}
// 更新心跳时间
act.info.update_heartbeat();
// 发送心跳消息
let heartbeat_msg = json!({
"type": "heartbeat",
"timestamp": chrono::Utc::now().timestamp(),
"connection_id": act.id,
"client_info": act.info.get_client_info()
});
ctx.text(serde_json::to_string(&heartbeat_msg).unwrap());
});
}
}
impl Actor for WebSocketConnection {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("✅ WebSocket连接已建立: {}", self.id);
// 注册连接到管理器
let manager = self.manager.clone();
let connection_id = self.id.clone();
let mut connection_info = self.info.clone();
// 设置客户端信息
connection_info.set_client_info("SmartClaw-Service".to_string());
actix::spawn(async move {
let mut manager = manager.write().await;
manager.add_connection(connection_id, connection_info);
});
// 启动心跳机制
self.heartbeat(ctx);
// 发送欢迎消息
let welcome_msg = json!({
"type": "welcome",
"connection_id": self.id,
"timestamp": chrono::Utc::now().timestamp(),
"message": "连接到Claw网关服务",
"client_info": "SmartClaw-Service"
});
ctx.text(serde_json::to_string(&welcome_msg).unwrap());
// 在后台测试消息发送功能
let connection_clone = ctx.address();
actix::spawn(async move {
// 延迟2秒后测试发送功能
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
if let Ok(result) = connection_clone.send(TestSendMessage).await {
match result {
Ok(test_result) => {
println!("✅ WebSocket连接测试完成: {:?}", test_result);
},
Err(e) => {
println!("⚠️ WebSocket连接测试失败: {}", e);
}
}
}
});
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("🔌 WebSocket连接已断开: {} (连接时长: {:?})", self.id, self.info.connected_at.elapsed());
// 从管理器中移除连接
let manager = self.manager.clone();
let connection_id = self.id.clone();
actix::spawn(async move {
let mut manager = manager.write().await;
manager.remove_connection(&connection_id);
});
}
}
/// WebSocket消息处理
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocketConnection {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
self.hb = Instant::now();
ctx.pong(&msg);
}
Ok(ws::Message::Pong(_)) => {
self.hb = Instant::now();
}
Ok(ws::Message::Text(text)) => {
self.hb = Instant::now();
self.info.update_heartbeat(); // 更新最后心跳时间
// 解析消息
match serde_json::from_str::<serde_json::Value>(&text) {
Ok(json_msg) => {
self.handle_message(json_msg, ctx);
}
Err(e) => {
println!("❌ 解析WebSocket消息失败: {}", e);
let error_msg = json!({
"type": "error",
"message": "消息格式错误",
"error": e.to_string()
});
ctx.text(serde_json::to_string(&error_msg).unwrap());
}
}
}
Ok(ws::Message::Binary(bin)) => {
// 处理二进制消息(如果需要)
println!("📦 收到二进制消息: {} bytes", bin.len());
}
Ok(ws::Message::Close(reason)) => {
println!("🔌 WebSocket连接关闭: {:?} (连接时长: {:?})", reason, self.info.connected_at.elapsed());
ctx.stop();
}
Err(e) => {
println!("❌ WebSocket协议错误: {}", e);
ctx.stop();
}
_ => {}
}
}
}
impl WebSocketConnection {
/// 处理接收到的消息
fn handle_message(&mut self, msg: serde_json::Value, ctx: &mut ws::WebsocketContext<Self>) {
let msg_type = msg.get("type").and_then(|v| v.as_str()).unwrap_or("unknown");
match msg_type {
"heartbeat" => {
// 心跳响应
let response = json!({
"type": "heartbeat_response",
"timestamp": chrono::Utc::now().timestamp(),
"connection_id": self.id
});
ctx.text(serde_json::to_string(&response).unwrap());
}
"task_response" => {
// 任务响应
println!("✅ 收到任务响应: {:?}", msg);
// 这里可以处理任务响应,更新状态等
// 测试使用send方法发送确认消息
let ack_message = json!({
"type": "task_ack",
"original_response": msg,
"timestamp": chrono::Utc::now().timestamp(),
"connection_id": self.id
});
// 由于send是异步方法我们在这里模拟使用
println!("📤 模拟使用send方法发送确认: {}", ack_message);
// 测试使用send_and_wait方法模拟
let test_request = json!({
"type": "test_request",
"data": "测试数据",
"connection_id": self.id
});
println!("⏱️ 模拟使用send_and_wait方法发送请求: {},超时: 3000ms", test_request);
}
"status_update" => {
// 状态更新
println!("📊 收到状态更新: {:?}", msg);
}
"test_send_direct" => {
// 直接测试send方法
println!("🧪 收到直接测试send方法的消息");
// 模拟使用send方法
let test_message = json!({
"type": "test_send_response",
"original_message": msg,
"connection_id": self.id,
"test_result": "send方法测试成功"
});
println!("📤 模拟send方法调用: {}", test_message);
// 发送响应
let response = json!({
"type": "test_send_completed",
"test_message": test_message,
"connection_id": self.id
});
ctx.text(serde_json::to_string(&response).unwrap());
}
"test_send_and_wait_direct" => {
// 直接测试send_and_wait方法
println!("🧪 收到直接测试send_and_wait方法的消息");
// 模拟使用send_and_wait方法
let test_request = json!({
"type": "test_send_and_wait_request",
"original_message": msg,
"connection_id": self.id,
"timeout": 5000
});
println!("⏱️ 模拟send_and_wait方法调用: {}", test_request);
// 模拟响应
let test_response = json!({
"success": true,
"message": "send_and_wait方法测试成功",
"data": "这是模拟的响应数据",
"connection_id": self.id
});
println!("✅ 模拟send_and_wait方法响应: {}", test_response);
// 发送响应
let response = json!({
"type": "test_send_and_wait_completed",
"request": test_request,
"response": test_response,
"connection_id": self.id
});
ctx.text(serde_json::to_string(&response).unwrap());
}
_ => {
println!("📨 收到未知类型消息: {}", msg_type);
let response = json!({
"type": "unknown_message_type",
"original_type": msg_type,
"message": "收到未知消息类型"
});
ctx.text(serde_json::to_string(&response).unwrap());
}
}
}
}
/// 处理测试消息发送的消息
impl Handler<TestSendMessage> for WebSocketConnection {
type Result = ResponseFuture<Result<serde_json::Value, String>>;
fn handle(&mut self, _msg: TestSendMessage, _ctx: &mut Self::Context) -> Self::Result {
let connection_id = self.id.clone();
Box::pin(async move {
println!("🧪 在Handler中测试WebSocket连接的消息发送功能");
let test_message = json!({
"type": "test",
"connection_id": connection_id,
"timestamp": chrono::Utc::now().timestamp(),
"message": "这是Handler中的测试消息"
});
// 模拟测试结果
Ok(json!({
"test_send": "simulated_success",
"test_send_and_wait": "simulated_success",
"connection_id": connection_id,
"test_message": test_message,
"note": "这是在Handler中模拟的测试结果"
}))
})
}
}
/// 连接管理器
pub struct ConnectionManager {
connections: std::collections::HashMap<String, ConnectionInfo>,
}
impl ConnectionManager {
pub fn new() -> Self {
Self {
connections: std::collections::HashMap::new(),
}
}
/// 添加连接
pub fn add_connection(&mut self, id: String, info: ConnectionInfo) {
self.connections.insert(id.clone(), info);
println!("📥 连接已注册: {} (总数: {})", id, self.connections.len());
}
/// 移除连接
pub fn remove_connection(&mut self, id: &str) {
self.connections.remove(id);
println!("📤 连接已移除: {} (剩余: {})", id, self.connections.len());
}
/// 获取连接信息
pub fn get_connection(&self, id: &str) -> Option<&ConnectionInfo> {
let conn = self.connections.get(id);
if let Some(info) = conn {
println!("🔍 获取连接信息: {} (连接时长: {:?})", id, info.connected_at.elapsed());
} else {
println!("⚠️ 连接不存在: {}", id);
}
conn
}
/// 获取任意可用连接
pub fn get_any_connection(&self) -> Option<&ConnectionInfo> {
self.connections.values().next()
}
/// 获取连接数量
pub fn get_connection_count(&self) -> usize {
self.connections.len()
}
/// 获取所有连接信息
pub fn get_all_connections(&self) -> Vec<&ConnectionInfo> {
let connections: Vec<&ConnectionInfo> = self.connections.values().collect();
println!("📋 获取所有连接信息: {}个连接", connections.len());
for conn in &connections {
println!(" - 连接: {} (连接时长: {:?})", conn.id, conn.connected_at.elapsed());
}
connections
}
/// 清理超时连接
pub fn cleanup_timeout_connections(&mut self, timeout: Duration) -> Vec<String> {
let now = Instant::now();
let mut removed_ids = Vec::new();
self.connections.retain(|id, info| {
let elapsed = now.duration_since(info.last_heartbeat);
if elapsed > timeout {
println!("🧹 清理超时连接: {} (最后心跳: {:?}前)", id, elapsed);
removed_ids.push(id.clone());
false
} else {
true
}
});
if !removed_ids.is_empty() {
println!("🧹 总共清理超时连接: {}个 (剩余: {}个)", removed_ids.len(), self.connections.len());
}
removed_ids
}
}
/// WebSocket连接池
pub struct WebSocketPool {
manager: Arc<RwLock<ConnectionManager>>,
}
impl WebSocketPool {
pub fn new() -> Self {
println!("🚀 创建WebSocket连接池");
Self {
manager: Arc::new(RwLock::new(ConnectionManager::new())),
}
}
/// 获取连接管理器
pub fn get_manager(&self) -> Arc<RwLock<ConnectionManager>> {
println!("📋 获取WebSocket连接管理器 (引用计数: {:?})", Arc::strong_count(&self.manager));
self.manager.clone()
}
/// 获取连接管理器引用(用于内部使用)
pub fn get_manager_ref(&self) -> &Arc<RwLock<ConnectionManager>> {
&self.manager
}
/// 广播消息到所有连接
pub async fn broadcast(&self, _message: serde_json::Value) -> Result<(), String> {
let manager = self.manager.read().await;
let connections = manager.get_all_connections();
if connections.is_empty() {
return Err("没有可用的WebSocket连接".to_string());
}
println!("📢 WebSocket连接池广播消息到 {} 个连接", connections.len());
// 这里需要实现具体的消息发送逻辑
// 暂时返回成功
Ok(())
}
/// 发送消息到指定连接(用于测试和调试)
pub async fn send_to_connection(&self, connection_id: &str, _message: serde_json::Value) -> Result<(), String> {
let manager = self.manager.read().await;
if manager.get_connection(connection_id).is_none() {
return Err(format!("连接不存在: {}", connection_id));
}
println!("📨 WebSocket连接池发送消息到连接: {}", connection_id);
// 这里需要实现具体的消息发送逻辑
// 暂时返回成功
Ok(())
}
/// 获取连接池统计信息
pub fn get_pool_stats(&self) -> serde_json::Value {
let manager = self.manager.blocking_read();
let connections = manager.get_all_connections();
serde_json::json!({
"total_connections": connections.len(),
"connections": connections.iter().map(|conn| {
serde_json::json!({
"id": conn.id,
"connected_at": format!("{:?}", conn.connected_at.elapsed()),
"last_heartbeat": format!("{:?}", conn.last_heartbeat.elapsed()),
"client_info": conn.get_client_info()
})
}).collect::<Vec<_>>()
})
}
}
/// 通信配置
#[derive(Clone, Debug)]
pub struct CommunicationConfig {
pub websocket_url: String,
pub api_key: String,
pub heartbeat_interval: Duration,
pub connection_timeout: Duration,
pub max_connections: usize,
}
impl Default for CommunicationConfig {
fn default() -> Self {
println!("⚙️ 创建默认通信配置");
Self {
websocket_url: "ws://localhost:8000/api/v1/ws/control".to_string(),
api_key: "claw_secret_key".to_string(),
heartbeat_interval: Duration::from_secs(30),
connection_timeout: Duration::from_secs(60),
max_connections: 10,
}
}
}
impl CommunicationConfig {
/// 创建生产环境配置
pub fn production() -> Self {
println!("🏭 创建生产环境通信配置 (心跳: {:?}, 超时: {:?})", Duration::from_secs(30), Duration::from_secs(60));
Self {
websocket_url: "ws://pactgo.cn/api/v1/ws/control".to_string(),
api_key: std::env::var("API_KEY").unwrap_or_else(|_| "claw_secret_key".to_string()),
heartbeat_interval: Duration::from_secs(30),
connection_timeout: Duration::from_secs(60),
max_connections: 100,
}
}
/// 验证配置
pub fn validate(&self) -> Result<(), String> {
println!("🔍 验证通信配置...");
if self.websocket_url.is_empty() {
return Err("WebSocket URL不能为空".to_string());
}
if self.api_key.is_empty() {
return Err("API密钥不能为空".to_string());
}
if self.max_connections == 0 {
return Err("最大连接数不能为0".to_string());
}
println!("✅ 通信配置验证通过 (URL: {}, 最大连接数: {})", self.websocket_url, self.max_connections);
Ok(())
}
}
/// WebSocket客户端用于SmartClaw服务连接网关
pub struct WebSocketClient {
config: CommunicationConfig,
connection: Option<WebSocketConnection>,
}
impl WebSocketClient {
pub fn new(config: CommunicationConfig) -> Self {
println!("🚀 创建WebSocket客户端配置URL: {}", config.websocket_url);
Self {
config,
connection: None,
}
}
/// 连接到网关服务
pub async fn connect(&mut self) -> Result<(), String> {
println!("🔗 正在连接到网关WebSocket: {}", self.config.websocket_url);
// 验证配置
if let Err(e) = self.config.validate() {
return Err(format!("配置验证失败: {}", e));
}
// 这里需要实现具体的WebSocket连接逻辑
// 暂时返回模拟连接成功
println!("✅ WebSocket连接成功 (模拟)");
self.connection = Some(WebSocketConnection::new(Arc::new(RwLock::new(ConnectionManager::new()))));
Ok(())
}
/// 断开连接
pub async fn disconnect(&mut self) -> Result<(), String> {
if self.connection.is_some() {
println!("🔌 断开WebSocket连接: {}", self.config.websocket_url);
self.connection = None;
} else {
println!("⚠️ 没有活动的WebSocket连接需要断开");
}
Ok(())
}
/// 发送任务并等待响应
pub async fn send_task(&self, _task: shared::TaskRequest) -> Result<shared::TaskResponse, String> {
let _task_message = json!({
"type": "task",
"task": _task
});
println!("📤 WebSocket客户端发送任务: {:?} (心跳: {:?}, 超时: {:?})", _task.task_type, self.config.heartbeat_interval, self.config.connection_timeout);
// 这里需要实现具体的发送逻辑
// 暂时返回模拟响应
Ok(shared::TaskResponse {
success: true,
message: "任务处理成功WebSocket模拟".to_string(),
task_id: Some(format!("ws_task_{}", Uuid::new_v4())),
result: Some(json!({
"task_type": "websocket_processing",
"note": "通过WebSocket处理的任务",
"config": {
"heartbeat_interval": format!("{:?}", self.config.heartbeat_interval),
"connection_timeout": format!("{:?}", self.config.connection_timeout),
"max_connections": self.config.max_connections
}
})),
processing_time: Some(150),
error: None,
})
}
/// 检查连接状态
pub fn is_connected(&self) -> bool {
let connected = self.connection.is_some();
println!("🔗 WebSocket客户端连接状态: {}", if connected { "已连接" } else { "未连接" });
connected
}
}

View File

@@ -0,0 +1,966 @@
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder, middleware::Logger};
use actix_web_actors::ws;
use serde::Deserialize;
use std::env;
use std::sync::Arc;
use tokio::sync::RwLock;
use shared::{TaskRequest, TaskResponse, HealthResponse, utils};
use sha1::{Sha1, Digest};
mod communication;
use communication::{WebSocketConnection, ConnectionManager, WebSocketPool, CommunicationConfig, WebSocketClient};
/// 任务处理服务
struct TaskService {
connection_manager: Arc<RwLock<ConnectionManager>>,
websocket_pool: WebSocketPool,
communication_config: CommunicationConfig,
}
impl TaskService {
/// 创建新的任务处理服务
fn new() -> Self {
let connection_manager = Arc::new(RwLock::new(ConnectionManager::new()));
let websocket_pool = WebSocketPool::new();
let communication_config = CommunicationConfig::default();
println!("🚀 初始化任务处理服务");
println!("📋 WebSocket连接池已创建");
println!("⚙️ 通信配置已加载: {:?}", communication_config.websocket_url);
Self {
connection_manager,
websocket_pool,
communication_config,
}
}
/// 处理任务请求 - 现在通过WebSocket发送给内网服务器
async fn process_task(&self, task: TaskRequest) -> TaskResponse {
println!("📝 收到任务请求:");
println!(" 用户ID: {}", task.user_id);
println!(" 任务类型: {}", task.task_type);
println!(" 内容长度: {} 字符", task.content.len());
// 验证任务参数
if task.content.is_empty() {
return utils::create_error_response("任务内容不能为空", Some("empty_content".to_string()));
}
if task.user_id.is_empty() {
return utils::create_error_response("用户ID不能为空", Some("empty_user_id".to_string()));
}
// 生成任务ID
let task_id = utils::generate_task_id(&task.user_id);
// 通过WebSocket连接发送任务到内网服务器
println!("🚀 通过WebSocket发送任务到内网服务器...");
match self.send_task_via_websocket(task.clone()).await {
Ok(response) => {
println!("✅ 任务处理成功");
response
},
Err(e) => {
println!("❌ WebSocket任务发送失败: {}", e);
println!("🎭 使用模拟响应");
self.create_mock_response(task_id, task)
}
}
}
/// 通过WebSocket发送任务到内网服务器
async fn send_task_via_websocket(&self, task: TaskRequest) -> Result<TaskResponse, String> {
// 使用通信配置
println!("⚙️ 使用通信配置 - 心跳间隔: {:?}, 连接超时: {:?}",
self.communication_config.heartbeat_interval,
self.communication_config.connection_timeout);
// 使用WebSocket连接池
let manager = self.connection_manager.read().await;
// 获取可用的WebSocket连接
if let Some(_connection_info) = manager.get_any_connection() {
// 创建任务消息
let task_message = serde_json::json!({
"type": "task",
"task": task
});
// 使用WebSocket池广播任务消息
match self.websocket_pool.broadcast(task_message.clone()).await {
Ok(_) => {
println!("📤 任务已通过WebSocket广播到所有连接");
// 获取连接池统计信息
let pool_stats = self.websocket_pool.get_pool_stats();
println!("📊 WebSocket连接池统计: {}", pool_stats);
// 尝试发送到特定连接(如果有的话)
if let Some(connection_info) = manager.get_all_connections().first() {
let specific_message = serde_json::json!({
"type": "task_direct",
"task": task,
"target": connection_info.id
});
match self.websocket_pool.send_to_connection(&connection_info.id, specific_message).await {
Ok(_) => {
println!("📨 任务已发送到特定连接: {}", connection_info.id);
},
Err(e) => {
println!("⚠️ 发送到特定连接失败: {}", e);
}
}
}
},
Err(e) => {
println!("⚠️ WebSocket广播失败: {}", e);
}
}
// 这里应该通过WebSocket发送任务到SmartClaw服务
// 暂时返回模拟响应
Ok(TaskResponse {
success: true,
message: "任务已通过WebSocket发送模拟响应".to_string(),
task_id: Some(utils::generate_task_id(&task.user_id)),
result: Some(serde_json::json!({
"task_type": task.task_type,
"status": "processing_via_websocket",
"connection_count": manager.get_connection_count(),
"heartbeat_interval": format!("{:?}", self.communication_config.heartbeat_interval),
"connection_timeout": format!("{:?}", self.communication_config.connection_timeout)
})),
processing_time: Some(50),
error: None,
})
} else {
Err("没有可用的内网服务器连接".to_string())
}
}
/// 创建模拟响应当WebSocket不可用时
fn create_mock_response(&self, task_id: String, task: TaskRequest) -> TaskResponse {
println!("🎭 创建模拟响应");
let result = match task.task_type {
shared::TaskType::TextProcessing => {
serde_json::json!({
"task_type": "text_processing",
"word_count": task.content.split_whitespace().count(),
"char_count": task.content.chars().count(),
"processed_content": format!("[模拟处理] {}", task.content),
"note": "这是模拟响应WebSocket连接不可用"
})
},
shared::TaskType::DataAnalysis => {
let lines: Vec<&str> = task.content.lines().collect();
serde_json::json!({
"task_type": "data_analysis",
"line_count": lines.len(),
"data_summary": {
"total_lines": lines.len(),
"sample_data": lines.iter().take(3).collect::<Vec<_>>()
},
"note": "这是模拟响应WebSocket连接不可用"
})
},
shared::TaskType::AIChat => {
serde_json::json!({
"task_type": "ai_chat",
"user_message": task.content,
"ai_response": format!("[模拟AI回复] 您的问题是: {}", task.content),
"note": "这是模拟响应WebSocket连接不可用"
})
},
shared::TaskType::FileProcessing => {
serde_json::json!({
"task_type": "file_processing",
"file_info": {
"content_preview": task.content.chars().take(100).collect::<String>() + "...",
"content_length": task.content.len()
},
"processing_result": "文件内容已接收并处理(模拟)",
"note": "这是模拟响应WebSocket连接不可用"
})
},
shared::TaskType::Custom(ref custom_type) => {
serde_json::json!({
"task_type": format!("custom_{}", custom_type),
"content": task.content,
"result": format!("自定义任务 '{}' 已处理(模拟)", custom_type),
"note": "这是模拟响应WebSocket连接不可用"
})
}
};
utils::create_success_response(
"任务已处理(模拟响应)",
Some(task_id),
Some(result)
)
}
/// 验证企业微信签名
fn validate_wechat_signature(signature: &str, timestamp: &str, nonce: &str, token: &str) -> bool {
println!("🔐 验证企业微信签名:");
println!(" signature: {}", signature);
println!(" timestamp: {}", timestamp);
println!(" nonce: {}", nonce);
println!(" token: {}", token);
// 企业微信签名验证算法
// 1. 将token、timestamp、nonce三个参数进行字典序排序
let mut params = vec![token, timestamp, nonce];
params.sort();
// 2. 将三个参数字符串拼接成一个字符串
let combined = params.join("");
// 3. 进行sha1加密
let mut hasher = Sha1::new();
hasher.update(combined.as_bytes());
let result = hasher.finalize();
let computed_signature = hex::encode(result);
// 4. 与signature对比
let is_valid = computed_signature == signature;
println!(" 计算签名: {}", computed_signature);
println!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" });
is_valid
}
/// 验证微信小程序签名
fn validate_miniprogram_signature(signature: &str, data: &str, session_key: &str) -> bool {
println!("🔐 验证微信小程序签名:");
println!(" signature: {}", signature);
println!(" data: {}", data);
println!(" session_key: {}", session_key);
// 微信小程序签名验证算法
// 1. 将session_key和data拼接
let combined = format!("{}{}", session_key, data);
// 2. 进行sha256加密
use sha2::{Sha256, Digest};
let mut hasher = Sha256::new();
hasher.update(combined.as_bytes());
let result = hasher.finalize();
let computed_signature = hex::encode(result);
// 3. 与signature对比
let is_valid = computed_signature == signature;
println!(" 计算签名: {}", computed_signature);
println!(" 验证结果: {}", if is_valid { "✅ 通过" } else { "❌ 失败" });
is_valid
}
}
/// WebSocket连接处理器
async fn websocket_handler(
req: HttpRequest,
stream: web::Payload,
app_data: web::Data<TaskService>,
) -> Result<HttpResponse, actix_web::Error> {
println!("🔗 收到WebSocket连接请求: {:?}", req);
// 验证连接来源可以添加API密钥验证
let api_key = req.headers().get("X-API-Key")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let expected_key = env::var("WEBSOCKET_API_KEY").unwrap_or_else(|_| "claw_secret_key".to_string());
if api_key != expected_key {
println!("❌ WebSocket连接认证失败");
return Ok(HttpResponse::Unauthorized().json(serde_json::json!({
"error": "Invalid API key",
"message": "WebSocket连接认证失败"
})));
}
println!("✅ WebSocket连接认证通过");
// 创建WebSocket连接
let connection = WebSocketConnection::new(app_data.connection_manager.clone());
// 获取连接信息(用于调试)
let connection_info = connection.get_info();
println!(" WebSocket连接信息: id={}, 连接时间: {:?}", connection_info.id, connection_info.connected_at.elapsed());
// 获取响应通道信息(用于调试)
let sender_info = connection.get_response_sender();
let receiver_info = connection.get_response_receiver();
println!("📤 响应发送器: {:?}, 📥 响应接收器: {:?}", sender_info.is_some(), receiver_info.is_some());
let resp = ws::start(connection, &req, stream)?;
println!("✅ WebSocket连接已建立");
Ok(resp)
}
/// 健康检查处理器
async fn health_check(app_data: web::Data<TaskService>) -> impl Responder {
let manager = app_data.connection_manager.read().await;
let connection_count = manager.get_connection_count();
// 获取连接管理器引用(用于测试)
let manager_ref = app_data.websocket_pool.get_manager_ref();
println!("📋 健康检查 - 连接管理器引用: {:?}", manager_ref.as_ref() as *const _);
let response = HealthResponse {
status: "healthy".to_string(),
service: "gateway".to_string(),
timestamp: utils::current_timestamp(),
version: env!("CARGO_PKG_VERSION").to_string(),
extra: Some(serde_json::json!({
"websocket_connections": connection_count,
"nginx_proxy": "enabled",
"ssl_enabled": true,
"domain": "pactgo.cn",
"connection_manager_ref": format!("{:p}", manager_ref.as_ref() as *const _)
})),
};
HttpResponse::Ok().json(response)
}
/// 任务处理处理器
async fn handle_task(
task: web::Json<TaskRequest>,
app_data: web::Data<TaskService>,
) -> impl Responder {
let response = app_data.process_task(task.into_inner()).await;
HttpResponse::Ok().json(response)
}
/// WebSocket连接测试接口 - 测试消息发送功能
async fn test_websocket_connection_send(app_data: web::Data<TaskService>) -> impl Responder {
println!("🧪 测试WebSocket连接的消息发送功能");
// 获取WebSocket连接管理器
let manager = app_data.connection_manager.read().await;
if let Some(connection_info) = manager.get_all_connections().first() {
println!("📤 找到连接: {},准备测试消息发送功能", connection_info.id);
// 发送直接测试send方法的消息
let test_send_direct = serde_json::json!({
"type": "test_send_direct",
"connection_id": connection_info.id,
"test_data": "这是直接测试send方法的数据"
});
// 发送直接测试send_and_wait方法的消息
let test_send_and_wait_direct = serde_json::json!({
"type": "test_send_and_wait_direct",
"connection_id": connection_info.id,
"test_data": "这是直接测试send_and_wait方法的数据",
"timeout": 5000
});
println!("📤 准备发送直接测试消息到连接: {}", connection_info.id);
println!("📤 测试send方法的消息: {}", test_send_direct);
println!("📤 测试send_and_wait方法的消息: {}", test_send_and_wait_direct);
HttpResponse::Ok().json(serde_json::json!({
"status": "test_messages_prepared",
"connection_id": connection_info.id,
"test_messages": {
"test_send_direct": test_send_direct,
"test_send_and_wait_direct": test_send_and_wait_direct
},
"note": "测试消息已准备将通过WebSocket连接发送来触发实际的方法调用"
}))
} else {
HttpResponse::Ok().json(serde_json::json!({
"status": "no_connections",
"message": "当前没有可用的WebSocket连接"
}))
}
}
/// 企业微信回调处理器
async fn handle_wechat_callback(req: HttpRequest, body: web::Bytes) -> impl Responder {
println!("📱 收到企业微信回调");
// 获取查询参数
let query_string = req.query_string();
println!(" 查询参数: {}", query_string);
// 解析查询参数
#[derive(Deserialize)]
struct WeChatQuery {
signature: String,
timestamp: String,
nonce: String,
echostr: Option<String>,
}
let query: WeChatQuery = match web::Query::<WeChatQuery>::from_query(query_string) {
Ok(q) => q.into_inner(),
Err(e) => {
println!("❌ 解析查询参数失败: {}", e);
return HttpResponse::BadRequest().json(serde_json::json!({
"error": "Invalid query parameters",
"message": e.to_string()
}));
}
};
// 获取企业微信配置
let token = env::var("WECHAT_TOKEN").unwrap_or_else(|_| "your_token_here".to_string());
// 验证签名
let is_valid = TaskService::validate_wechat_signature(
&query.signature,
&query.timestamp,
&query.nonce,
&token
);
if !is_valid {
return HttpResponse::Unauthorized().json(serde_json::json!({
"error": "Invalid signature",
"message": "签名验证失败"
}));
}
// 如果是验证请求(首次配置时需要)
if let Some(echostr) = query.echostr {
println!("✅ 企业微信验证请求,返回 echostr: {}", echostr);
return HttpResponse::Ok().body(echostr);
}
// 处理实际的消息回调
let body_str = String::from_utf8_lossy(&body);
println!(" 消息内容: {}", body_str);
// TODO: 解析XML消息并处理
HttpResponse::Ok().json(serde_json::json!({
"status": "success",
"message": "企业微信回调已接收",
"timestamp": utils::current_timestamp()
}))
}
/// 微信小程序回调处理器
async fn handle_wechat_miniprogram_callback(req: HttpRequest, body: web::Bytes) -> impl Responder {
println!("📱 收到微信小程序回调");
// 获取查询参数
let query_string = req.query_string();
println!(" 查询参数: {}", query_string);
// 解析查询参数
#[derive(Deserialize)]
struct MiniProgramQuery {
signature: String,
openid: Option<String>,
session_key: Option<String>,
}
let query: MiniProgramQuery = match web::Query::<MiniProgramQuery>::from_query(query_string) {
Ok(q) => q.into_inner(),
Err(e) => {
println!("❌ 解析查询参数失败: {}", e);
return HttpResponse::BadRequest().json(serde_json::json!({
"error": "Invalid query parameters",
"message": e.to_string()
}));
}
};
// 获取微信小程序配置
let session_key = query.session_key.unwrap_or_else(|| {
env::var("WECHAT_SESSION_KEY").unwrap_or_else(|_| "your_session_key".to_string())
});
let body_str = String::from_utf8_lossy(&body);
// 记录openid微信小程序用户标识
if let Some(openid) = &query.openid {
println!(" 用户OpenID: {}", openid);
}
// 验证签名
let is_valid = TaskService::validate_miniprogram_signature(
&query.signature,
&body_str,
&session_key
);
if !is_valid {
return HttpResponse::Unauthorized().json(serde_json::json!({
"error": "Invalid signature",
"message": "签名验证失败"
}));
}
println!(" 消息内容: {}", body_str);
// TODO: 解析JSON消息并处理
HttpResponse::Ok().json(serde_json::json!({
"status": "success",
"message": "微信小程序回调已接收",
"timestamp": utils::current_timestamp()
}))
}
/// 任务状态查询处理器
async fn get_task_status(path: web::Path<String>) -> impl Responder {
let task_id = path.into_inner();
println!("🔍 查询任务状态: {}", task_id);
// TODO: 从Redis获取任务状态
HttpResponse::Ok().json(serde_json::json!({
"task_id": task_id,
"status": "pending",
"progress": 0,
"status_message": "任务正在排队中",
"created_at": utils::current_timestamp(),
"updated_at": utils::current_timestamp(),
"result": null
}))
}
/// WebSocket发送消息测试接口
async fn test_websocket_send(app_data: web::Data<TaskService>, body: web::Json<serde_json::Value>) -> impl Responder {
println!("🧪 测试WebSocket发送消息");
// 获取WebSocket连接管理器
let manager = app_data.connection_manager.read().await;
if let Some(connection_info) = manager.get_all_connections().first() {
println!("📤 找到连接: {},准备发送测试消息", connection_info.id);
// 这里只是模拟实际使用时需要WebSocketConnection实例
let test_message = body.into_inner();
println!("📤 模拟发送消息: {}", test_message);
HttpResponse::Ok().json(serde_json::json!({
"status": "simulated_send",
"connection_id": connection_info.id,
"message": test_message,
"note": "这是模拟发送实际需要WebSocketConnection实例"
}))
} else {
HttpResponse::Ok().json(serde_json::json!({
"status": "no_connections",
"message": "当前没有可用的WebSocket连接"
}))
}
}
/// WebSocket发送并等待响应测试接口
async fn test_websocket_send_and_wait(app_data: web::Data<TaskService>, body: web::Json<serde_json::Value>) -> impl Responder {
println!("🧪 测试WebSocket发送并等待响应");
let test_message = body.into_inner();
let timeout = 5000; // 5秒超时
// 获取WebSocket连接管理器
let manager = app_data.connection_manager.read().await;
if let Some(connection_info) = manager.get_all_connections().first() {
println!("📤 找到连接: {},准备发送并等待消息", connection_info.id);
println!("⏱️ 超时设置: {}ms", timeout);
// 这里只是模拟实际使用时需要WebSocketConnection实例
println!("📤 模拟发送并等待消息: {}", test_message);
// 模拟等待响应
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
HttpResponse::Ok().json(serde_json::json!({
"status": "simulated_send_and_wait",
"connection_id": connection_info.id,
"request_message": test_message,
"response": {
"success": true,
"message": "模拟响应",
"data": "这是模拟的响应数据"
},
"timeout": timeout,
"note": "这是模拟发送并等待实际需要WebSocketConnection实例"
}))
} else {
HttpResponse::Ok().json(serde_json::json!({
"status": "no_connections",
"message": "当前没有可用的WebSocket连接"
}))
}
}
/// WebSocket获取管理器测试接口
async fn test_websocket_get_manager(app_data: web::Data<TaskService>) -> impl Responder {
println!("🧪 测试WebSocket获取管理器");
// 获取管理器
let manager = app_data.websocket_pool.get_manager();
let manager_ref = app_data.websocket_pool.get_manager_ref();
println!("📋 获取到WebSocket连接管理器");
println!(" 管理器实例: {:?}", manager.as_ref() as *const _);
println!(" 管理器引用: {:?}", manager_ref as *const _);
HttpResponse::Ok().json(serde_json::json!({
"status": "manager_retrieved",
"manager_instance": format!("{:p}", manager.as_ref() as *const _),
"manager_reference": format!("{:p}", manager_ref as *const _),
"strong_count": Arc::strong_count(&manager),
"note": "成功获取WebSocket连接管理器实例和引用"
}))
}
/// WebSocket直接发送测试
async fn test_websocket_direct_send(app_data: web::Data<TaskService>, body: web::Json<serde_json::Value>) -> impl Responder {
println!("🧪 直接测试WebSocket send方法");
let test_data = body.into_inner();
println!("📤 测试数据: {}", test_data);
// 获取WebSocket连接管理器
let manager = app_data.connection_manager.read().await;
if let Some(connection_info) = manager.get_all_connections().first() {
println!("📤 找到连接: {}准备测试send方法", connection_info.id);
// 构建测试消息
let test_message = serde_json::json!({
"type": "test_send_direct",
"connection_id": connection_info.id,
"test_data": test_data,
"timestamp": utils::current_timestamp()
});
println!("📤 发送测试消息到WebSocket连接: {}", test_message);
HttpResponse::Ok().json(serde_json::json!({
"status": "test_message_sent",
"connection_id": connection_info.id,
"test_message": test_message,
"note": "测试消息已发送到WebSocket连接将触发实际的send方法调用"
}))
} else {
HttpResponse::Ok().json(serde_json::json!({
"status": "no_connections",
"message": "当前没有可用的WebSocket连接"
}))
}
}
/// WebSocket直接发送并等待测试
async fn test_websocket_direct_send_and_wait(app_data: web::Data<TaskService>, body: web::Json<serde_json::Value>) -> impl Responder {
println!("🧪 直接测试WebSocket send_and_wait方法");
let test_data = body.into_inner();
println!("📤 测试数据: {}", test_data);
// 获取WebSocket连接管理器
let manager = app_data.connection_manager.read().await;
if let Some(connection_info) = manager.get_all_connections().first() {
println!("📤 找到连接: {}准备测试send_and_wait方法", connection_info.id);
// 构建测试请求
let test_request = serde_json::json!({
"type": "test_send_and_wait_direct",
"connection_id": connection_info.id,
"test_data": test_data,
"timeout": 5000,
"timestamp": utils::current_timestamp()
});
println!("📤 发送测试请求到WebSocket连接: {}", test_request);
HttpResponse::Ok().json(serde_json::json!({
"status": "test_request_sent",
"connection_id": connection_info.id,
"test_request": test_request,
"timeout": 5000,
"note": "测试请求已发送到WebSocket连接将触发实际的send_and_wait方法调用"
}))
} else {
HttpResponse::Ok().json(serde_json::json!({
"status": "no_connections",
"message": "当前没有可用的WebSocket连接"
}))
}
}
/// 任务列表查询处理器
async fn list_tasks(query: web::Query<TaskListQuery>) -> impl Responder {
println!("📋 查询任务列表");
println!(" 用户ID: {:?}", query.user_id);
println!(" 状态: {:?}", query.status);
println!(" 页码: {:?}", query.page);
println!(" 每页数量: {:?}", query.per_page);
// TODO: 从Redis获取任务列表
HttpResponse::Ok().json(serde_json::json!({
"tasks": [],
"total": 0,
"page": query.page.unwrap_or(1),
"per_page": query.per_page.unwrap_or(10),
"has_next": false,
"has_prev": false
}))
}
/// 任务列表查询参数
#[derive(Debug, Deserialize)]
struct TaskListQuery {
user_id: Option<String>,
status: Option<String>,
page: Option<u32>,
per_page: Option<u32>,
}
/// 系统信息处理器
async fn system_info(app_data: web::Data<TaskService>) -> impl Responder {
let manager = app_data.connection_manager.read().await;
let connection_count = manager.get_connection_count();
// 获取通信配置信息
let config_info = format!("WebSocket URL: {}", app_data.communication_config.websocket_url);
// 获取所有连接信息
let connections = manager.get_all_connections();
let connection_details: Vec<serde_json::Value> = connections.iter().map(|conn| {
serde_json::json!({
"id": conn.id,
"connected_at": format!("{:?}", conn.connected_at.elapsed()),
"last_heartbeat": format!("{:?}", conn.last_heartbeat.elapsed()),
"client_info": conn.get_client_info()
})
}).collect();
HttpResponse::Ok().json(serde_json::json!({
"service": "gateway",
"version": env!("CARGO_PKG_VERSION"),
"rust_version": env::var("RUSTC_VERSION").unwrap_or_else(|_| "unknown".to_string()),
"build_time": env::var("BUILD_TIME").unwrap_or_else(|_| "unknown".to_string()),
"environment": env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()),
"features": [
"health_check",
"task_processing",
"wechat_integration",
"miniprogram_integration",
"websocket_support",
"nginx_proxy_integration"
],
"websocket_connections": connection_count,
"websocket_connection_details": connection_details,
"communication_config": config_info,
"nginx_proxy": "enabled",
"ssl_enabled": true,
"domain": "pactgo.cn",
"timestamp": utils::current_timestamp()
}))
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// 初始化日志
env_logger::init_from_env(
env_logger::Env::new().default_filter_or("info,actix_web=info")
);
// 由于nginx代理网关服务监听在8000端口
let port = env::var("PORT").unwrap_or_else(|_| "8000".to_string());
let bind_address = format!("127.0.0.1:{}", port); // 只监听本地通过nginx代理
println!("🚀 网关服务启动中...");
println!("📍 绑定地址: {} (通过nginx代理)", bind_address);
println!("📝 日志级别: info");
println!("🔧 版本: {}", env!("CARGO_PKG_VERSION"));
println!("🎯 环境: {}", env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string()));
println!("🌐 外部访问: https://pactgo.cn (nginx代理)");
// 创建任务处理服务
let task_service = web::Data::new(TaskService::new());
// 创建WebSocket客户端配置用于测试和演示
let ws_config = CommunicationConfig::production();
let mut ws_client = WebSocketClient::new(ws_config.clone());
// 在后台启动WebSocket客户端连接测试
tokio::spawn(async move {
println!("🔄 启动WebSocket客户端连接测试...");
match ws_client.connect().await {
Ok(_) => {
println!("✅ WebSocket客户端连接成功");
// 测试连接状态
if ws_client.is_connected() {
println!("🔗 WebSocket客户端已连接");
// 测试发送任务
let test_task = shared::TaskRequest {
user_id: "test_user".to_string(),
task_type: shared::TaskType::TextProcessing,
content: "这是一个测试任务".to_string(),
priority: 1,
timeout: Some(30),
extra_params: None,
timestamp: utils::current_timestamp(),
};
match ws_client.send_task(test_task).await {
Ok(response) => {
println!("✅ 测试任务发送成功: {:?}", response.message);
},
Err(e) => {
println!("⚠️ 测试任务发送失败: {}", e);
}
}
}
// 延迟后断开连接
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
match ws_client.disconnect().await {
Ok(_) => println!("🔌 WebSocket客户端已断开"),
Err(e) => println!("❌ WebSocket客户端断开失败: {}", e),
}
},
Err(e) => {
println!("❌ WebSocket客户端连接失败: {}", e);
}
}
});
// 启动连接管理器后台任务
let connection_manager_clone = task_service.connection_manager.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
loop {
interval.tick().await;
// 清理超时连接
let mut manager = connection_manager_clone.write().await;
let timeout = std::time::Duration::from_secs(120); // 2分钟超时
let removed_ids = manager.cleanup_timeout_connections(timeout);
if !removed_ids.is_empty() {
println!("🧹 后台清理超时连接: {}", removed_ids.len());
}
}
});
// 启动WebSocket连接测试任务
let connection_manager_test = task_service.connection_manager.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
loop {
interval.tick().await;
// 获取所有连接并测试发送消息
let manager = connection_manager_test.read().await;
let connections = manager.get_all_connections();
if !connections.is_empty() {
println!("🔍 测试WebSocket连接 - 发现 {} 个连接", connections.len());
// 随机选择一个连接进行测试
if let Some(connection_info) = connections.first() {
// 这里只是模拟实际使用时需要获取WebSocketConnection实例
println!("📤 准备向连接 {} 发送测试消息", connection_info.id);
// 模拟发送消息实际使用时需要WebSocketConnection实例
let test_message = serde_json::json!({
"type": "health_check",
"timestamp": utils::current_timestamp(),
"message": "连接健康检查"
});
println!("✅ 模拟发送测试消息: {}", test_message);
// 测试WebSocketConnection的发送方法模拟
println!("🧪 测试WebSocketConnection的send方法");
let send_result = serde_json::json!({
"type": "test_send",
"connection_id": connection_info.id,
"test_message": "这是send方法的测试消息"
});
println!("📤 send方法测试结果: {}", send_result);
// 测试WebSocketConnection的send_and_wait方法模拟
println!("🧪 测试WebSocketConnection的send_and_wait方法");
let wait_result = serde_json::json!({
"type": "test_send_and_wait",
"connection_id": connection_info.id,
"request": "这是send_and_wait方法的测试请求",
"response": "模拟响应数据",
"timeout": 5000
});
println!("⏱️ send_and_wait方法测试结果: {}", wait_result);
}
}
}
});
let server = HttpServer::new(move || {
App::new()
.app_data(task_service.clone())
.wrap(Logger::default())
.service(
web::scope("/api/v1")
// 健康检查
.route("/health", web::get().to(health_check))
// 系统信息
.route("/system", web::get().to(system_info))
// 任务处理
.route("/task", web::post().to(handle_task))
.route("/task/{task_id}", web::get().to(get_task_status))
.route("/tasks", web::get().to(list_tasks))
// 微信集成
.route("/wechat/callback", web::post().to(handle_wechat_callback))
.route("/wechat/miniprogram/callback", web::post().to(handle_wechat_miniprogram_callback))
// WebSocket连接内网服务器连接
.route("/ws/control", web::get().to(websocket_handler))
.route("/ws/task", web::get().to(websocket_handler))
// 测试接口(用于开发调试)
.route("/test/websocket/send", web::post().to(test_websocket_send))
.route("/test/websocket/send_and_wait", web::post().to(test_websocket_send_and_wait))
.route("/test/websocket/get_manager", web::get().to(test_websocket_get_manager))
.route("/test/websocket/connection_send", web::get().to(test_websocket_connection_send))
// 直接测试WebSocketConnection方法
.route("/test/websocket/direct_send", web::post().to(test_websocket_direct_send))
.route("/test/websocket/direct_send_and_wait", web::post().to(test_websocket_direct_send_and_wait))
)
})
.bind(&bind_address)?
.run();
println!("✅ 网关服务已启动在 {} (通过nginx代理)", bind_address);
println!("🔍 可用接口:");
println!(" GET /api/v1/health - 健康检查");
println!(" GET /api/v1/system - 系统信息");
println!(" POST /api/v1/task - 处理任务");
println!(" GET /api/v1/task/{{task_id}} - 查询任务状态");
println!(" GET /api/v1/tasks - 查询任务列表");
println!(" POST /api/v1/wechat/callback - 企业微信回调");
println!(" POST /api/v1/wechat/miniprogram/callback - 微信小程序回调");
println!(" GET /api/v1/ws/control - WebSocket控制通道");
println!(" GET /api/v1/ws/task - WebSocket任务通道");
println!(" POST /api/v1/test/websocket/send - WebSocket发送测试");
println!(" POST /api/v1/test/websocket/send_and_wait - WebSocket发送并等待测试");
println!(" GET /api/v1/test/websocket/get_manager - WebSocket管理器测试");
println!(" GET /api/v1/test/websocket/connection_send - WebSocket连接发送测试");
println!(" POST /api/v1/test/websocket/direct_send - WebSocket直接发送测试");
println!(" POST /api/v1/test/websocket/direct_send_and_wait - WebSocket直接发送并等待测试");
println!(" 🌐 外部访问: https://pactgo.cn (nginx代理)");
println!(" 🔗 WebSocket连接: wss://pactgo.cn/api/v1/ws/control");
server.await
}