13 KiB
13 KiB
WechatApp ↔ 网关:SSE 代理方案
WechatApp 通过 HTTP POST 发送消息,网关通过 WebSocket 将请求转发给 SmartClaw,SmartClaw 代为请求 LMStudio SSE 端点,再通过 WebSocket 将 SSE 流透传给网关,最终以 SSE 格式返回给 WechatApp。网关全程不直接连接 LMStudio。
一、整体架构
┌──────────────┐ ┌──────────────┐
│ │ POST /v1/chat/completions │ │
│ WechatApp/浏览器 │ ────────────────────────────► │ Gateway │
│ │ SSE Stream │ (Actix) │
└──────────────┘ ◄───────────────────────────── │ │
└──────┬───────┘
│ WebSocket
┌────────▼────────┐
│ SmartClaw │
│ (Rust 桌面端) │
└────────┬────────┘
│ SSE (HTTP)
┌────────▼────────┐
│ LMStudio │
│ (内网) │
└─────────────────┘
两条链路完全独立:
| 链路 | 协议 | 说明 |
|---|---|---|
| WechatApp → 网关 | HTTP POST + SSE | WechatApp 发请求,网关返回 SSE 流 |
| 网关 → SmartClaw | WebSocket | 网关透传消息,SmartClaw 代为请求 LMStudio |
| SmartClaw → LMStudio | HTTP SSE | SmartClaw 直连内网 LMStudio |
| SmartClaw → 网关 | WebSocket → SSE | LMStudio SSE 流透传回网关 |
二、SSE 端点设计
2.1 端点
POST /v1/chat/completions
标准 OpenAI 兼容格式,网关将其转发给 SmartClaw,由 SmartClaw 代为请求 LMStudio。
2.2 请求
POST /v1/chat/completions
Content-Type: application/json
{
"model": "qwen2.5",
"messages": [
{"role": "user", "content": "你好"}
],
"stream": true
}
2.3 响应
HTTP/1.1 200 OK
Content-Type: text/event-stream
X-Request-Id: req_xxx
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"}}]}
data: [DONE]
网关收到 SmartClaw 透传的 LMStudio SSE 流,原样返回给客户端,不解析、不转换、不缓冲。
2.4 状态码
| 状态码 | 含义 |
|---|---|
| 200 | 正常 SSE 流 |
| 400 | 参数错误 |
| 502 | SmartClaw 不可达 |
| 504 | LMStudio 处理超时 |
三、完整消息流
1. WechatApp ──POST /v1/chat/completions──► 网关
2. 网关 ──WebSocket (wechat_app_sse_request)──► SmartClaw
3. SmartClaw ──POST /v1/chat/completions──► LMStudio
4. LMStudio ──SSE Stream──► SmartClaw
5. SmartClaw ──WebSocket (sse_chunk)──► 网关
6. 网关 ──SSE Stream──► WechatApp
3.1 WebSocket 消息格式
网关 → SmartClaw(请求):
{
"type": "wechat_app_sse_request",
"request_id": "req_abc123",
"data": {
"model": "qwen2.5",
"messages": [
{"role": "user", "content": "你好"}
],
"stream": true
}
}
SmartClaw → 网关(响应片段):
{
"type": "sse_chunk",
"request_id": "req_abc123",
"chunk": "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\n"
}
SmartClaw → 网关(结束):
{
"type": "sse_done",
"request_id": "req_abc123"
}
SmartClaw → 网关(错误):
{
"type": "sse_error",
"request_id": "req_abc123",
"error": "LMStudio 不可达"
}
四、网关实现
4.1 Actix-web Handler
// src/sse_proxy.rs
use actix_web::{web, HttpRequest, HttpResponse};
use tokio::sync::mpsc;
use futures_util::StreamExt;
/// POST /v1/chat/completions
/// 网关将请求透传给 SmartClaw,由 SmartClaw 代为请求 LMStudio
pub async fn chat_completions(
req: HttpRequest,
body: web::Json<ChatCompletionsRequest>,
pool: web::Data<WebSocketPool>,
) -> HttpResponse {
let request_id = uuid::Uuid::new_v4().to_string();
let body = body.into_inner();
// 1. 验证 stream 参数(必须为 true)
if !body.stream {
return HttpResponse::BadRequest().json(serde_json::json!({
"error": { "message": "stream must be true", "type": "invalid_request_error" }
}));
}
// 2. 构建转发给 SmartClaw 的 WebSocket 消息
let forward = serde_json::json!({
"type": "wechat_app_sse_request",
"request_id": request_id,
"data": body,
});
// 3. 创建 SSE channel,等待 SmartClaw 响应
let (client_tx, server_rx) = mpsc::channel::<Vec<u8>>(1024);
// 4. 注册请求到待响应映射
let sse_manager = app_data.sse_manager.clone();
sse_manager.add(request_id.clone(), client_tx).await;
// 5. 发给 SmartClaw
if let Err(e) = pool.broadcast_to_smartclaw(forward).await {
sse_manager.remove(&request_id).await;
return HttpResponse::BadGateway().json(serde_json::json!({
"error": { "message": "SmartClaw 不可用", "type": "bad_gateway" }
}));
}
// 6. 将 SmartClaw 的 WebSocket 消息转换为 SSE 流返回
let stream = server_rx.map(|chunk| Ok::<_, std::convert::Infallible>(actix_web::web::Bytes::from(chunk)));
HttpResponse::Ok()
.content_type("text/event-stream")
.append_header(("Cache-Control", "no-cache"))
.append_header(("X-Request-Id", &request_id))
.streaming(stream)
}
4.2 接收 SmartClaw 的 SSE 片段
SmartClaw 通过 /api/v1/ws/control WebSocket 通道发回 SSE 片段,网关需要处理 sse_chunk 类型:
// src/main.rs — WebSocket on_message handler 中
Some("sse_chunk") => {
let request_id = parsed
.get("request_id")
.and_then(|v| v.as_str())
.unwrap_or("");
let chunk = parsed
.get("chunk")
.and_then(|v| v.as_str())
.unwrap_or("");
if let Some(tx) = sse_manager.get(request_id).await {
let _ = tx.send(chunk.as_bytes().to_vec()).await;
}
}
Some("sse_done") => {
let request_id = parsed
.get("request_id")
.and_then(|v| v.as_str())
.unwrap_or("");
sse_manager.remove(request_id).await;
}
Some("sse_error") => {
let request_id = parsed
.get("request_id")
.and_then(|v| v.as_str())
.unwrap_or("");
let error_msg = parsed.get("error")
.and_then(|v| v.as_str())
.unwrap_or("未知错误");
if let Some(tx) = sse_manager.get(request_id).await {
let _ = tx.send(format!("data: {{\"error\": \"{}\"}}\n\n", error_msg).into_bytes()).await;
sse_manager.remove(request_id).await;
}
}
4.3 SSE Manager
// src/sse_manager.rs
use tokio::sync::mpsc;
use std::collections::HashMap;
/// 管理所有活跃的 SSE 请求,等待 SmartClaw 的 WebSocket 响应
pub struct SseManager {
pending: RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>),
}
impl SseManager {
pub fn new() -> Self {
Self { pending: RwLock::new(HashMap::new()).into() }
}
/// 注册一个 SSE 请求,关联其响应 channel
pub async fn add(&self, request_id: String, tx: mpsc::Sender<Vec<u8>>) {
self.pending.write().await.insert(request_id, tx);
}
/// 根据 request_id 获取对应的响应 channel
pub async fn get(&self, request_id: &str) -> Option<mpsc::Sender<Vec<u8>>> {
self.pending.read().await.get(request_id).cloned()
}
/// 移除完成的请求
pub async fn remove(&self, request_id: &str) {
self.pending.write().await.remove(request_id);
}
}
五、SmartClaw 侧实现
SmartClaw 收到 wechat_app_sse_request 后:
- 提取
data字段(OpenAI 格式请求体) - 发送 HTTP POST 到
http://<内网LMStudio>:1234/v1/chat/completions - 逐块读取 SSE 响应,每块通过 WebSocket 发回
sse_chunk - 结束时发
sse_done
// SmartClaw/src/sse_forwarder.rs
async fn handle_sse_request(pool: WebSocketPool, request_id: String, req: ChatCompletionsRequest) {
let client = reqwest::Client::new();
let lmstudio_url = "http://10.0.0.100:1234/v1/chat/completions";
let resp = match client.post(lmstudio_url)
.json(&req)
.send()
.await
{
Ok(r) => r,
Err(e) => {
let err_msg = serde_json::json!({
"type": "sse_error",
"request_id": request_id,
"error": e.to_string()
});
let _ = pool.broadcast_to_control(err_msg).await;
return;
}
};
let mut stream = resp.bytes_stream();
while let Some(chunk) = stream.next().await {
match chunk {
Ok(bytes) => {
let msg = serde_json::json!({
"type": "sse_chunk",
"request_id": request_id,
"chunk": String::from_utf8_lossy(&bytes)
});
let _ = pool.broadcast_to_control(msg).await;
}
Err(e) => {
let err_msg = serde_json::json!({
"type": "sse_error",
"request_id": request_id,
"error": e.to_string()
});
let _ = pool.broadcast_to_control(err_msg).await;
break;
}
}
}
let done = serde_json::json!({
"type": "sse_done",
"request_id": request_id
});
let _ = pool.broadcast_to_control(done).await;
}
六、WechatApp 客户端
6.1 WechatApp
// utils/api.js
/**
* 发送聊天消息(SSE)
* @param {string} content - 消息内容
* @returns {Promise<string>} 完整回复
*/
async function sendMessage(content) {
const response = await wx.request({
url: 'https://pactgo.cn/v1/chat/completions',
method: 'POST',
header: {
'Content-Type': 'application/json',
},
data: {
model: 'qwen2.5',
messages: [{ role: 'user', content }],
stream: true,
},
});
if (response.statusCode === 200 && response.data) {
return response.data; // WechatApp:等完整响应
} else {
throw new Error(`请求失败: ${response.statusCode}`);
}
}
// pages/chat/chat.js
async function onSend() {
const content = inputValue;
if (!content.trim()) return;
appendMessage('user', content);
inputValue = '';
try {
const reply = await sendMessage(content);
appendMessage('assistant', reply);
} catch (err) {
appendMessage('assistant', `错误: ${err.message}`);
}
}
6.2 浏览器 / H5(有打字机效果)
async function sendMessage(content) {
const response = await fetch('https://pactgo.cn/v1/chat/completions', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
model: 'qwen2.5',
messages: [{ role: 'user', content }],
stream: true,
}),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
text.split('\n').forEach(line => {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
try {
const json = JSON.parse(data);
const token = json.choices?.[0]?.delta?.content;
if (token) appendToken(token); // 实时打字机效果
} catch {}
}
});
}
}
七、与设备 WebSocket 链路的区别
| WechatApp SSE 链路 | 设备 WebSocket 链路 | |
|---|---|---|
| 发起方 | WechatApp POST HTTP | 设备主动连接 WebSocket |
| 网关 → LMStudio | 经由 SmartClaw | SmartClaw 自行处理 |
| SSE 流 | WechatApp ← 网关 ← SmartClaw ← LMStudio | 不涉及 |
| 设备响应 | 无(HTTP 请求/响应) | WebSocket JSON 双工 |
| 会话管理 | 无(网关无状态,SSEManager 仅管理响应 channel) | SmartClaw 管理 |
八、文件结构
gateway/src/
├── main.rs # WebSocket handler,处理 sse_chunk/sse_done/sse_error
├── communication.rs # SmartClaw WebSocket(broadcast_to_control)
├── sse_proxy.rs # [新增] POST /v1/chat/completions handler
├── sse_manager.rs # [新增] 待响应 request_id → mpsc::Sender 映射
└── ...
SmartClaw/src/
├── main.rs # [修改] 处理 wechat_app_sse_request 消息类型
└── sse_forwarder.rs # [新增] SSE 请求转发给 LMStudio