JoyD/Claw/docs/小程序网关SSE架构方案.md
2026-04-21 13:46:20 +08:00


WechatApp ↔ Gateway SSE Proxy Design

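WechatApp sends messages via HTTP POST. The gateway forwards each request over WebSocket to SmartClaw. SmartClaw calls the LMStudio SSE endpoint on the gateway's behalf and relays the SSE stream back to the gateway over the same WebSocket. The gateway then returns that stream to WechatApp in SSE format. The gateway never connects to LMStudio directly.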


1. Overall Architecture

┌──────────────────┐   POST /v1/chat/completions   ┌──────────────┐
│                  │ ────────────────────────────► │              │
│ WechatApp/Browser│                               │   Gateway    │
│                  │ ◄──────────────────────────── │   (Actix)    │
└──────────────────┘          SSE stream           └──────┬───────┘
                                                          │ WebSocket
                                                 ┌────────▼────────┐
                                                 │    SmartClaw    │
                                                 │ (Rust desktop)  │
                                                 └────────┬────────┘
                                                          │ SSE (HTTP)
                                                 ┌────────▼────────┐
                                                 │    LMStudio     │
                                                 │   (intranet)    │
                                                 └─────────────────┘

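The two legs are fully independent: WechatApp ↔ gateway, and gateway ↔ SmartClaw ↔ LMStudio.

| Link | Protocol | Notes |
|---|---|---|
| WechatApp → Gateway | HTTP POST + SSE | WechatApp sends the request; the gateway replies with an SSE stream |
| Gateway → SmartClaw | WebSocket | The gateway relays the message; SmartClaw calls LMStudio on its behalf |
| SmartClaw → LMStudio | HTTP SSE | SmartClaw connects directly to the intranet LMStudio |
| SmartClaw → Gateway | WebSocket (carrying SSE text) | The LMStudio SSE stream is relayed back to the gateway |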

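2. SSE Endpoint Design

2.1 Endpoint

POST /v1/chat/completions

The endpoint uses the standard OpenAI-compatible format. The gateway forwards the request to SmartClaw, which calls LMStudio on the gateway's behalf.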

2.2 Request

POST /v1/chat/completions
Content-Type: application/json

{
  "model": "qwen2.5",
  "messages": [
    {"role": "user", "content": "你好"}
  ],
  "stream": true
}

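2.3 Response

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
X-Request-Id: req_xxx

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":""}}]}

data: [DONE]

The gateway receives the LMStudio SSE stream relayed by SmartClaw and returns it to the client verbatim. It does not parse it, transform it, or accumulate it into a full response. Each event ends with a blank line. Without that blank line, consecutive data: lines would be merged into a single event.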
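The verbatim relay only works if the upstream already frames each event according to the SSE rules. Each payload line gets its own data: line, and a blank line ends the event. Below is a minimal std-only sketch of that framing. The names sse_event and sse_done_event are illustrative helpers and are not part of the gateway code.

```rust
/// Frame one payload as a Server-Sent Events message.
/// Every line of the payload gets its own "data: " prefix; the trailing
/// empty line is what tells the client the event is complete.
fn sse_event(payload: &str) -> String {
    let mut out = String::new();
    for line in payload.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push('\n'); // blank line terminates the event
    out
}

/// The OpenAI-style end-of-stream marker, framed as its own event.
fn sse_done_event() -> String {
    sse_event("[DONE]")
}
```

A multi-line payload becomes several data: lines inside one event. The client joins them back with newlines.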

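2.4 Status Codes

| Status | Meaning |
|---|---|
| 200 | Normal SSE stream |
| 400 | Invalid parameters (including stream not set to true) |
| 502 | SmartClaw unreachable |
| 504 | LMStudio processing timeout |

Once the 200 headers have been sent, a later failure can only be reported inside the stream, as an sse_error event.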
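One way to keep this table and the code in sync is a single mapping from failure kind to status code. The sketch below assumes the gateway detects each failure before it commits to the 200 headers. ProxyFailure and status_code are hypothetical names.

```rust
/// Failures the gateway can detect before it starts streaming.
/// Hypothetical names; the document only defines the status codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProxyFailure {
    /// Malformed body, or `stream` is not true.
    InvalidRequest,
    /// No SmartClaw connection accepted the forwarded message.
    SmartClawUnreachable,
    /// Assumption: no first chunk arrived within a deadline.
    UpstreamTimeout,
}

/// Map a pre-stream failure to the HTTP status from the table.
fn status_code(f: ProxyFailure) -> u16 {
    match f {
        ProxyFailure::InvalidRequest => 400,
        ProxyFailure::SmartClawUnreachable => 502,
        ProxyFailure::UpstreamTimeout => 504,
    }
}
```

Returning 504 would require the handler to wait, with a deadline, for the first chunk before sending a 200 response. The handler in 4.1 returns 200 as soon as the message has been forwarded.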

3. End-to-End Message Flow

1. WechatApp ──POST /v1/chat/completions──► Gateway
2. Gateway ──WebSocket (wechat_app_sse_request)──► SmartClaw
3. SmartClaw ──POST /v1/chat/completions──► LMStudio
4. LMStudio ──SSE Stream──► SmartClaw
5. SmartClaw ──WebSocket (sse_chunk, then sse_done or sse_error)──► Gateway
6. Gateway ──SSE Stream──► WechatApp

3.1 WebSocket Message Formats

Gateway → SmartClaw (request):

{
  "type": "wechat_app_sse_request",
  "request_id": "req_abc123",
  "data": {
    "model": "qwen2.5",
    "messages": [
      {"role": "user", "content": "你好"}
    ],
    "stream": true
  }
}

SmartClaw → Gateway (response chunk):

{
  "type": "sse_chunk",
  "request_id": "req_abc123",
  "chunk": "data: {\"choices\":[{\"delta\":{\"content\":\"你\"}}]}\n\n"
}

SmartClaw → Gateway (end of stream):

{
  "type": "sse_done",
  "request_id": "req_abc123"
}

SmartClaw → Gateway (error):

{
  "type": "sse_error",
  "request_id": "req_abc123",
  "error": "LMStudio unreachable"
}
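Error strings such as e.to_string() can contain quotes or backslashes. These messages therefore need real JSON escaping rather than string concatenation. serde_json::json! already handles this; the std-only sketch below only makes the escaping rules explicit. ControlMessage and json_escape are illustrative names.

```rust
/// Escape a string for use inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters must use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// The three SmartClaw → gateway message types from section 3.1.
enum ControlMessage<'a> {
    Chunk { request_id: &'a str, chunk: &'a str },
    Done { request_id: &'a str },
    Error { request_id: &'a str, error: &'a str },
}

impl ControlMessage<'_> {
    /// Serialize to the compact JSON shape shown above.
    fn to_json(&self) -> String {
        match self {
            ControlMessage::Chunk { request_id, chunk } => format!(
                r#"{{"type":"sse_chunk","request_id":"{}","chunk":"{}"}}"#,
                json_escape(request_id),
                json_escape(chunk)
            ),
            ControlMessage::Done { request_id } => format!(
                r#"{{"type":"sse_done","request_id":"{}"}}"#,
                json_escape(request_id)
            ),
            ControlMessage::Error { request_id, error } => format!(
                r#"{{"type":"sse_error","request_id":"{}","error":"{}"}}"#,
                json_escape(request_id),
                json_escape(error)
            ),
        }
    }
}
```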

4. Gateway Implementation

4.1 Actix-web Handler

// src/sse_proxy.rs

use actix_web::{web, HttpResponse};
use futures_util::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use crate::sse_manager::SseManager;
// ChatCompletionsRequest (Serialize + Deserialize, with a `stream: bool` field)
// and WebSocketPool are defined elsewhere in the gateway.

/// POST /v1/chat/completions
/// Forwards the request to SmartClaw, which calls LMStudio on the gateway's behalf
pub async fn chat_completions(
    body: web::Json<ChatCompletionsRequest>,
    pool: web::Data<WebSocketPool>,
    sse_manager: web::Data<SseManager>,
) -> HttpResponse {
    let request_id = uuid::Uuid::new_v4().to_string();
    let body = body.into_inner();

    // 1. Only streaming requests are supported
    if !body.stream {
        return HttpResponse::BadRequest().json(serde_json::json!({
            "error": { "message": "stream must be true", "type": "invalid_request_error" }
        }));
    }

    // 2. Build the WebSocket message for SmartClaw
    let forward = serde_json::json!({
        "type": "wechat_app_sse_request",
        "request_id": request_id,
        "data": body,
    });

    // 3. Channel carrying SmartClaw's chunks into this HTTP response
    let (tx, rx) = mpsc::channel::<Vec<u8>>(1024);

    // 4. Register before forwarding, so early chunks always find their channel
    sse_manager.add(request_id.clone(), tx).await;

    // 5. Forward to SmartClaw
    if pool.broadcast_to_smartclaw(forward).await.is_err() {
        sse_manager.remove(&request_id).await;
        return HttpResponse::BadGateway().json(serde_json::json!({
            "error": { "message": "SmartClaw unavailable", "type": "bad_gateway" }
        }));
    }

    // 6. tokio's Receiver is not a Stream by itself; ReceiverStream adapts it.
    //    The response ends once the manager drops the Sender (sse_done / sse_error).
    let stream = ReceiverStream::new(rx)
        .map(|chunk| Ok::<_, std::convert::Infallible>(web::Bytes::from(chunk)));

    HttpResponse::Ok()
        .content_type("text/event-stream")
        .append_header(("Cache-Control", "no-cache"))
        .append_header(("X-Request-Id", request_id.as_str()))
        .streaming(stream)
}

4.2 Receiving SSE Chunks from SmartClaw

SmartClaw sends SSE chunks back over the /api/v1/ws/control WebSocket channel. The gateway must handle the sse_chunk, sse_done, and sse_error message types:

// src/main.rs: inside the WebSocket on_message handler
// (`parsed` is the incoming serde_json::Value, `sse_manager` the shared SseManager)

Some("sse_chunk") => {
    let request_id = parsed.get("request_id").and_then(|v| v.as_str()).unwrap_or("");
    let chunk = parsed.get("chunk").and_then(|v| v.as_str()).unwrap_or("");

    if let Some(tx) = sse_manager.get(request_id).await {
        // send() fails once the HTTP client has gone away; drop the stale entry
        if tx.send(chunk.as_bytes().to_vec()).await.is_err() {
            sse_manager.remove(request_id).await;
        }
    }
}

Some("sse_done") => {
    let request_id = parsed.get("request_id").and_then(|v| v.as_str()).unwrap_or("");

    // Dropping the stored Sender closes the channel, which ends the HTTP response
    sse_manager.remove(request_id).await;
}

Some("sse_error") => {
    let request_id = parsed.get("request_id").and_then(|v| v.as_str()).unwrap_or("");
    let error_msg = parsed.get("error").and_then(|v| v.as_str()).unwrap_or("unknown error");

    if let Some(tx) = sse_manager.get(request_id).await {
        // Build the payload with serde_json so quotes in error_msg stay valid JSON
        let event = format!("data: {}\n\n", serde_json::json!({ "error": error_msg }));
        let _ = tx.send(event.into_bytes()).await;
        sse_manager.remove(request_id).await;
    }
}
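The routing above reduces to a map from request_id to a channel sender. Removing an entry is what ends that client's stream. The std-only sketch below shows this logic under two assumptions that keep it self-contained. It uses std::sync::mpsc in place of tokio's channel, and ControlMsg is an already-parsed enum, which is a hypothetical type.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Parsed SmartClaw control message (JSON parsing omitted).
enum ControlMsg {
    Chunk { request_id: String, chunk: String },
    Done { request_id: String },
    Error { request_id: String, error: String },
}

/// request_id → sender that feeds that request's HTTP response body.
type Pending = HashMap<String, Sender<Vec<u8>>>;

/// Register a request and return the receiving end for the response.
fn register(pending: &mut Pending, request_id: &str) -> Receiver<Vec<u8>> {
    let (tx, rx) = channel();
    pending.insert(request_id.to_string(), tx);
    rx
}

fn dispatch(pending: &mut Pending, msg: ControlMsg) {
    match msg {
        ControlMsg::Chunk { request_id, chunk } => {
            // A failed send means the receiver (client) is gone.
            let alive = pending
                .get(&request_id)
                .map(|tx| tx.send(chunk.into_bytes()).is_ok())
                .unwrap_or(false);
            if !alive {
                pending.remove(&request_id);
            }
        }
        ControlMsg::Done { request_id } => {
            // Dropping the only Sender ends the receiver's iteration.
            pending.remove(&request_id);
        }
        ControlMsg::Error { request_id, error } => {
            if let Some(tx) = pending.remove(&request_id) {
                // Minimal escaping for the sketch; real code uses serde_json.
                let escaped = error.replace('\\', "\\\\").replace('"', "\\\"");
                let _ = tx.send(format!("data: {{\"error\":\"{}\"}}\n\n", escaped).into_bytes());
            }
        }
    }
}
```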

4.3 SSE Manager

// src/sse_manager.rs

use std::collections::HashMap;
use tokio::sync::{mpsc, RwLock};

/// Tracks every in-flight SSE request awaiting WebSocket chunks from SmartClaw
pub struct SseManager {
    pending: RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>,
}

impl SseManager {
    pub fn new() -> Self {
        Self { pending: RwLock::new(HashMap::new()) }
    }

    /// Register an SSE request together with its response channel
    pub async fn add(&self, request_id: String, tx: mpsc::Sender<Vec<u8>>) {
        self.pending.write().await.insert(request_id, tx);
    }

    /// Look up the response channel for a request_id (returns a cloned Sender)
    pub async fn get(&self, request_id: &str) -> Option<mpsc::Sender<Vec<u8>>> {
        self.pending.read().await.get(request_id).cloned()
    }

    /// Remove a finished request; dropping its Sender ends the response stream
    pub async fn remove(&self, request_id: &str) {
        self.pending.write().await.remove(request_id);
    }
}
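Entries are removed only on sse_done or sse_error. A client that disconnects mid-stream therefore keeps its entry until SmartClaw finishes. One option is a guard owned by the response stream that removes the entry when the stream is dropped; it is sketched here with std types only. PendingGuard is a hypothetical name. With tokio's RwLock, Drop cannot .await, so a real version would need try_write() or a spawned task.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// std-only stand-in for SseManager's map.
type Registry = Arc<Mutex<HashMap<String, Sender<Vec<u8>>>>>;

/// Removes its request_id from the registry when dropped, e.g. when the
/// response stream that owns it is dropped after a client disconnect.
struct PendingGuard {
    registry: Registry,
    request_id: String,
}

impl Drop for PendingGuard {
    fn drop(&mut self) {
        // lock() only fails if another thread panicked while holding it.
        if let Ok(mut map) = self.registry.lock() {
            map.remove(&self.request_id);
        }
    }
}

/// Register a request; the caller keeps the guard alive with the stream.
fn register(registry: &Registry, request_id: &str) -> (Receiver<Vec<u8>>, PendingGuard) {
    let (tx, rx) = channel();
    registry.lock().unwrap().insert(request_id.to_string(), tx);
    let guard = PendingGuard {
        registry: Arc::clone(registry),
        request_id: request_id.to_string(),
    };
    (rx, guard)
}
```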

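5. SmartClaw Side Implementation

When SmartClaw receives a wechat_app_sse_request, it:

  1. Extracts the data field (the OpenAI-format request body)
  2. Sends an HTTP POST to http://<intranet LMStudio host>:1234/v1/chat/completions
  3. Reads the SSE response incrementally and sends each piece back over WebSocket as an sse_chunk
  4. Sends sse_done when the stream ends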
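// SmartClaw/src/sse_forwarder.rs

use futures_util::StreamExt; // for bytes_stream().next(); needs reqwest's "stream" feature
use serde_json::json;

// Hardcoded intranet address for illustration; in practice read it from config
const LMSTUDIO_URL: &str = "http://10.0.0.100:1234/v1/chat/completions";

async fn handle_sse_request(pool: WebSocketPool, request_id: String, req: ChatCompletionsRequest) {
    let client = reqwest::Client::new();

    // Send the request; non-2xx responses are treated as errors too
    let resp = match client
        .post(LMSTUDIO_URL)
        .json(&req)
        .send()
        .await
        .and_then(|r| r.error_for_status())
    {
        Ok(r) => r,
        Err(e) => {
            let err_msg = json!({ "type": "sse_error", "request_id": request_id, "error": e.to_string() });
            let _ = pool.broadcast_to_control(err_msg).await;
            return;
        }
    };

    // Network chunks do not line up with SSE events and can split a multi-byte
    // UTF-8 character, so buffer until a blank line and forward whole events
    let mut buf: Vec<u8> = Vec::new();
    let mut stream = resp.bytes_stream();
    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(bytes) => {
                buf.extend_from_slice(&bytes);
                while let Some(pos) = buf.windows(2).position(|w| w == b"\n\n") {
                    let event: Vec<u8> = buf.drain(..pos + 2).collect();
                    let msg = json!({
                        "type": "sse_chunk",
                        "request_id": request_id,
                        "chunk": String::from_utf8_lossy(&event),
                    });
                    let _ = pool.broadcast_to_control(msg).await;
                }
            }
            Err(e) => {
                let err_msg = json!({ "type": "sse_error", "request_id": request_id, "error": e.to_string() });
                let _ = pool.broadcast_to_control(err_msg).await;
                // sse_error already closes the request on the gateway side
                return;
            }
        }
    }

    let done = json!({ "type": "sse_done", "request_id": request_id });
    let _ = pool.broadcast_to_control(done).await;
}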
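bytes_stream() yields chunks at network boundaries, and these need not line up with SSE events. A Chinese character takes three bytes in UTF-8. Calling String::from_utf8_lossy on a chunk that ends mid-character replaces the partial bytes with U+FFFD. Below is a std-only sketch of an incremental splitter that cuts only at the blank line ending each event. SseSplitter is an illustrative name, and the sketch assumes \n\n separators rather than \r\n\r\n.

```rust
/// Buffers raw upstream bytes and yields complete SSE events.
/// Cutting only at "\n\n" is UTF-8 safe: 0x0A never occurs inside a
/// multi-byte UTF-8 sequence, so no character is ever split.
#[derive(Default)]
struct SseSplitter {
    buf: Vec<u8>,
}

impl SseSplitter {
    /// Feed one network chunk; returns the events it completed, in order.
    fn push(&mut self, bytes: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(bytes);
        let mut events = Vec::new();
        while let Some(pos) = self.buf.windows(2).position(|w| w == b"\n\n") {
            // Take the event together with its terminating blank line.
            let event: Vec<u8> = self.buf.drain(..pos + 2).collect();
            events.push(String::from_utf8_lossy(&event).into_owned());
        }
        events
    }
}
```

Bytes after the last blank line stay in the buffer until a later chunk completes the event.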

6. WechatApp Client

6.1 WechatApp

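// utils/api.js

/**
 * Send a chat message to the SSE endpoint and wait for the full reply
 * @param {string} content - message text
 * @returns {Promise<string>} the complete reply
 */
function sendMessage(content) {
  // wx.request returns a RequestTask rather than a Promise, so wrap it
  return new Promise((resolve, reject) => {
    wx.request({
      url: 'https://pactgo.cn/v1/chat/completions',
      method: 'POST',
      header: { 'Content-Type': 'application/json' },
      data: {
        model: 'qwen2.5',
        messages: [{ role: 'user', content }],
        stream: true,
      },
      dataType: 'text', // the body is an SSE stream, not a single JSON document
      success: (res) => {
        if (res.statusCode !== 200) {
          reject(new Error(`Request failed: ${res.statusCode}`));
          return;
        }
        // WechatApp receives the whole stream at once; join the delta contents
        let reply = '';
        for (const line of String(res.data).split('\n')) {
          if (!line.startsWith('data: ')) continue;
          const payload = line.slice(6);
          if (payload === '[DONE]') break;
          try {
            const json = JSON.parse(payload);
            const delta = json.choices && json.choices[0] && json.choices[0].delta;
            if (delta && delta.content) reply += delta.content;
          } catch (e) { /* skip malformed lines */ }
        }
        resolve(reply);
      },
      fail: (err) => reject(new Error(err.errMsg)),
    });
  });
}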

// pages/chat/chat.js
async function onSend() {
  const content = inputValue;
  if (!content.trim()) return;

  appendMessage('user', content);
  inputValue = '';

  try {
    const reply = await sendMessage(content);
    appendMessage('assistant', reply);
  } catch (err) {
    appendMessage('assistant', `Error: ${err.message}`);
  }
}

6.2 Browser / H5 (with typewriter effect)

async function sendMessage(content) {
  const response = await fetch('https://pactgo.cn/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: 'qwen2.5',
      messages: [{ role: 'user', content }],
      stream: true,
    }),
  });
  if (!response.ok) throw new Error(`Request failed: ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = ''; // holds a trailing partial line between reads

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // { stream: true } keeps a multi-byte character split across reads intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // the last piece may be an incomplete line

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const data = line.slice(6);
      if (data === '[DONE]') return;
      try {
        const json = JSON.parse(data);
        const token = json.choices?.[0]?.delta?.content;
        if (token) appendToken(token); // live typewriter effect
      } catch {}
    }
  }
}

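7. Differences from the Device WebSocket Link

| | WechatApp SSE link | Device WebSocket link |
|---|---|---|
| Initiator | WechatApp sends an HTTP POST | The device opens a WebSocket connection |
| Gateway → LMStudio | Via SmartClaw | Handled by SmartClaw itself |
| SSE stream | WechatApp ← Gateway ← SmartClaw ← LMStudio | Not involved |
| Response mode | HTTP (request/response) | Full-duplex WebSocket JSON |
| Session management | Gateway is stateless; SseManager only tracks response channels | Managed by SmartClaw |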

8. File Structure

gateway/src/
├── main.rs              # WebSocket handler; processes sse_chunk/sse_done/sse_error
├── communication.rs     # SmartClaw WebSocket (broadcast_to_control)
├── sse_proxy.rs         # [new] POST /v1/chat/completions handler
├── sse_manager.rs       # [new] pending request_id → mpsc::Sender map
└── ...

SmartClaw/src/
├── main.rs              # [modified] handles the wechat_app_sse_request message type
└── sse_forwarder.rs     # [new] forwards SSE requests to LMStudio