taskkill /F /IM nginx.exe
This commit is contained in:
@@ -8,12 +8,16 @@ actix-web = "^4.0"
|
||||
tokio = { version = "^1.0", features = ["full"] }
|
||||
serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = "^1.0"
|
||||
reqwest = { version = "^0.11", features = ["json"] }
|
||||
reqwest = { version = "^0.11", features = ["json", "native-tls"] }
|
||||
# mini-redis = "0.4" # 使用自定义嵌入式Redis实现
|
||||
tokio-tungstenite = "^0.18"
|
||||
tokio-tungstenite = { version = "0.18", features = ["native-tls"] }
|
||||
native-tls = "0.2"
|
||||
shared = { path = "../shared" }
|
||||
env_logger = "^0.10"
|
||||
awc = { version = "^3.0", features = ["rustls"] }
|
||||
awc = { version = "^3.0" }
|
||||
futures-util = "^0.3"
|
||||
chrono = "^0.4"
|
||||
url = "^2.0"
|
||||
base64 = "^0.22"
|
||||
rand = "^0.8"
|
||||
# heed = "^0.20" # 暂时移除,后续实现HeedDB功能
|
||||
|
||||
@@ -265,11 +265,11 @@ async fn websocket_disconnect(ws_manager: web::Data<WebSocketClientManager>) ->
|
||||
// 获取客户端实例
|
||||
let client = ws_manager.get_client();
|
||||
|
||||
if client.is_connected() {
|
||||
if client.is_connected().await {
|
||||
println!("🔗 WebSocket客户端当前已连接,准备断开...");
|
||||
|
||||
// 断开连接
|
||||
client.disconnect();
|
||||
client.disconnect().await;
|
||||
|
||||
println!("✅ WebSocket客户端已断开连接");
|
||||
|
||||
@@ -294,7 +294,7 @@ async fn websocket_stop(ws_manager: web::Data<WebSocketClientManager>) -> impl R
|
||||
println!("🛑 收到WebSocket停止管理器请求");
|
||||
|
||||
// 停止管理器
|
||||
ws_manager.stop();
|
||||
ws_manager.stop().await;
|
||||
|
||||
println!("✅ WebSocket客户端管理器已停止");
|
||||
|
||||
@@ -319,56 +319,57 @@ async fn websocket_handler(req: HttpRequest, _stream: web::Payload) -> impl Resp
|
||||
let client = ws_manager.get_client();
|
||||
|
||||
// 检查连接状态(用于演示)
|
||||
if client.is_connected() {
|
||||
println!("🔗 WebSocket客户端已连接");
|
||||
|
||||
// 测试发送消息(用于演示)
|
||||
let test_message = serde_json::json!({
|
||||
"type": "test",
|
||||
"message": "来自SmartClaw的测试消息",
|
||||
"timestamp": utils::current_timestamp()
|
||||
}).to_string();
|
||||
|
||||
match client.send_message(test_message).await {
|
||||
Ok(_) => {
|
||||
println!("✅ 测试消息发送成功");
|
||||
},
|
||||
Err(e) => {
|
||||
println!("⚠️ 测试消息发送失败: {}", e);
|
||||
if client.is_connected().await {
|
||||
println!("🔗 WebSocket客户端已连接");
|
||||
|
||||
// 测试发送消息(用于演示)
|
||||
let test_message = serde_json::json!({
|
||||
"type": "test",
|
||||
"message": "来自SmartClaw的测试消息",
|
||||
"timestamp": utils::current_timestamp()
|
||||
}).to_string();
|
||||
|
||||
match client.send_message(test_message).await {
|
||||
Ok(_) => {
|
||||
println!("✅ 测试消息发送成功");
|
||||
},
|
||||
Err(e) => {
|
||||
println!("⚠️ 测试消息发送失败: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 测试发送任务响应(用于演示)
|
||||
let test_response = shared::TaskResponse {
|
||||
success: true,
|
||||
message: "测试响应".to_string(),
|
||||
task_id: Some("test_task_123".to_string()),
|
||||
result: Some(serde_json::json!({
|
||||
"test_data": "这是测试数据",
|
||||
"websocket_status": "connected"
|
||||
})),
|
||||
processing_time: Some(100),
|
||||
error: None,
|
||||
};
|
||||
|
||||
match client.send_task_response(test_response).await {
|
||||
Ok(_) => {
|
||||
println!("✅ 测试任务响应发送成功");
|
||||
},
|
||||
Err(e) => {
|
||||
println!("⚠️ 测试任务响应发送失败: {}", e);
|
||||
|
||||
// 测试发送任务响应(用于演示)
|
||||
let test_response = shared::TaskResponse {
|
||||
success: true,
|
||||
message: "测试响应".to_string(),
|
||||
task_id: Some("test_task_123".to_string()),
|
||||
result: Some(serde_json::json!({
|
||||
"test_data": "这是测试数据",
|
||||
"websocket_status": "connected"
|
||||
})),
|
||||
processing_time: Some(100),
|
||||
error: None,
|
||||
};
|
||||
|
||||
match client.send_task_response(test_response).await {
|
||||
Ok(_) => {
|
||||
println!("✅ 测试任务响应发送成功");
|
||||
},
|
||||
Err(e) => {
|
||||
println!("⚠️ 测试任务响应发送失败: {}", e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("⚠️ WebSocket客户端未连接");
|
||||
}
|
||||
} else {
|
||||
println!("⚠️ WebSocket客户端未连接");
|
||||
}
|
||||
|
||||
// 暂时返回不支持的消息
|
||||
HttpResponse::NotImplemented().json(serde_json::json!({
|
||||
"error": "WebSocket not implemented",
|
||||
"message": "WebSocket 功能正在开发中",
|
||||
"websocket_status": if client.is_connected() { "connected" } else { "disconnected" }
|
||||
}))
|
||||
let status = if client.is_connected().await { "connected" } else { "disconnected" };
|
||||
HttpResponse::NotImplemented().json(serde_json::json!({
|
||||
"error": "WebSocket not implemented",
|
||||
"message": "WebSocket 功能正在开发中",
|
||||
"websocket_status": status
|
||||
}))
|
||||
}
|
||||
|
||||
/// 任务队列状态
|
||||
@@ -397,7 +398,7 @@ async fn main() -> std::io::Result<()> {
|
||||
let bind_address = format!("0.0.0.0:{}", port);
|
||||
|
||||
// 获取网关服务地址
|
||||
let gateway_url = env::var("GATEWAY_URL").unwrap_or_else(|_| "http://localhost:8000".to_string());
|
||||
let gateway_url = env::var("GATEWAY_URL").unwrap_or_else(|_| "https://pactgo.cn".to_string());
|
||||
|
||||
println!("🚀 SmartClaw 服务启动中...");
|
||||
println!("📍 绑定地址: {}", bind_address);
|
||||
@@ -411,20 +412,17 @@ async fn main() -> std::io::Result<()> {
|
||||
|
||||
// 启动 WebSocket 连接(在后台任务中)
|
||||
let ws_manager_for_spawn = ws_manager.clone();
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(async {
|
||||
println!("🔄 正在启动 WebSocket 客户端连接...");
|
||||
match ws_manager_for_spawn.start().await {
|
||||
Ok(_) => {
|
||||
println!("✅ WebSocket 客户端连接成功");
|
||||
}
|
||||
Err(e) => {
|
||||
println!("❌ WebSocket 客户端连接失败: {}", e);
|
||||
// 这里可以添加重试逻辑
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
println!("🔄 正在启动 WebSocket 客户端连接...");
|
||||
match ws_manager_for_spawn.start().await {
|
||||
Ok(_) => {
|
||||
println!("✅ WebSocket 客户端连接成功");
|
||||
}
|
||||
});
|
||||
Err(e) => {
|
||||
println!("❌ WebSocket 客户端连接失败: {}", e);
|
||||
// 这里可以添加重试逻辑
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let ws_manager_for_server = ws_manager.clone();
|
||||
@@ -452,6 +450,7 @@ async fn main() -> std::io::Result<()> {
|
||||
.route("/websocket/stop", web::post().to(websocket_stop))
|
||||
)
|
||||
})
|
||||
.workers(1) // 设置为1个worker
|
||||
.bind(&bind_address)?
|
||||
.run();
|
||||
|
||||
@@ -491,7 +490,7 @@ async fn graceful_shutdown(ws_manager: WebSocketClientManager) {
|
||||
println!("🛑 开始优雅关闭...");
|
||||
|
||||
// 停止WebSocket客户端
|
||||
ws_manager.stop();
|
||||
ws_manager.stop().await;
|
||||
println!("🔌 WebSocket客户端已停止");
|
||||
|
||||
// 等待一段时间确保所有连接都已关闭
|
||||
|
||||
@@ -1,16 +1,19 @@
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message, tungstenite::http::Request};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::time::{interval, Duration};
|
||||
use shared::{TaskRequest, TaskResponse};
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose;
|
||||
use rand::Rng;
|
||||
|
||||
/// WebSocket 客户端连接管理器
|
||||
pub struct WebSocketClient {
|
||||
gateway_url: String,
|
||||
sender: Arc<std::sync::Mutex<Option<mpsc::Sender<String>>>>,
|
||||
is_connected: Arc<std::sync::Mutex<bool>>,
|
||||
sender: Arc<Mutex<Option<mpsc::Sender<String>>>>,
|
||||
is_connected: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl WebSocketClient {
|
||||
@@ -18,8 +21,8 @@ impl WebSocketClient {
|
||||
pub fn new(gateway_url: String) -> Self {
|
||||
Self {
|
||||
gateway_url,
|
||||
sender: Arc::new(std::sync::Mutex::new(None)),
|
||||
is_connected: Arc::new(std::sync::Mutex::new(false)),
|
||||
sender: Arc::new(Mutex::new(None)),
|
||||
is_connected: Arc::new(Mutex::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,22 +30,46 @@ impl WebSocketClient {
|
||||
pub async fn connect(&self) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("🔌 正在连接到网关服务: {}", self.gateway_url);
|
||||
|
||||
let ws_url = format!("{}/ws", self.gateway_url.replace("http://", "ws://").replace("https://", "wss://"));
|
||||
let ws_url = format!("{}/api/v1/ws/control", self.gateway_url.replace("http://", "ws://").replace("https://", "wss://"));
|
||||
println!("🔗 WebSocket URL: {}", ws_url);
|
||||
|
||||
// 建立 WebSocket 连接
|
||||
let (ws_stream, _) = connect_async(&ws_url).await?;
|
||||
// 生成随机的 Sec-WebSocket-Key
|
||||
let mut key = [0u8; 16];
|
||||
rand::thread_rng().fill(&mut key);
|
||||
let sec_websocket_key = general_purpose::STANDARD.encode(&key);
|
||||
|
||||
// 建立 WebSocket 连接,添加API密钥和标准 WebSocket 握手头
|
||||
let request = Request::builder()
|
||||
.uri(&ws_url)
|
||||
.header("Host", "pactgo.cn")
|
||||
.header("X-API-Key", "claw_secret_key")
|
||||
.header("Upgrade", "websocket")
|
||||
.header("Connection", "Upgrade")
|
||||
.header("Sec-WebSocket-Key", sec_websocket_key)
|
||||
.header("Sec-WebSocket-Version", "13")
|
||||
.body(())?;
|
||||
|
||||
println!("🔗 正在建立WebSocket连接...");
|
||||
println!("📋 请求URL: {}", ws_url);
|
||||
println!("📋 HTTP版本: {:?}", request.version());
|
||||
println!("📋 请求头:");
|
||||
for (name, value) in request.headers() {
|
||||
println!(" {}: {:?}", name, value);
|
||||
}
|
||||
|
||||
let (ws_stream, response) = connect_async(request).await?;
|
||||
println!("✅ WebSocket 连接建立");
|
||||
println!("📋 响应状态: {}", response.status());
|
||||
|
||||
// 设置连接状态
|
||||
*self.is_connected.lock().unwrap() = true;
|
||||
*self.is_connected.lock().await = true;
|
||||
|
||||
// 分割流
|
||||
let (mut write, mut read) = ws_stream.split();
|
||||
|
||||
// 创建消息通道
|
||||
let (tx, mut rx) = mpsc::channel::<String>(100);
|
||||
*self.sender.lock().unwrap() = Some(tx);
|
||||
*self.sender.lock().await = Some(tx);
|
||||
|
||||
// 启动消息发送循环
|
||||
let _write_handle = tokio::spawn(async move {
|
||||
@@ -67,13 +94,13 @@ impl WebSocketClient {
|
||||
}
|
||||
Ok(Message::Close(_)) => {
|
||||
println!("🔚 收到关闭消息");
|
||||
*is_connected_clone.lock().unwrap() = false;
|
||||
*is_connected_clone.lock().await = false;
|
||||
break;
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
println!("❌ 接收消息错误: {}", e);
|
||||
*is_connected_clone.lock().unwrap() = false;
|
||||
*is_connected_clone.lock().await = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -89,7 +116,7 @@ impl WebSocketClient {
|
||||
loop {
|
||||
heartbeat_interval.tick().await;
|
||||
|
||||
let connected = *is_connected.lock().unwrap();
|
||||
let connected = *is_connected.lock().await;
|
||||
if !connected {
|
||||
println!("💔 心跳检测到连接已断开");
|
||||
break;
|
||||
@@ -115,7 +142,7 @@ impl WebSocketClient {
|
||||
"timestamp": chrono::Utc::now().timestamp()
|
||||
}).to_string();
|
||||
|
||||
if let Some(sender) = &*self.sender.lock().unwrap() {
|
||||
if let Some(sender) = &*self.sender.lock().await {
|
||||
let _ = sender.send(connect_msg).await;
|
||||
}
|
||||
|
||||
@@ -150,7 +177,7 @@ impl WebSocketClient {
|
||||
|
||||
/// 发送消息
|
||||
pub async fn send_message(&self, message: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
if let Some(sender) = &*self.sender.lock().unwrap() {
|
||||
if let Some(sender) = &*self.sender.lock().await {
|
||||
sender.send(message).await.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -171,14 +198,14 @@ impl WebSocketClient {
|
||||
}
|
||||
|
||||
/// 检查连接状态
|
||||
pub fn is_connected(&self) -> bool {
|
||||
*self.is_connected.lock().unwrap()
|
||||
pub async fn is_connected(&self) -> bool {
|
||||
*self.is_connected.lock().await
|
||||
}
|
||||
|
||||
/// 断开连接
|
||||
pub fn disconnect(&self) {
|
||||
*self.is_connected.lock().unwrap() = false;
|
||||
*self.sender.lock().unwrap() = None;
|
||||
pub async fn disconnect(&self) {
|
||||
*self.is_connected.lock().await = false;
|
||||
*self.sender.lock().await = None;
|
||||
println!("🔌 WebSocket 连接已断开");
|
||||
}
|
||||
}
|
||||
@@ -208,7 +235,7 @@ impl WebSocketClientManager {
|
||||
}
|
||||
|
||||
/// 停止客户端
|
||||
pub fn stop(&self) {
|
||||
self.client.disconnect();
|
||||
pub async fn stop(&self) {
|
||||
self.client.disconnect().await;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user