命名管道服务端启动:\\.\pipe\Updater

This commit is contained in:
zqm
2026-04-09 10:23:07 +08:00
parent cd7d033e3e
commit 649f886413
2 changed files with 216 additions and 9 deletions

View File

@@ -8,7 +8,15 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
dirs = "5.0"
tokio = { version = "1.37", features = ["full"] }
windows = { version = "0.56", features = ["Win32_System_Console"] }
windows = { version = "0.56", features = [
"Win32_System_Console",
"Win32_System_Pipes",
"Win32_System_IO",
"Win32_Foundation",
"Win32_Security",
"Win32_Storage",
"Win32_Storage_FileSystem",
] }
chrono = "0.4"
base64 = "0.22"
md5 = "0.7"

View File

@@ -1,11 +1,11 @@
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::io::Read;
use std::fs::{self, File};
use std::io::Write as IoWrite;
use std::io::Read;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Command;
use std::io::Write as StdWrite;
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
use cube_lib::websocket::{WebSocketClient, WebSocketConfig};
@@ -45,6 +45,176 @@ static UPDATE_CHECK_DONE: std::sync::Mutex<bool> = std::sync::Mutex::new(false);
/// 标记本次运行是否执行了更新(下载了文件)
static UPDATE_PERFORMED: std::sync::Mutex<bool> = std::sync::Mutex::new(false);
// ===================== 命名管道服务端 =====================
/// 固定管道名称(单实例运行,直接用此名称)
const PIPE_NAME: &str = r"\\.\pipe\Updater";
/// 从命名管道读取一行消息(阻塞,需在 spawn_blocking 中调用)
fn read_pipe_message(pipe: windows::Win32::Foundation::HANDLE, debug: bool) -> Option<String> {
use windows::Win32::Storage::FileSystem::ReadFile;
let mut buf = [0u8; 4096];
let mut bytes_read: u32 = 0;
let result = unsafe {
ReadFile(
pipe,
Some(&mut buf),
Some(&mut bytes_read),
None,
)
};
if result.is_err() {
if debug {
eprintln!("[Pipe] 读取失败");
}
return None;
}
if bytes_read == 0 {
return None;
}
let msg = String::from_utf8_lossy(&buf[..bytes_read as usize])
.trim_end()
.to_string();
if debug {
println!("[Pipe] 收到消息:{}", msg);
}
Some(msg)
}
/// 写入 ACK 响应到命名管道
fn write_pipe_ack(pipe: windows::Win32::Foundation::HANDLE, debug: bool) {
use windows::Win32::Storage::FileSystem::WriteFile;
let ack = b"ACK\r\n";
let mut written: u32 = 0;
let result = unsafe {
WriteFile(
pipe,
Some(ack),
Some(&mut written),
None,
)
};
if result.is_err() && debug {
eprintln!("[Pipe] 写入 ACK 失败");
}
}
/// 启动命名管道异步服务端(长生命周期 async task
/// 通过 spawn_blocking 在独立 OS 线程中调用同步的 ConnectNamedPipe / ReadFile
/// 收到消息后通过 tokio channel 传回主循环。
/// 连接断开后自动重新监听下一条连接。
fn start_named_pipe_server(
tx: tokio::sync::mpsc::Sender<String>,
debug: bool,
) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let pipe_name = PIPE_NAME;
if debug {
println!("[Pipe] 命名管道服务端启动:{}", pipe_name);
}
loop {
// 在 blocking 线程中创建并等待客户端连接
let msg = tokio::task::spawn_blocking(move || {
use windows::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
use windows::Win32::System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe,
PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_WAIT,
};
use windows::core::PCWSTR;
let name: Vec<u16> = pipe_name.encode_utf16().chain(std::iter::once(0)).collect();
let pipe = unsafe {
CreateNamedPipeW(
PCWSTR::from_raw(name.as_ptr()),
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
1, // nMaxInstances
65536, // nOutBufferSize
65536, // nInBufferSize
0, // nDefaultTimeout毫秒0=默认 50ms
None, // lpSecurityAttributes
)
};
if pipe == windows::Win32::Foundation::INVALID_HANDLE_VALUE {
eprintln!("[Pipe] CreateNamedPipeW 失败");
return None;
}
if debug {
println!("[Pipe] 等待客户端连接...");
}
// 阻塞等待客户端连接
let connected = unsafe { ConnectNamedPipe(pipe, None) };
if connected.is_err() {
eprintln!("[Pipe] ConnectNamedPipe 失败");
return None;
}
if debug {
println!("[Pipe] 客户端已连接");
}
// 读取消息
let msg = read_pipe_message(pipe, debug);
// 回复 ACK
if msg.is_some() {
write_pipe_ack(pipe, debug);
}
// 断开连接
unsafe { DisconnectNamedPipe(pipe).ok() };
msg
})
.await;
// 将消息发送到 tokio channel
match msg {
Ok(Some(m)) => {
// quit 命令:发送到 channel 后退出循环
if m == "quit" {
if tx.send(m).await.is_err() {
break;
}
if debug {
println!("[Pipe] quit 已转发,退出监听循环");
}
break;
}
// 正常消息
if tx.send(m).await.is_err() {
break;
}
}
Ok(None) => {
// 连接读取失败,继续等待下一条
}
Err(_) => {
// spawn_blocking 失败
break;
}
}
}
if debug {
println!("[Pipe] 命名管道服务端已停止");
}
})
}
// ===================== 版本比较与下载 =====================
/// 获取 Updater 数据目录X:\AppData\,存放 BootLoader.exe 等)
fn get_updater_data_dir() -> PathBuf {
@@ -550,6 +720,8 @@ fn make_shutdown_channel() -> (
(std::sync::Arc::new(std::sync::Mutex::new(Some(tx))), rx)
}
/// 运行 Updater使用 CubeLib 内置的自动重连)
/// 返回本次运行是否执行了更新(下载了文件)
async fn run_updater(debug_mode: bool) -> bool {
@@ -940,17 +1112,20 @@ async fn main() {
}
}
// 启动命名管道服务端(长生命周期 async task
let (pipe_tx, mut pipe_rx) = tokio::sync::mpsc::channel::<String>(64);
let _pipe_server_handle = start_named_pipe_server(pipe_tx, config.debug_mode);
// 主循环:常驻运行,定期检查更新
loop {
// 运行一次 Updater连接、检查、下载
let update_performed = run_updater(config.debug_mode).await;
if update_performed {
// 执行了更新(下载了文件)→ 退出循环,由 BootLoader 重启 Updater
if config.debug_mode {
println!("Updater 执行了更新,即将退出(等待 BootLoader 重启)");
}
break;
std::process::exit(0);
}
// 无更新 → 随机等待 5-10 分钟后再次检查
@@ -965,10 +1140,34 @@ async fn main() {
println!("Updater 本次检查完成,无更新,等待 {:.1} 分钟后再次检查...", wait_mins);
}
tokio::time::sleep(std::time::Duration::from_secs(wait_seconds)).await;
}
// 分段等待(每 30 秒为一个周期),期间可响应管道消息
let mut remaining = wait_seconds;
let interval_secs = 30;
while remaining > 0 {
let chunk = std::cmp::min(remaining, interval_secs);
let sleep = tokio::time::sleep(std::time::Duration::from_secs(chunk));
tokio::pin!(sleep);
if config.debug_mode {
println!("Updater 已退出");
tokio::select! {
_ = &mut sleep => {
remaining -= chunk;
}
// 收到管道消息(由 EasyTest 等客户端发送)
msg = pipe_rx.recv() => {
if let Some(m) = msg {
if config.debug_mode {
println!("[Pipe] 主循环收到消息:{}", m);
}
// quit 命令 → 退出
if m == "quit" {
if config.debug_mode {
println!("[Pipe] 收到 quit 命令Updater 退出");
}
std::process::exit(0);
}
}
}
}
}
}
}