JoyD/Claw/docs/小程序网关SSE架构方案.md
2026-04-21 13:46:20 +08:00

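# WechatApp ↔ Gateway SSE Proxy Design
> WechatApp sends messages via HTTP POST. The gateway forwards each request to SmartClaw over WebSocket; SmartClaw calls the LMStudio SSE endpoint on the gateway's behalf and relays the SSE stream back to the gateway over WebSocket, and the gateway finally returns it to WechatApp as SSE. The gateway never connects to LMStudio directly.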
---
## 1. Overall Architecture
```
┌─────────────────────┐                               ┌──────────────┐
│                     │   POST /v1/chat/completions   │              │
│ WechatApp / browser │ ────────────────────────────► │   Gateway    │
│                     │          SSE stream           │   (Actix)    │
└─────────────────────┘ ◄──────────────────────────── │              │
                                                      └──────┬───────┘
                                                             │ WebSocket
                                                    ┌────────▼────────┐
                                                    │    SmartClaw    │
                                                    │ (Rust desktop)  │
                                                    └────────┬────────┘
                                                             │ SSE (HTTP)
                                                    ┌────────▼────────┐
                                                    │    LMStudio     │
                                                    │   (intranet)    │
                                                    └─────────────────┘
```
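**The links are fully independent of each other:**
| Link | Protocol | Description |
|------|----------|-------------|
| WechatApp → Gateway | HTTP POST + SSE | WechatApp sends the request; the gateway returns an SSE stream |
| Gateway → SmartClaw | WebSocket | The gateway passes the message through; SmartClaw queries LMStudio on its behalf |
| SmartClaw → LMStudio | HTTP SSE | SmartClaw connects directly to LMStudio on the intranet |
| SmartClaw → Gateway | WebSocket → SSE | The LMStudio SSE stream is relayed back to the gateway |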
---
## 2. SSE Endpoint Design
### 2.1 Endpoint
```
POST /v1/chat/completions
```
Standard OpenAI-compatible format. The gateway forwards the request to SmartClaw, which queries LMStudio on its behalf.
### 2.2 Request
```
POST /v1/chat/completions
Content-Type: application/json

{
  "model": "qwen2.5",
  "messages": [
    {"role": "user", "content": "你好"}
  ],
  "stream": true
}
```
### 2.3 Response
```
HTTP/1.1 200 OK
Content-Type: text/event-stream
X-Request-Id: req_xxx

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":""}}]}

data: [DONE]
```
The gateway receives the LMStudio SSE stream relayed by SmartClaw and returns it to the client unchanged: **no parsing, no conversion, no buffering**.
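Since the gateway forwards the bytes verbatim, the only places that interpret SSE framing are the two ends of the chain. For readers unfamiliar with that framing, here is a minimal Rust sketch of how a consumer might split buffered stream text into complete events, where an event ends at a blank line. `split_sse_events` and `data_payload` are hypothetical helpers for illustration, not part of the gateway, and the sketch assumes `\n` line endings and single-line `data:` fields.

```rust
/// Splits `buffer` into complete SSE events plus the unconsumed remainder.
/// An event counts as complete once it is followed by a blank line ("\n\n").
/// Hypothetical helper for illustration; handles "\n" line endings only.
fn split_sse_events(buffer: &str) -> (Vec<&str>, &str) {
    let mut events = Vec::new();
    let mut rest = buffer;
    while let Some(end) = rest.find("\n\n") {
        events.push(&rest[..end]);
        // Skip the two newline bytes that terminate the event.
        rest = &rest[end + 2..];
    }
    (events, rest)
}

/// Returns the payload of a single-line `data:` event, if there is one.
fn data_payload(event: &str) -> Option<&str> {
    event.strip_prefix("data: ")
}
```

A caller would keep the returned remainder and prepend it to the next chunk it receives, so an event split across two reads is still delivered whole.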
### 2.4 Status Codes
| Status | Meaning |
|--------|---------|
| 200 | Normal SSE stream |
| 400 | Invalid parameters |
| 502 | SmartClaw unreachable |
| 504 | LMStudio processing timed out |
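
An HTTP status code can only be chosen before the first byte of the response body is sent, so telling 200, 502, and 504 apart implies deciding before streaming begins. One possible approach, sketched below with the standard library's blocking channel rather than the gateway's async one, is to wait a bounded time for the first chunk. The `FirstChunk` type, both function names, and the mapping of a closed channel to 502 are assumptions made for this illustration.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Outcome of waiting for the first chunk of a proxied response.
#[derive(Debug, PartialEq)]
enum FirstChunk {
    /// A chunk arrived in time: start the 200 SSE stream with it.
    Ready(Vec<u8>),
    /// The sending side went away before producing anything.
    Unreachable,
    /// Nothing arrived within the deadline.
    TimedOut,
}

/// Blocks until the first chunk arrives, the sender is dropped, or `deadline` elapses.
fn wait_first_chunk(rx: &Receiver<Vec<u8>>, deadline: Duration) -> FirstChunk {
    match rx.recv_timeout(deadline) {
        Ok(chunk) => FirstChunk::Ready(chunk),
        Err(RecvTimeoutError::Disconnected) => FirstChunk::Unreachable,
        Err(RecvTimeoutError::Timeout) => FirstChunk::TimedOut,
    }
}

/// Maps an outcome to the status codes listed in the table above.
fn status_code(outcome: &FirstChunk) -> u16 {
    match outcome {
        FirstChunk::Ready(_) => 200,
        FirstChunk::Unreachable => 502,
        FirstChunk::TimedOut => 504,
    }
}
```

Once the first chunk has been sent with a 200, later failures can no longer change the status and would have to be reported inside the stream itself.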
---
## 3. End-to-End Message Flow
```
1. WechatApp ──POST /v1/chat/completions──► Gateway
2. Gateway ──WebSocket (wechat_app_sse_request)──► SmartClaw
3. SmartClaw ──POST /v1/chat/completions──► LMStudio
4. LMStudio ──SSE Stream──► SmartClaw
5. SmartClaw ──WebSocket (sse_chunk)──► Gateway
6. Gateway ──SSE Stream──► WechatApp
```
### 3.1 WebSocket Message Format
**Gateway → SmartClaw (request):**
```json
{
  "type": "wechat_app_sse_request",
  "request_id": "req_abc123",
  "data": {
    "model": "qwen2.5",
    "messages": [
      {"role": "user", "content": "你好"}
    ],
    "stream": true
  }
}
```
**SmartClaw → Gateway (response chunk):**
```json
{
  "type": "sse_chunk",
  "request_id": "req_abc123",
  "chunk": "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\n"
}
```
**SmartClaw → Gateway (done):**
```json
{
  "type": "sse_done",
  "request_id": "req_abc123"
}
```
**SmartClaw → Gateway (error):**
```json
{
  "type": "sse_error",
  "request_id": "req_abc123",
  "error": "LMStudio unreachable"
}
```
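The four message shapes above map naturally onto a Rust enum. The following is a std-only sketch: the type and method names are hypothetical, and the request body is kept as raw JSON text to avoid depending on a JSON library. In a real codebase, serde's internally tagged representation (`#[serde(tag = "type")]`) would likely be a better fit.

```rust
/// The control-channel messages described above, modelled as an enum.
#[derive(Debug, Clone, PartialEq)]
enum ControlMessage {
    /// Gateway → SmartClaw; `data` holds the OpenAI-style body as raw JSON text.
    Request { request_id: String, data: String },
    /// SmartClaw → Gateway; one piece of the SSE stream.
    Chunk { request_id: String, chunk: String },
    /// SmartClaw → Gateway; the stream finished normally.
    Done { request_id: String },
    /// SmartClaw → Gateway; the stream failed.
    Error { request_id: String, error: String },
}

impl ControlMessage {
    /// Value of the `type` field on the wire.
    fn type_tag(&self) -> &'static str {
        match self {
            ControlMessage::Request { .. } => "wechat_app_sse_request",
            ControlMessage::Chunk { .. } => "sse_chunk",
            ControlMessage::Done { .. } => "sse_done",
            ControlMessage::Error { .. } => "sse_error",
        }
    }

    /// Every message carries the id of the HTTP request it belongs to.
    fn request_id(&self) -> &str {
        match self {
            ControlMessage::Request { request_id, .. }
            | ControlMessage::Chunk { request_id, .. }
            | ControlMessage::Done { request_id }
            | ControlMessage::Error { request_id, .. } => request_id.as_str(),
        }
    }

    /// `sse_done` and `sse_error` both end the stream for their request.
    fn is_terminal(&self) -> bool {
        matches!(self, ControlMessage::Done { .. } | ControlMessage::Error { .. })
    }
}
```

Modelling the terminal messages explicitly makes it harder to forget the cleanup that both `sse_done` and `sse_error` require on the gateway side.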
---
## 4. Gateway Implementation
### 4.1 Actix-web Handler
```rust
// src/sse_proxy.rs
use actix_web::{web, HttpRequest, HttpResponse};
use futures_util::StreamExt;
use tokio::sync::mpsc;
// From the tokio-stream crate: adapts an mpsc::Receiver into a Stream
use tokio_stream::wrappers::ReceiverStream;

use crate::sse_manager::SseManager;
// ChatCompletionsRequest (Serialize + Deserialize, with a `stream: bool` field)
// and WebSocketPool are defined elsewhere in the gateway crate.

/// POST /v1/chat/completions
/// The gateway passes the request through to SmartClaw, which queries LMStudio on its behalf.
pub async fn chat_completions(
    _req: HttpRequest,
    body: web::Json<ChatCompletionsRequest>,
    pool: web::Data<WebSocketPool>,
    sse_manager: web::Data<SseManager>,
) -> HttpResponse {
    let request_id = uuid::Uuid::new_v4().to_string();
    let body = body.into_inner();

    // 1. Validate the stream parameter (must be true)
    if !body.stream {
        return HttpResponse::BadRequest().json(serde_json::json!({
            "error": { "message": "stream must be true", "type": "invalid_request_error" }
        }));
    }

    // 2. Build the WebSocket message to forward to SmartClaw
    let forward = serde_json::json!({
        "type": "wechat_app_sse_request",
        "request_id": request_id,
        "data": body,
    });

    // 3. Create the SSE channel that will carry SmartClaw's response
    let (client_tx, server_rx) = mpsc::channel::<Vec<u8>>(1024);

    // 4. Register the request in the pending-response map
    sse_manager.add(request_id.clone(), client_tx).await;

    // 5. Send it to SmartClaw
    if pool.broadcast_to_smartclaw(forward).await.is_err() {
        sse_manager.remove(&request_id).await;
        return HttpResponse::BadGateway().json(serde_json::json!({
            "error": { "message": "SmartClaw unavailable", "type": "bad_gateway" }
        }));
    }

    // 6. Return the chunks relayed by SmartClaw as an SSE stream
    let stream = ReceiverStream::new(server_rx)
        .map(|chunk| Ok::<_, std::convert::Infallible>(web::Bytes::from(chunk)));
    HttpResponse::Ok()
        .content_type("text/event-stream")
        .append_header(("Cache-Control", "no-cache"))
        .append_header(("X-Request-Id", request_id.as_str()))
        .streaming(stream)
}
```
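### 4.2 Receiving SSE Chunks from SmartClaw
SmartClaw sends SSE chunks back over the `/api/v1/ws/control` WebSocket channel. The gateway must handle the `sse_chunk`, `sse_done`, and `sse_error` message types: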
```rust
// src/main.rs: inside the WebSocket on_message handler
Some("sse_chunk") => {
    let request_id = parsed
        .get("request_id")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    let chunk = parsed
        .get("chunk")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    // Forward the chunk to the HTTP response that is waiting on this request_id
    if let Some(tx) = sse_manager.get(request_id).await {
        let _ = tx.send(chunk.as_bytes().to_vec()).await;
    }
}
Some("sse_done") => {
    let request_id = parsed
        .get("request_id")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    // Removing the entry drops the last sender, which ends the client's SSE stream
    sse_manager.remove(request_id).await;
}
Some("sse_error") => {
    let request_id = parsed
        .get("request_id")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    let error_msg = parsed
        .get("error")
        .and_then(|v| v.as_str())
        .unwrap_or("unknown error");
    if let Some(tx) = sse_manager.get(request_id).await {
        // Build the payload with serde_json so quotes and newlines in the message are escaped
        let event = format!("data: {}\n\n", serde_json::json!({ "error": error_msg }));
        let _ = tx.send(event.into_bytes()).await;
        sse_manager.remove(request_id).await;
    }
}
```
### 4.3 SSE Manager
```rust
// src/sse_manager.rs
use std::collections::HashMap;
use tokio::sync::{mpsc, RwLock};

/// Tracks all active SSE requests that are waiting for WebSocket responses from SmartClaw
pub struct SseManager {
    pending: RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>,
}

impl SseManager {
    pub fn new() -> Self {
        Self { pending: RwLock::new(HashMap::new()) }
    }

    /// Register an SSE request together with its response channel
    pub async fn add(&self, request_id: String, tx: mpsc::Sender<Vec<u8>>) {
        self.pending.write().await.insert(request_id, tx);
    }

    /// Look up the response channel for a request_id
    pub async fn get(&self, request_id: &str) -> Option<mpsc::Sender<Vec<u8>>> {
        self.pending.read().await.get(request_id).cloned()
    }

    /// Remove a completed request
    pub async fn remove(&self, request_id: &str) {
        self.pending.write().await.remove(request_id);
    }
}
```
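To make the lifecycle concrete, here is a synchronous, std-only analogue of the manager, using `std::sync::mpsc` and a `Mutex` in place of the tokio types. `PendingStreams` and its method names are hypothetical. The point it illustrates is that removing the entry drops the sender, which is what ends the stream on the receiving side.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Synchronous stand-in for the async manager: request_id → chunk sender.
struct PendingStreams {
    pending: Mutex<HashMap<String, Sender<Vec<u8>>>>,
}

impl PendingStreams {
    fn new() -> Self {
        Self { pending: Mutex::new(HashMap::new()) }
    }

    /// Registers a request and returns the receiving end for its HTTP response.
    fn register(&self, request_id: &str) -> Receiver<Vec<u8>> {
        let (tx, rx) = channel();
        self.pending.lock().unwrap().insert(request_id.to_string(), tx);
        rx
    }

    /// Routes one chunk. Returns false when the request is unknown or its
    /// receiver has been dropped (for example, the client disconnected).
    fn push(&self, request_id: &str, chunk: &[u8]) -> bool {
        let guard = self.pending.lock().unwrap();
        match guard.get(request_id) {
            Some(tx) => tx.send(chunk.to_vec()).is_ok(),
            None => false,
        }
    }

    /// Drops the sender, which ends iteration on the receiver.
    fn finish(&self, request_id: &str) {
        self.pending.lock().unwrap().remove(request_id);
    }
}
```

In this model a chunk that arrives after `finish`, or for an unknown `request_id`, is simply reported as undeliverable, which mirrors how the handlers in section 4.2 ignore chunks with no matching entry.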
---
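## 5. SmartClaw-Side Implementation
On receiving `wechat_app_sse_request`, SmartClaw:
1. Extracts the `data` field (the OpenAI-format request body)
2. Sends an HTTP POST to `http://<intranet LMStudio host>:1234/v1/chat/completions`
3. Reads the SSE response chunk by chunk and sends each chunk back over WebSocket as `sse_chunk`
4. Sends `sse_done` when the stream ends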
```rust
// SmartClaw/src/sse_forwarder.rs
use futures_util::StreamExt; // provides `.next()` on the byte stream

// WebSocketPool and ChatCompletionsRequest are defined elsewhere in SmartClaw.
async fn handle_sse_request(pool: WebSocketPool, request_id: String, req: ChatCompletionsRequest) {
    let client = reqwest::Client::new();
    let lmstudio_url = "http://10.0.0.100:1234/v1/chat/completions";

    // Forward the OpenAI-format body to LMStudio unchanged
    let resp = match client.post(lmstudio_url).json(&req).send().await {
        Ok(r) => r,
        Err(e) => {
            let err_msg = serde_json::json!({
                "type": "sse_error",
                "request_id": request_id,
                "error": e.to_string()
            });
            let _ = pool.broadcast_to_control(err_msg).await;
            return;
        }
    };

    // `bytes_stream` requires reqwest's `stream` feature
    let mut stream = resp.bytes_stream();
    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(bytes) => {
                let msg = serde_json::json!({
                    "type": "sse_chunk",
                    "request_id": request_id,
                    // Lossy decoding: a multi-byte character split across two
                    // chunks is replaced with U+FFFD
                    "chunk": String::from_utf8_lossy(&bytes)
                });
                let _ = pool.broadcast_to_control(msg).await;
            }
            Err(e) => {
                let err_msg = serde_json::json!({
                    "type": "sse_error",
                    "request_id": request_id,
                    "error": e.to_string()
                });
                let _ = pool.broadcast_to_control(err_msg).await;
                break;
            }
        }
    }

    // Tell the gateway the stream is finished
    let done = serde_json::json!({
        "type": "sse_done",
        "request_id": request_id
    });
    let _ = pool.broadcast_to_control(done).await;
}
```
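One caveat with decoding each network chunk independently is that a multi-byte UTF-8 character (common in Chinese text) can be split across two chunks. A possible remedy, sketched below under the assumption that SmartClaw wants to forward text rather than raw bytes, is to hold back an incomplete trailing sequence until the next chunk arrives. `Utf8ChunkDecoder` is a hypothetical helper, not part of the SmartClaw code.

```rust
/// Decodes a byte stream chunk by chunk without corrupting multi-byte
/// characters that straddle a chunk boundary.
struct Utf8ChunkDecoder {
    /// Bytes of an incomplete trailing character from earlier chunks.
    carry: Vec<u8>,
}

impl Utf8ChunkDecoder {
    fn new() -> Self {
        Self { carry: Vec::new() }
    }

    /// Returns the text that is complete so far and holds back an unfinished tail.
    fn push(&mut self, bytes: &[u8]) -> String {
        self.carry.extend_from_slice(bytes);
        let complete = match std::str::from_utf8(&self.carry) {
            Ok(_) => self.carry.len(),
            // error_len() == None: the input merely ends mid-character.
            Err(e) if e.error_len().is_none() => e.valid_up_to(),
            // Genuinely invalid bytes: emit everything, decoded lossily.
            Err(_) => self.carry.len(),
        };
        // Keep the unfinished tail for the next call; decode the rest now.
        let tail = self.carry.split_off(complete);
        let head = std::mem::replace(&mut self.carry, tail);
        String::from_utf8_lossy(&head).into_owned()
    }
}
```

With one decoder per request, each `sse_chunk` would carry only whole characters. An alternative that avoids the issue altogether would be to transport the raw bytes, for example base64-encoded, and leave decoding to the final client.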
---
## 6. WechatApp Client
### 6.1 WechatApp
```javascript
// utils/api.js
/**
 * Send a chat message (SSE)
 * @param {string} content - message content
 * @returns {Promise<string>} the complete reply
 */
function sendMessage(content) {
  // wx.request is callback-based, so wrap it in a Promise
  return new Promise((resolve, reject) => {
    wx.request({
      url: 'https://pactgo.cn/v1/chat/completions',
      method: 'POST',
      header: {
        'Content-Type': 'application/json',
      },
      data: {
        model: 'qwen2.5',
        messages: [{ role: 'user', content }],
        stream: true,
      },
      success(res) {
        if (res.statusCode === 200 && res.data) {
          resolve(res.data); // WechatApp waits for the complete response
        } else {
          reject(new Error(`Request failed: ${res.statusCode}`));
        }
      },
      fail(err) {
        reject(new Error(`Request failed: ${err.errMsg}`));
      },
    });
  });
}

// pages/chat/chat.js
async function onSend() {
  const content = inputValue;
  if (!content.trim()) return;
  appendMessage('user', content);
  inputValue = '';
  try {
    const reply = await sendMessage(content);
    appendMessage('assistant', reply);
  } catch (err) {
    appendMessage('assistant', `Error: ${err.message}`);
  }
}
```
### 6.2 Browser / H5 (with typewriter effect)
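```javascript
async function sendMessage(content) {
  const response = await fetch('https://pactgo.cn/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: 'qwen2.5',
      messages: [{ role: 'user', content }],
      stream: true,
    }),
  });
  if (!response.ok) throw new Error(`Request failed: ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ''; // holds a partial line carried over between chunks
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across chunks intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // the last element may be an incomplete line
    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') return;
      try {
        const json = JSON.parse(data);
        const token = json.choices?.[0]?.delta?.content;
        if (token) appendToken(token); // live typewriter effect
      } catch {
        // ignore lines that are not valid JSON
      }
    }
  }
}
```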
---
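## 7. Differences from the Device WebSocket Link
| | WechatApp SSE link | Device WebSocket link |
|--|---------------|-----------------|
| Initiator | WechatApp sends an HTTP POST | The device opens the WebSocket connection |
| Gateway → LMStudio | Via SmartClaw | Handled by SmartClaw itself |
| SSE stream | WechatApp ← Gateway ← SmartClaw ← LMStudio | Not involved |
| Device response | None (HTTP request/response) | Full-duplex JSON over WebSocket |
| Session management | None (the gateway is stateless; `SseManager` only tracks response channels) | Managed by SmartClaw |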
---
## 8. File Layout
```
gateway/src/
├── main.rs            # WebSocket handler (handles sse_chunk/sse_done/sse_error)
├── communication.rs   # SmartClaw WebSocket (broadcast_to_control)
├── sse_proxy.rs       # [new] POST /v1/chat/completions handler
├── sse_manager.rs     # [new] pending request_id → mpsc::Sender map
└── ...
SmartClaw/src/
├── main.rs            # [modified] handles the wechat_app_sse_request message type
└── sse_forwarder.rs   # [new] forwards SSE requests to LMStudio
```