use rand::Rng; use std::collections::{HashMap, HashSet}; use std::sync::{Arc, Mutex}; // ===================== 日志模块 ===================== /// 日志文件(Mutex 确保线程安全写入) static LOG_FILE: std::sync::Mutex> = std::sync::Mutex::new(None); /// 初始化日志文件(启动时清除当天日志,然后以追加方式打开) /// 日志目录:exe所在目录/Log/ fn init_log_file() { let mut guard = LOG_FILE.lock().unwrap(); if guard.is_some() { return; } let exe_path = match std::env::current_exe() { Ok(p) => p, Err(_) => return, }; let log_dir = match exe_path.parent() { Some(d) => d.join("Updater").join("Log"), None => return, }; // 创建 Log 目录 let _ = std::fs::create_dir_all(&log_dir); let log_name = format!("Updater_{}.log", chrono::Local::now().format("%Y%m%d")); let log_path = log_dir.join(&log_name); // 启动时清除当天的日志文件 let _ = std::fs::remove_file(&log_path); match std::fs::OpenOptions::new() .create(true) .append(true) .open(&log_path) { Ok(file) => { eprintln!("[日志] 日志文件: {:?}", log_path); *guard = Some(file); } Err(e) => { eprintln!("[日志] 无法打开日志文件: {}", e); } } } /// 日志宏:同时输出到控制台和文件 macro_rules! log_print { ($($arg:tt)*) => {{ let msg = format!($($arg)*); print!("{}\n", msg); std::io::stdout().flush().ok(); if LOG_FILE.lock().unwrap().is_none() { init_log_file(); } if let Some(ref mut file) = *LOG_FILE.lock().unwrap() { use std::io::Write; let _ = writeln!(file, "{}", msg); let _ = file.flush(); } }}; } /// 错误日志宏:同时输出到控制台stderr和文件(始终记录) macro_rules! log_eprintln { ($($arg:tt)*) => {{ let msg = format!($($arg)*); eprintln!("{}", msg); if LOG_FILE.lock().unwrap().is_none() { init_log_file(); } if let Some(ref mut file) = *LOG_FILE.lock().unwrap() { use std::io::Write; let _ = writeln!(file, "{}", msg); let _ = file.flush(); } }}; } // ===================== 更新阶段枚举 ===================== /// 更新流程的三个阶段 #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum UpdatePhase { BootLoader, // 阶段1:检查 BootLoader.exe Updater, // 阶段2:检查 Updater.exe Apps, // 阶段3:检查其他应用版本 AppsWaitAllFile, // 阶段3.5:等待 GetAllFile 响应 Complete, // 所有阶段完成 } // ===================== 更新上下文(局部状态 + Arc 共享)===================== /// 更新流程的状态上下文 /// 使用 Mutex 实现 Sync(即使单线程也需要满足 Rust 的线程安全约束) struct UpdateContext { /// 当前正在下载的文件状态 download_state: Mutex>, /// 其他应用下载状态:key = "app_name/relative_path" app_download_map: Mutex>, /// 其他应用更新完成标记:set of "app_name/relative_path" app_completed_set: Mutex>, /// Updater.exe 的服务端版本号 server_updater_version: Mutex>, /// 当前更新阶段 current_phase: Mutex, /// 候选应用列表 (app_name, local_version, exe_path),阶段3使用 candidates: Mutex>, /// 等待 GetAllFile 响应的应用列表(阶段3.5使用) pending_allfile_apps: Mutex>, /// 待下载队列 (app_name, filename, offset, expected_md5),顺序下载 download_queue: Mutex>, /// 是否正在下载中(队列中有待处理项) is_downloading: Mutex, /// 当前正在下载的文件的期望 MD5(DownloadComplete 校验用) current_download_md5: Mutex>, /// 已升级的应用列表 (app_name, current_ver, latest_ver, exe_path),下载完成后通知用 upgraded_apps: Mutex>, } impl Default for UpdateContext { fn default() -> Self { Self { download_state: Mutex::new(None), app_download_map: Mutex::new(HashMap::new()), app_completed_set: Mutex::new(HashSet::new()), server_updater_version: Mutex::new(None), current_phase: Mutex::new(UpdatePhase::BootLoader), candidates: Mutex::new(Vec::<(String, String, String)>::new()), pending_allfile_apps: Mutex::new(Vec::new()), download_queue: Mutex::new(std::collections::VecDeque::new()), is_downloading: Mutex::new(false), current_download_md5: Mutex::new(None), upgraded_apps: Mutex::new(Vec::new()), } } } use serde::{Deserialize, Serialize}; use std::fs::{self, File}; use std::io::Read; use std::path::PathBuf; use std::pin::Pin; use std::process::Command; use std::io::Write as StdWrite; use std::io::Seek; use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; use cube_lib::websocket::{WebSocketClient, WebSocketConfig}; // ===================== 文件下载状态 ===================== struct DownloadState { filename: String, offset: u64, temp_path: Option, file: Option, } // ===================== 命名管道服务端 ===================== /// 固定管道名称(单实例运行,直接用此名称) const PIPE_NAME: &str = r"\\.\pipe\Updater"; /// 从命名管道读取一行消息(阻塞,需在 spawn_blocking 中调用) fn read_pipe_message(pipe: windows::Win32::Foundation::HANDLE, debug: bool) -> Option { use windows::Win32::Storage::FileSystem::ReadFile; let mut buf = [0u8; 4096]; let mut bytes_read: u32 = 0; let result = unsafe { ReadFile( pipe, Some(&mut buf), Some(&mut bytes_read), None, ) }; if result.is_err() { if debug { eprintln!("[Pipe] 读取失败"); } return None; } if bytes_read == 0 { return None; } let msg = String::from_utf8_lossy(&buf[..bytes_read as usize]) .trim_end() .to_string(); if debug { println!("[Pipe] 收到消息:{}", msg); } Some(msg) } /// 写入 ACK 响应到命名管道 fn write_pipe_ack(pipe: windows::Win32::Foundation::HANDLE, debug: bool) { use windows::Win32::Storage::FileSystem::WriteFile; let ack = b"ACK\r\n"; let mut written: u32 = 0; let result = unsafe { WriteFile( pipe, Some(ack), Some(&mut written), None, ) }; if result.is_err() && debug { eprintln!("[Pipe] 写入 ACK 失败"); } } /// 启动命名管道异步服务端(长生命周期 async task)。 /// 通过 spawn_blocking 在独立 OS 线程中调用同步的 ConnectNamedPipe / ReadFile, /// 收到消息后通过 tokio channel 传回主循环。 /// 连接断开后自动重新监听下一条连接。 fn start_named_pipe_server( tx: tokio::sync::mpsc::Sender, debug: bool, ) -> tokio::task::JoinHandle<()> { tokio::task::spawn(async move { let pipe_name = PIPE_NAME; if debug { println!("[Pipe] 命名管道服务端启动:{}", pipe_name); } loop { // 在 blocking 线程中创建并等待客户端连接 let msg = tokio::task::spawn_blocking(move || { use windows::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; use windows::Win32::System::Pipes::{ ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_READMODE_BYTE, PIPE_TYPE_BYTE, PIPE_WAIT, }; use windows::core::PCWSTR; let name: Vec = pipe_name.encode_utf16().chain(std::iter::once(0)).collect(); let pipe = unsafe { CreateNamedPipeW( PCWSTR::from_raw(name.as_ptr()), PIPE_ACCESS_DUPLEX, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, // nMaxInstances 65536, // nOutBufferSize 65536, // nInBufferSize 0, // nDefaultTimeout(毫秒,0=默认 50ms) None, // lpSecurityAttributes ) }; if pipe == windows::Win32::Foundation::INVALID_HANDLE_VALUE { eprintln!("[Pipe] CreateNamedPipeW 失败"); return None; } if debug { println!("[Pipe] 等待客户端连接..."); } // 阻塞等待客户端连接 let connected = unsafe { ConnectNamedPipe(pipe, None) }; if connected.is_err() { eprintln!("[Pipe] ConnectNamedPipe 失败"); return None; } if debug { println!("[Pipe] 客户端已连接"); } // 读取消息 let msg = read_pipe_message(pipe, debug); // 回复 ACK if msg.is_some() { write_pipe_ack(pipe, debug); } // 断开连接 unsafe { DisconnectNamedPipe(pipe).ok() }; msg }) .await; // 将消息发送到 tokio channel match msg { Ok(Some(m)) => { // quit 命令:发送到 channel 后退出循环 if m == "quit" { if tx.send(m).await.is_err() { break; } if debug { println!("[Pipe] quit 已转发,退出监听循环"); } break; } // 正常消息 if tx.send(m).await.is_err() { break; } } Ok(None) => { // 连接读取失败,继续等待下一条 } Err(_) => { // spawn_blocking 失败 break; } } } if debug { println!("[Pipe] 命名管道服务端已停止"); } }) } // ===================== 版本比较与下载 ===================== /// 获取 Updater 数据目录(X:\AppData\,存放 BootLoader.exe 等) fn get_updater_data_dir() -> PathBuf { let exe_path = std::env::current_exe().expect("Failed to get executable path"); let drive = exe_path .parent() .and_then(|p| p.as_os_str().to_str()) .and_then(|s| s.split('\\').next()) .unwrap_or("C:"); let appdata = PathBuf::from(format!("{}/AppData", drive)); let _ = fs::create_dir_all(&appdata); appdata } /// 获取本地文件版本号(使用 PowerShell 获取 PE 文件版本信息) fn get_local_file_version(filename: &str) -> String { let file_path = get_updater_data_dir().join(filename); if !file_path.exists() { return "0.0.0".to_string(); } #[cfg(windows)] { let path_str = file_path.to_string_lossy().to_string(); let ps_script = format!( "(Get-Item -LiteralPath '{}' -ErrorAction SilentlyContinue).VersionInfo.FileVersion", path_str.replace("'", "''") ); let output = Command::new("powershell") .args(["-NoProfile", "-NonInteractive", "-Command", &ps_script]) .output(); if let Ok(output) = output { let stdout = String::from_utf8_lossy(&output.stdout); let version = stdout.trim(); if !version.is_empty() && version != "0" && !version.contains("没有版本") { return version.to_string(); } } // 备用:使用文件修改时间作为"版本" if let Ok(metadata) = fs::metadata(&file_path) { use std::time::UNIX_EPOCH; if let Ok(modified) = metadata.modified() { let duration = modified.duration_since(UNIX_EPOCH).unwrap_or_default(); return format!("{}.{}", duration.as_secs(), duration.subsec_nanos() / 1_000_000); } } "0.0.0".to_string() } #[cfg(not(windows))] { "0.0.0".to_string() } } /// 比较两个版本号(a < b 返回 true) /// 使用 CubeLib 库的版本比较函数 fn version_less_than(a: &str, b: &str) -> bool { cube_lib::version_less_than(a, b) } /// 计算本地文件前 N 字节的 MD5 hash(字节数为 0 表示全部) #[allow(dead_code)] fn compute_file_hash_in_dir(app_name: &str, filename: &str, bytes: u64, _debug: bool) -> Option { let file_path = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name) .join(filename); if !file_path.exists() { return None; } let mut file = File::open(&file_path).ok()?; let file_size = file.metadata().ok()?.len(); let read_bytes = if bytes == 0 || bytes > file_size { file_size } else { bytes }; let mut buffer = vec![0u8; read_bytes as usize]; file.read_exact(&mut buffer).ok()?; let hash = md5::compute(&buffer); Some(format!("{:x}", hash)) } /// 计算本地文件前 N 字节的 MD5 hash(字节数为 0 表示全部) fn compute_file_hash(filename: &str, bytes: u64, _debug: bool) -> Option { let file_path = get_updater_data_dir().join(filename); if !file_path.exists() { return None; } let mut file = File::open(&file_path).ok()?; let file_size = file.metadata().ok()?.len(); // 如果 bytes=0 或超过文件大小,则计算整个文件 let read_bytes = if bytes == 0 || bytes > file_size { file_size } else { bytes }; // 读取前 read_bytes 字节 let mut buffer = vec![0u8; read_bytes as usize]; file.read_exact(&mut buffer).ok()?; let hash = md5::compute(&buffer); Some(format!("{:x}", hash)) } /// 计算文件的完整 MD5 hash fn compute_file_md5(file_path: &PathBuf) -> Option { if !file_path.exists() { log_print!("[MD5] 文件不存在: {:?}", file_path); return None; } let file_data = match fs::read(file_path) { Ok(data) => data, Err(e) => { log_print!("[MD5] 读取文件失败: {:?}, error: {}", file_path, e); return None; } }; let hash = md5::compute(&file_data); Some(format!("{:x}", hash)) } /// 获取临时文件的当前大小(字节数) /// 注意:Updater.exe 的临时文件是 Updater.new.exe.tmp fn get_tmp_file_size(filename: &str) -> u64 { let tmp_filename = if filename == "Updater.exe" { "Updater.new.exe.tmp" } else { filename }; let tmp_path = get_updater_data_dir().join(format!("{}.tmp", tmp_filename)); tmp_path.metadata().map(|m| m.len()).unwrap_or(0) } /// 检查是否存在 Updater.new.exe 文件 fn has_updater_new_exe() -> bool { let updater_new_path = get_updater_data_dir().join("Updater.new.exe"); updater_new_path.exists() } /// 安排启动 BootLoader.exe(延迟执行,等待所有下载完成) /// 通过 shutdown_tx 发送断连信号,通知主循环优雅退出 fn schedule_bootloader_launch( debug: bool, shutdown_tx_arc: std::sync::Arc>>>, ) { // 延迟 1 秒后启动,确保文件句柄已关闭 std::thread::spawn(move || { std::thread::sleep(std::time::Duration::from_secs(1)); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); let bootloader_path = get_updater_data_dir().join("BootLoader.exe"); if !bootloader_path.exists() { if debug { println!("{} [错误] BootLoader.exe 不存在,无法启动", ts); } return; } // 启动 BootLoader.exe if debug { println!("{} [启动] 正在启动 BootLoader.exe...", ts); } #[cfg(windows)] { use std::os::windows::process::CommandExt; const DETACHED_PROCESS: u32 = 0x00000008; const CREATE_NO_WINDOW: u32 = 0x08000000; match Command::new(&bootloader_path) .args(["--from-updater"]) .creation_flags(DETACHED_PROCESS | CREATE_NO_WINDOW) .spawn() { Ok(_) => { if debug { println!("{} [启动] BootLoader.exe 已启动,Updater 即将退出", ts); } // 发送断连信号,通知主循环优雅退出 if let Some(tx) = shutdown_tx_arc.lock().unwrap().take() { let _ = tx.send(()); } } Err(e) => { log_eprintln!("{} [错误] 启动 BootLoader.exe 失败: {}", ts, e); // 启动失败也发送断连信号 if let Some(tx) = shutdown_tx_arc.lock().unwrap().take() { let _ = tx.send(()); } } } } #[cfg(not(windows))] { match Command::new(&bootloader_path) .arg("--from-updater") .spawn() { Ok(_) => { if debug { println!("{} [启动] BootLoader.exe 已启动,Updater 即将退出", ts); } if let Some(tx) = shutdown_tx_arc.lock().unwrap().take() { let _ = tx.send(()); } } Err(e) => { log_eprintln!("{} [错误] 启动 BootLoader.exe 失败: {}", ts, e); if let Some(tx) = shutdown_tx_arc.lock().unwrap().take() { let _ = tx.send(()); } } } } }); } /// 发送 GetFileMd5 请求 fn request_file_md5(sender: &cube_lib::websocket::MessageSender, filename: &str, bytes: u64) { let msg_str = format!( r#"{{"Type":"GetFileMd5","Data":{{"filename":"{}","bytes":{}}}}}"#, filename, bytes ); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } /// 发送文件下载请求(断点续传) fn request_download(sender: &cube_lib::websocket::MessageSender, filename: &str, offset: u64) { let msg_str = format!( r#"{{"Type":"DownloadFile","Data":{{"filename":"{}","offset":{}}}}}"#, filename, offset ); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } /// 处理收到的文件块 fn handle_file_chunk( ctx: &Arc, data: &serde_json::Map, debug: bool, ) -> Option { let filename = data.get("filename").and_then(|v| v.as_str()).unwrap_or(""); let offset = data.get("offset").and_then(|v| v.as_u64()).unwrap_or(0); let chunk_data = data.get("data").and_then(|v| v.as_str()).unwrap_or(""); let is_last = data.get("is_last").and_then(|v| v.as_bool()).unwrap_or(false); // Updater.exe 保存为 Updater.new.exe let final_filename = if filename == "Updater.exe" { "Updater.new.exe".to_string() } else { filename.to_string() }; let mut ctx = ctx.download_state.lock().unwrap(); // 只有文件名变化才重置(开始下载新文件) // 注意:不要用 offset==0 判断是否重置! // offset>0 表示续传(server 从该偏移继续发块),不应重置 tmp 文件 let need_reset = match ctx.as_ref() { Some(s) => &s.filename != &final_filename, None => true, }; if need_reset { // 关闭旧文件 if let Some(ref mut s) = *ctx { if let Some(f) = s.file.take() { drop(f); } if let Some(ref tp) = s.temp_path { let _ = fs::remove_file(tp); } } *ctx = None; // 使用 AppData/Updater/ 作为数据目录 let data_dir = get_updater_data_dir(); let temp_path = data_dir.join(format!("{}.tmp", final_filename)); // 续传:tmp 文件已存在,以追加方式打开(保留已有数据) // 新下载:tmp 文件不存在,用 File::create 创建(截断模式) let (file, resume_offset) = if temp_path.exists() { use std::fs::OpenOptions; match OpenOptions::new().write(true).append(true).open(&temp_path) { Ok(f) => { let current_size = f.metadata().map(|m| m.len()).unwrap_or(0); if debug { eprintln!("[下载] 续传:追加打开 {:?}, 当前大小={}", temp_path, current_size); } let mut file = f; file.seek(std::io::SeekFrom::Start(current_size)).ok(); (file, current_size) } Err(e) => { if debug { eprintln!("[下载] 无法追加打开临时文件 {}: {}", filename, e); } return None; } } } else { // 新下载,截断创建 match File::create(&temp_path) { Ok(file) => (file, 0u64), Err(e) => { if debug { eprintln!("[下载] 无法创建临时文件 {}: {}", filename, e); } return None; } } }; *ctx = Some(DownloadState { filename: final_filename.clone(), offset: resume_offset, temp_path: Some(temp_path), file: Some(file), }); } // 解码并追加数据 if let Some(ref mut state) = *ctx { let current_offset = state.offset; if offset == current_offset { if let Some(ref mut file) = state.file { match BASE64.decode(chunk_data) { Ok(decoded) => { match file.write_all(&decoded) { Ok(_) => { let _ = file.flush(); state.offset += decoded.len() as u64; } Err(e) => { if debug { eprintln!("[下载] 写入失败: {}", e); } } } } Err(e) => { if debug { eprintln!("[下载] Base64 解码失败: {}", e); } } } } } else { if debug { eprintln!("[下载] 偏移不匹配: 期望 {}, 收到 {}", current_offset, offset); } } } // 最后一块:重命名临时文件为正式文件 if is_last { let state = ctx.take().unwrap(); let temp_path = state.temp_path; let final_offset = state.offset; // 关闭文件句柄 if let Some(f) = state.file { drop(f); } if let Some(ref temp) = temp_path { let data_dir = get_updater_data_dir(); let final_path = data_dir.join(&final_filename); // 原子重命名 if let Err(e) = fs::rename(temp, &final_path) { if debug { eprintln!("[下载] 重命名失败: {}", e); } // 尝试直接覆盖写入 let _ = fs::copy(temp, &final_path); let _ = fs::remove_file(temp); } if debug { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [下载] {} 下载完成,共 {} 字节", ts, final_filename, final_offset); } } // 返回下载完成的文件名,用于后续流程判断 Some(final_filename) } else { None } } /// 处理下载完成消息 fn handle_download_complete( ctx: &Arc, data: &serde_json::Map, debug: bool, ) -> Option { let filename = data.get("filename").and_then(|v| v.as_str()).unwrap_or(""); let size = data.get("size").and_then(|v| v.as_u64()).unwrap_or(0); // Updater.exe 保存为 Updater.new.exe let final_filename = if filename == "Updater.exe" { "Updater.new.exe".to_string() } else { filename.to_string() }; if debug { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [下载] {} 下载完成,最终大小: {} 字节", ts, final_filename, size); } let mut ctx = ctx.download_state.lock().unwrap(); // 如果有临时文件,执行重命名(防止 DownloadComplete 先于最后一个 FileChunk 到达) let temp_to_rename = ctx.as_ref().and_then(|s| s.temp_path.clone()); let has_temp = temp_to_rename.is_some(); if has_temp { let data_dir = get_updater_data_dir(); let final_path = data_dir.join(&final_filename); // 先关闭文件句柄并获取 temp_path let temp_path_owned = ctx.as_mut().and_then(|s| s.temp_path.take()).unwrap(); // 重置状态 *ctx = None; // 原子重命名 if let Err(e) = fs::rename(&temp_path_owned, &final_path) { if debug { eprintln!("[下载] 重命名失败: {}", e); } // 尝试直接覆盖写入 let _ = fs::copy(&temp_path_owned, &final_path); let _ = fs::remove_file(&temp_path_owned); } if debug { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [下载] {} 临时文件已重命名", ts, final_filename); } } else { // 没有临时文件,只重置状态 *ctx = None; } Some(final_filename) } // ===================== 其他应用程序更新 ===================== /// 获取其他应用文件的临时文件大小 #[allow(dead_code)] fn get_app_tmp_file_size(app_name: &str, relative_path: &str) -> u64 { let tmp_path = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name) .join(format!("{}.tmp", relative_path)); tmp_path.metadata().map(|m| m.len()).unwrap_or(0) } /// 向服务器查询指定文件的 md5(用于断点续传校验) #[allow(dead_code)] fn request_file_md5_for_app( sender: &cube_lib::websocket::MessageSender, app_name: &str, relative_path: &str, offset: u64, ) { let msg_str = format!( r#"{{"Type":"GetFileMd5","Data":{{"filename":"{}","bytes":{}}}}}"#, relative_path, offset ); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [应用] {} 请求 md5 (offset {}): {}", ts, app_name, offset, relative_path); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } /// 向服务器请求下载指定应用的文件 fn request_download_for_app( sender: &cube_lib::websocket::MessageSender, app_name: &str, relative_path: &str, offset: u64, ) { // 文件名前加上应用名称目录 let full_path = format!("{}/{}", app_name, relative_path); let msg_str = format!( r#"{{"Type":"DownloadFile","Data":{{"filename":"{}","offset":{}}}}}"#, full_path, offset ); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } /// 从待下载队列取出一个文件发送(顺序下载) /// shutdown_tx: 进入 Complete 阶段后发送此信号断开连接 fn send_next_download( ctx: &Arc, sender: &cube_lib::websocket::MessageSender, shutdown_tx: &std::sync::Arc>>> ) { let mut queue = ctx.download_queue.lock().unwrap(); if let Some((app_name, filename, offset, expected_md5)) = queue.pop_front() { // 记录当前下载的期望 MD5,供 DownloadComplete 校验 *ctx.current_download_md5.lock().unwrap() = Some(expected_md5); request_download_for_app(sender, &app_name, &filename, offset); } else { *ctx.is_downloading.lock().unwrap() = false; // 队列为空,检查是否所有应用都处理完了 let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); let pending_apps = ctx.pending_allfile_apps.lock().unwrap(); if pending_apps.is_empty() { *ctx.current_phase.lock().unwrap() = UpdatePhase::Complete; // 打印下载完成摘要 let completed = ctx.app_completed_set.lock().unwrap(); let mut app_summary: std::collections::HashMap> = std::collections::HashMap::new(); for key in completed.iter() { if let Some((app_name, filename)) = key.split_once('/') { app_summary.entry(app_name.to_string()).or_default().push(filename.to_string()); } } log_print!("{} [下载] === 所有应用文件下载完成 ===", ts); for (app_name, files) in &app_summary { log_print!("{} [下载] {}: {} 个文件 ({})", ts, app_name, files.len(), files.join(", ")); } log_print!("{} [下载] 文件保存在: {:?}", ts, get_updater_data_dir().join("Updater").join("UpGrade")); log_print!("{} [下载] ======================================", ts); // 发送 shutdown 信号,断开连接返回主循环 if let Some(tx) = shutdown_tx.lock().unwrap().take() { let _ = tx.send(()); } } else { log_print!("{} [下载] 队列为空,等待 {} 个应用完成 AllFile 响应", ts, pending_apps.len()); } } } /// 处理其他应用的文件块(写入 AppData/{app_name}/{relative_path}) fn handle_app_file_chunk( ctx: &Arc, app_name: &str, data: &serde_json::Map, debug: bool, ) -> Option<(String, String)> { // 注意:其他应用的文件块,filename 字段就是 relative_path(服务端可能含 app/ 前缀) let filename = data.get("filename").and_then(|v| v.as_str()).unwrap_or(""); if filename.is_empty() { return None; } let offset = data.get("offset").and_then(|v| v.as_u64()).unwrap_or(0); let chunk_data = data.get("data").and_then(|v| v.as_str()).unwrap_or(""); let is_last = data.get("is_last").and_then(|v| v.as_bool()).unwrap_or(false); // 服务端 filename 可能含 app/ 前缀(如 "EasyTest/xxx.dll"),统一去掉前缀 // app_map 中的 key 用 app_name/relative_path 格式存储 let expected_prefix = format!("{}/", app_name); let relative_path = if filename.starts_with(&expected_prefix) { &filename[expected_prefix.len()..] } else { filename }; let key = format!("{}/{}", app_name, relative_path); // 记录原始 relative_path,供 is_last=false 时请求下一块使用 let relative_path = relative_path.to_string(); let mut app_map = ctx.app_download_map.lock().unwrap(); // 获取当前正在下载的文件名(用于判断是否是新文件) // 注意:app_map 中存储的 filename 是不含前缀的 relative_path let current_filename = app_map.values().next().map(|s| s.filename.clone()); // 新文件(非续传):清空旧 entry,等待 DownloadComplete 处理旧文件 // 续传(同一文件 + 非零 offset):追加数据,不清 entry // 注意:current_filename 来自 app_map(不含前缀),filename 来自服务端(含前缀) // 所以应该比较 relative_path 而不是 filename if current_filename.as_ref() != Some(&relative_path) && (!app_map.contains_key(&key) || offset == 0) { if let Some(f) = app_map.get_mut(&key) { if let Some(file) = f.file.take() { drop(file); } if let Some(ref tp) = f.temp_path { let _ = fs::remove_file(tp); } } app_map.remove(&key); // relative_path 已经是去掉 app_name/ 前缀的相对路径(如 "x64/SQLite.Interop.dll") // 直接用它构建路径,保留子目录信息 // 注意:不能用 with_extension("tmp") 直接替换,因为会丢失原有扩展名! // 需要手动添加 .tmp 后缀 let base_path = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name) .join(&relative_path); let temp_path = PathBuf::from(format!("{}.tmp", base_path.to_string_lossy())); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [应用] 新文件: filename={}, tmp_path={:?}, parent_exists={}", ts, filename, temp_path, temp_path.parent().map(|p| p.exists()).unwrap_or(false)); match File::create(&temp_path) { Ok(file) => { app_map.insert(key.clone(), DownloadState { filename: relative_path.clone(), // 不带 app_name/ 前缀,与 current_filename 检查一致 offset: 0, temp_path: Some(temp_path), file: Some(file), }); } Err(e) => { if debug { eprintln!("[应用] 无法创建临时文件 {}: {}", filename, e); } return None; } } } let current_offset = app_map.get(&key).map(|s| s.offset).unwrap_or(0); if offset == current_offset { if let Some(dl_state) = app_map.get_mut(&key) { if let Some(ref mut file) = dl_state.file { match BASE64.decode(chunk_data) { Ok(decoded) => { match file.write_all(&decoded) { Ok(_) => { let _ = file.flush(); dl_state.offset += decoded.len() as u64; } Err(e) => { if debug { eprintln!("[应用] 写入失败 {}: {}", filename, e); } } } } Err(e) => { if debug { eprintln!("[应用] Base64 解码失败 {}: {}", filename, e); } } } } } } else { if debug { eprintln!("[应用] 偏移不匹配 {}: 期望 {}, 收到 {}", filename, current_offset, offset); } } // 最后一块:原子重命名 if is_last { let temp_path = app_map.get(&key).and_then(|s| s.temp_path.clone()); let final_offset = app_map.get(&key).map(|s| s.offset).unwrap_or(0); if let Some(ref temp) = temp_path { // final_path 是 temp_path 去掉 ".tmp" 后缀,直接在同一目录 // 注意:不能用 with_extension(""),因为它会替换原有扩展名! // 对于 "x86/SQLite.Interop.dll.tmp",with_extension("") 会得到 "x86/SQLite.Interop"(.dll 丢失!) let temp_str = temp.to_string_lossy(); let final_path = if temp_str.ends_with(".tmp") { PathBuf::from(&temp_str[..temp_str.len() - 4]) } else { temp.clone() }; let _ = fs::create_dir_all(final_path.parent().unwrap()); // 重命名(同步操作,Windows 上不会异步) let rename_ok = fs::rename(temp, &final_path).is_ok(); if !rename_ok { let _ = fs::copy(temp, &final_path); let _ = fs::remove_file(temp); } let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [应用] 文件下载完成: filename={}, final_path={:?}, rename_ok={}, final_exists={}", ts, filename, final_path, rename_ok, final_path.exists()); if debug { println!("{} [应用] {} 下载完成,共 {} 字节", ts, filename, final_offset); } } app_map.remove(&key); drop(app_map); // 释放锁 Some((app_name.to_string(), relative_path)) // 返回 relative_path } else { drop(app_map); // 释放锁 Some((app_name.to_string(), relative_path)) // 非最后一块也返回 relative_path,供请求下一块使用 } } /// 处理其他应用的文件下载完成(与 FileChunk 互斥,这里主要用于补充处理) fn handle_app_download_complete( ctx: &Arc, app_name: &str, filename: &str, size: u64, debug: bool, ) -> Option<(String, String)> { // 服务端 filename 可能含 app/ 前缀,提取纯相对路径 let expected_prefix = format!("{}/", app_name); let relative_path = if filename.starts_with(&expected_prefix) { &filename[expected_prefix.len()..] } else { filename }; let key = format!("{}/{}", app_name, relative_path); let mut app_map = ctx.app_download_map.lock().unwrap(); let temp_path = app_map.get(&key).and_then(|s| s.temp_path.clone()); if let Some(ref temp) = temp_path { // final_path 是 temp_path 去掉 ".tmp" 后缀,直接在同一目录 let final_path = temp.with_extension(""); // 去掉 .tmp 后缀 let _ = fs::create_dir_all(final_path.parent().unwrap()); if let Err(e) = fs::rename(temp, &final_path) { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [应用] 重命名失败 {} -> {:?}: {},改用 copy", ts, filename, final_path, e); let _ = fs::copy(temp, &final_path); let _ = fs::remove_file(temp); } else { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [应用] 重命名成功 {} -> {:?}", ts, filename, final_path); } if debug { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [应用] {} 下载完成(DownloadComplete),大小: {} 字节", ts, filename, size); } } app_map.remove(&key); Some((app_name.to_string(), filename.to_string())) } /// 只负责 Updater 自己的行为参数,连接地址从公共 config.json 加载 #[derive(Debug, Serialize, Deserialize)] struct Config { /// 调试模式:true 时保留控制台窗口并输出日志 debug_mode: bool, } impl Default for Config { fn default() -> Self { Self { debug_mode: false } } } /// 获取 Updater 自身配置路径 AppData/Updater/config.json fn get_updater_config_path() -> PathBuf { get_updater_data_dir().join("Updater").join("config.json") } /// 获取公共配置路径 AppData/config.json(与 BootLoader 同级) fn get_public_config_path() -> PathBuf { let exe_path = std::env::current_exe().expect("Failed to get executable path"); let drive = exe_path .parent() .and_then(|p| p.as_os_str().to_str()) .and_then(|s| s.split('\\').next()) .unwrap_or("C:"); PathBuf::from(format!("{}/AppData/config.json", drive)) } /// 加载 Updater 自身配置;若文件不存在则写入默认值 fn load_updater_config() -> Config { let config_path = get_updater_config_path(); if config_path.exists() { if let Ok(content) = fs::read_to_string(&config_path) { if let Ok(config) = serde_json::from_str::(&content) { return config; } } } // 文件不存在或解析失败 → 写入默认值 let default_config = Config::default(); if let Ok(content) = serde_json::to_string_pretty(&default_config) { let _ = fs::write(&config_path, content); } default_config } /// 从公共 config.json 读取 ServerUrl 字段 fn resolve_ws_url() -> String { let config_path = get_public_config_path(); if let Ok(content) = fs::read_to_string(&config_path) { if let Ok(json) = serde_json::from_str::(&content) { if let Some(url) = json.get("ServerUrl").and_then(|v| v.as_str()) { return url.to_string(); } } } // 读取失败 → 降级到默认值 "ws://127.0.0.1:8087/ws".to_string() } /// 从公共 config.json 读取 DeviceNumber 字段 fn resolve_device_number() -> String { let config_path = get_public_config_path(); if let Ok(content) = fs::read_to_string(&config_path) { if let Ok(json) = serde_json::from_str::(&content) { // 尝试多个可能的字段名 if let Some(id) = json.get("DeviceNumber").and_then(|v| v.as_str()) { if !id.is_empty() { return id.to_string(); } } if let Some(id) = json.get("StationId").and_then(|v| v.as_str()) { if !id.is_empty() { return id.to_string(); } } if let Some(id) = json.get("Station").and_then(|v| v.as_str()) { if !id.is_empty() { return id.to_string(); } } } } // 读取失败 → 降级到默认值 "UNKNOWN".to_string() } fn is_process_running(process_name: &str) -> bool { is_process_running_ex(process_name, None) } /// 检查进程是否在运行(可指定要排除的 PID) fn is_process_running_ex(process_name: &str, exclude_pid: Option) -> bool { use std::process::id; let current_pid = id(); let exclude = exclude_pid.unwrap_or(current_pid); let output = Command::new("tasklist") .args(["/FI", &format!("IMAGENAME eq {}", process_name), "/FO", "CSV"]) .output() .expect("Failed to execute tasklist"); let output_str = String::from_utf8_lossy(&output.stdout); let lines: Vec<&str> = output_str.lines().collect(); let mut count = 0; for line in lines { if line.contains(&format!("\"{}\"", process_name)) { // 解析 PID if let Some(pid_part) = line.split(",").nth(1) { if let Ok(pid) = pid_part.trim_matches('"').parse::() { if pid != exclude { count += 1; } } } } } count > 0 } /// 获取 AppData 目录下所有候选应用(排除 Updater) /// 返回 (app_name, local_version, exe_path) 列表 /// 版本号和路径在进程运行期间直接从进程路径读取,保证与进程实际加载的一致 fn get_app_candidates(debug: bool) -> Vec<(String, String, String)> { let appdata = get_updater_data_dir(); // X:\AppData\ if !appdata.exists() { return Vec::new(); } let mut candidates = Vec::new(); if let Ok(entries) = fs::read_dir(&appdata) { for entry in entries.flatten() { let path = entry.path(); if path.is_dir() { if let Some(name) = path.file_name().and_then(|n| n.to_str()) { if name != "Updater" && !name.starts_with('.') { let exe_name = format!("{}.exe", name); if is_process_running(&exe_name) { let (local_version, exe_path) = get_version_and_path_from_process(name); if debug { println!("[应用] 候选应用: {} v{} ({})", name, local_version, exe_path); } candidates.push((name.to_string(), local_version, exe_path)); } else if debug { println!("[应用] 跳过 {} (进程未运行)", name); } } } } } } candidates } /// 从运行中进程获取版本号和可执行文件路径 #[cfg(windows)] fn get_version_and_path_from_process(app_name: &str) -> (String, String) { let ps_script = format!( "$p = Get-Process -Name '{}' -ErrorAction SilentlyContinue | Select-Object -First 1; \ if ($p -and $p.Path) {{ \ $v = (Get-Item $p.Path -ErrorAction SilentlyContinue).VersionInfo.FileVersion; \ \"$v|$($p.Path)\" \ }} elseif ($p -and $p.Id) {{ \ $wmi = Get-CimInstance Win32_Process -Filter \"ProcessId=$($p.Id)\" -ErrorAction SilentlyContinue; \ if ($wmi.ExecutablePath) {{ \ $v = (Get-Item $wmi.ExecutablePath -ErrorAction SilentlyContinue).VersionInfo.FileVersion; \ \"$v|$($wmi.ExecutablePath)\" \ }} \ }}", app_name.replace("'", "''") ); let output = Command::new("powershell") .args(["-NoProfile", "-NonInteractive", "-Command", &ps_script]) .output(); if let Ok(output) = output { let stdout = String::from_utf8_lossy(&output.stdout); let line = stdout.trim(); if !line.is_empty() && !line.contains("没有文件") { if let Some(sep) = line.find('|') { let version = line[..sep].trim().to_string(); let path = line[sep + 1..].trim().to_string(); if !version.is_empty() && version != "0" { return (version, path); } } } } ("0.0.0".to_string(), String::new()) } /// 执行应用文件升级:把升级目录中的文件复制到目标目录 /// 升级目录:X:\AppData\Updater\UpGrade\{app_name}\* /// 目标目录:exe_path 的父目录(如 C:\AppData\EasyTest\) fn upgrade_app_files(app_name: &str, exe_path: &str) { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string(); let upgrade_base = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name); if !upgrade_base.exists() { log_print!("{} [升级替换] {} 升级目录不存在: {:?}", ts, app_name, upgrade_base); return; } // 目标目录 = exe_path 的父目录 let target_dir = std::path::Path::new(exe_path) .parent() .map(|p| p.to_path_buf()) .unwrap_or_else(|| std::path::PathBuf::from(".")); log_print!("{} [升级替换] {} -> {:?}", ts, app_name, target_dir); let mut success_count = 0; let mut fail_count = 0; // 递归扫描升级目录 fn copy_dir_recursive(src: &std::path::Path, dst: &std::path::Path, ts: &str) -> (i32, i32) { let mut ok = 0; let mut fail = 0; let _ = fs::create_dir_all(dst); if let Ok(entries) = fs::read_dir(src) { for entry in entries.flatten() { let src_path = entry.path(); let file_name = entry.file_name(); let dst_path = dst.join(&file_name); if src_path.is_dir() { let (o, f) = copy_dir_recursive(&src_path, &dst_path, ts); ok += o; fail += f; } else { // 跳过 .tmp 文件 if let Some(name) = file_name.to_str() { if name.ends_with(".tmp") { continue; } } // 先删除目标文件(可能只读) let _ = fs::remove_file(&dst_path); match fs::copy(&src_path, &dst_path) { Ok(_) => { log_print!("{} [升级替换] {} -> {}", ts, file_name.to_string_lossy(), dst_path.display()); ok += 1; } Err(e) => { log_print!("{} [升级替换] 复制失败 {}: {}", ts, file_name.to_string_lossy(), e); fail += 1; } } } } } (ok, fail) } let (ok, fail) = copy_dir_recursive(&upgrade_base, &target_dir, ts.as_str()); success_count += ok; fail_count += fail; log_print!("{} [升级替换] {} 升级完成:成功 {} 个,失败 {} 个", ts, app_name, success_count, fail_count); } /// 重启指定应用(工作目录 = exe 所在目录) fn restart_app(app_name: &str, exe_path: &str) { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); // 工作目录 = exe 所在目录 let work_dir = std::path::Path::new(exe_path) .parent() .map(|p| p.to_path_buf()) .unwrap_or_else(|| std::path::PathBuf::from(".")); use std::process::Command; match Command::new(exe_path) .current_dir(&work_dir) .spawn() { Ok(child) => { log_print!("{} [重启] 已启动 {} (PID={}),工作目录: {}", ts, app_name, child.id(), work_dir.display()); } Err(e) => { log_print!("{} [重启] 启动失败 {}: {}", ts, exe_path, e); } } } /// 递归扫描升级目录,返回 (relative_path, full_path) 列表 /// 跳过 .tmp 文件(如果存在同名非 tmp 文件的话) #[allow(dead_code)] fn scan_upgrade_dir(app_name: &str) -> Vec<(String, PathBuf)> { let upgrade_base = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name); if !upgrade_base.exists() { log_print!("[应用] {} 升级目录不存在: {:?}", app_name, upgrade_base); return Vec::new(); } let mut files = Vec::new(); let mut visited_dirs = std::collections::VecDeque::new(); visited_dirs.push_back(upgrade_base.clone()); while let Some(dir) = visited_dirs.pop_front() { if let Ok(entries) = fs::read_dir(&dir) { for entry in entries.flatten() { let path = entry.path(); if path.is_dir() { visited_dirs.push_back(path); } else if path.is_file() { // 计算相对路径 if let Ok(rel) = path.strip_prefix(&upgrade_base) { let rel_str = rel.to_string_lossy().replace('\\', "/"); let rel_str = rel_str.trim_start_matches('/').to_string(); // 如果是 .tmp 文件,检查是否存在对应的非 tmp 文件 // 如果存在同名非 tmp 文件,则跳过此 tmp 文件 if rel_str.ends_with(".tmp") { let non_tmp = rel_str.trim_end_matches(".tmp"); let non_tmp_path = upgrade_base.join(non_tmp); if non_tmp_path.exists() { log_print!("[应用] 跳过 tmp(同名文件已存在): {} -> {}", rel_str, non_tmp); continue; } } files.push((rel_str, path)); } } } } } log_print!("[应用] {} 升级文件列表 ({} 个):", app_name, files.len()); for (rel, _) in &files { log_print!("[应用] - {}", rel); } files } /// 扫描本地升级目录,返回 .tmp 文件列表(正式文件名 + 大小) fn scan_local_tmp_files(app_name: &str) -> Vec<(String, u64)> { let upgrade_base = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name); if !upgrade_base.exists() { return Vec::new(); } let mut tmp_files = Vec::new(); let mut visited_dirs = std::collections::VecDeque::new(); visited_dirs.push_back(upgrade_base.clone()); while let Some(dir) = visited_dirs.pop_front() { if let Ok(entries) = fs::read_dir(&dir) { for entry in entries.flatten() { let path = entry.path(); if path.is_dir() { visited_dirs.push_back(path); } else if path.is_file() { if let Some(filename) = path.file_name().and_then(|n| n.to_str()) { if filename.ends_with(".tmp") { // 计算相对路径,并去掉 .tmp 后缀(发送正式文件名) if let Ok(rel) = path.strip_prefix(&upgrade_base) { let rel_str = rel.to_string_lossy().replace('\\', "/"); let rel_str = rel_str.trim_start_matches('/').to_string(); // 去掉 .tmp 后缀,发送正式文件名 let official_name = rel_str.strip_suffix(".tmp") .unwrap_or(&rel_str); // 获取文件大小 if let Ok(metadata) = fs::metadata(&path) { // 跳过 0 字节的临时文件(MD5 无意义,视为从头续传) if metadata.len() > 0 { tmp_files.push((official_name.to_string(), metadata.len())); } } } } } } } } } tmp_files } /// 发送 GetAllFile 请求 fn send_get_all_file( sender: &cube_lib::websocket::MessageSender, app_name: &str, tmp_files: &[(String, u64)], ) { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); // 构造成 {filename, size} 格式的数组 let tmp_files_json: Vec = tmp_files.iter() .map(|(name, size)| serde_json::json!({ "filename": name, "size": size })) .collect(); let tmp_files_str = serde_json::to_string(&tmp_files_json).unwrap_or_else(|_| "[]".to_string()); let msg_str = format!( r#"{{"Type":"GetAllFile","Data":{{"app_name":"{}","tmp_files":{}}}}}"#, app_name, tmp_files_str ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } // ===================== 向应用发送升级确认 ===================== /// 向指定应用发送升级确认消息(修复版:直接用 Win32 API 连接管道,不再用 PowerShell) /// 消息格式:{"Type":"UpgradeConfirm","Data":{"AppName":"xxx","CurrentVer":"1.0.0","LatestVer":"1.1.0"}} /// 返回 true = 用户批准,false = 用户拒绝/通信失败 fn notify_app_upgrade(app_name: &str, current_ver: &str, latest_ver: &str, _debug: bool) -> bool { #[cfg(windows)] { use std::process::Command; // 1. 获取所有运行中的进程 PID let ps_script = format!( "(Get-Process -Name '{}' -ErrorAction SilentlyContinue).Id", app_name ); let output = Command::new("powershell") .args(["-NoProfile", "-NonInteractive", "-Command", &ps_script]) .output(); let pids: Vec = match output { Ok(o) => { let s = String::from_utf8_lossy(&o.stdout); s.lines().filter_map(|l| l.trim().parse().ok()).collect() } Err(_) => return false, }; if pids.is_empty() { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [升级确认] {} 未运行,跳过通知", ts, app_name); return false; } // 2. 构造消息 let msg = format!( r#"{{"Type":"UpgradeConfirm","Data":{{"AppName":"{}","CurrentVer":"{}","LatestVer":"{}"}}}}"#, app_name, current_ver, latest_ver ); // 3. 对每个 PID 发送升级确认并等待用户响应(双向握手) for pid in pids { let pipe_name = format!("\\\\.\\pipe\\EasyTest_{}", pid); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [升级确认] 连接到 {} (PID={}),等待响应...", ts, pipe_name, pid); // 发送 UpgradeConfirm 并等待 UpgradeResponse(超时 10 秒) let response = connect_and_wait_response(&pipe_name, &msg, 10000); match response { Ok(true) => { log_print!("{} [升级确认] 用户批准升级 {} (PID={}),应用将退出", ts, app_name, pid); return true; } Ok(false) => { log_print!("{} [升级确认] 用户拒绝升级 {} (PID={})", ts, app_name, pid); return false; } Err(e) => { log_print!("{} [升级确认] 通信失败 {} (PID={}): {}", ts, app_name, pid, e); return false; } } } return false; } #[cfg(not(windows))] { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [升级确认] 非 Windows 平台,跳过通知", ts); return false; } } /// 用 Win32 API 连接命名管道,发送消息后等待响应(双向握手) /// 返回 Ok(true) = 用户批准,Ok(false) = 用户拒绝,Err = 通信失败/超时 fn connect_and_wait_response( pipe_name: &str, msg: &str, timeout_ms: u32, ) -> Result { use windows::Win32::Foundation::{CloseHandle, INVALID_HANDLE_VALUE, HANDLE}; use windows::Win32::Storage::FileSystem::{CreateFileW, WriteFile, ReadFile, FILE_ATTRIBUTE_NORMAL}; use windows::Win32::System::Threading::CreateEventW; use windows::core::PCWSTR; // Windows API 常量 use windows::Win32::Storage::FileSystem::{FILE_SHARE_MODE, FILE_CREATION_DISPOSITION}; const GENERIC_READ: u32 = 0x80000000u32; const GENERIC_WRITE: u32 = 0x40000000u32; const FILE_SHARE_NONE: u32 = 0u32; const OPEN_EXISTING: u32 = 3u32; // 将管道名转换为宽字符串 let wide_name: Vec = pipe_name.encode_utf16().chain(std::iter::once(0)).collect(); // 打开命名管道(同时读写模式) let pipe = unsafe { CreateFileW( PCWSTR::from_raw(wide_name.as_ptr()), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_MODE(FILE_SHARE_NONE), None, FILE_CREATION_DISPOSITION(OPEN_EXISTING), FILE_ATTRIBUTE_NORMAL, None, ) }; let pipe = match pipe { Ok(h) => h, Err(e) => return Err(format!("连接管道失败: {:?}", e)), }; if pipe == INVALID_HANDLE_VALUE { return Err("管道句柄无效".to_string()); } // 写入消息 + 换行(C# 端用 StreamReader.ReadLine 读取) let mut msg_with_newline = msg.to_string(); msg_with_newline.push_str("\r\n"); let data = msg_with_newline.as_bytes(); let mut written: u32 = 0; let write_ok = unsafe { WriteFile( pipe, Some(data), Some(&mut written), None, ) }; if write_ok.is_err() || written == 0 { unsafe { let _ = CloseHandle(pipe); }; return Err("写入消息失败".to_string()); } // 创建事件用于等待响应(失败则用 INVALID_HANDLE_VALUE) let event = unsafe { CreateEventW(None, true, false, None) } .unwrap_or(HANDLE(0)); let mut buf = [0u8; 4096]; let mut bytes_read: u32 = 0; let mut resp_buf = Vec::new(); // 循环读取直到收到完整响应(读到 \n)或超时 let start_time = std::time::Instant::now(); let deadline = timeout_ms as u64; loop { // 检查是否超时 if start_time.elapsed().as_millis() as u64 > deadline { if event != INVALID_HANDLE_VALUE { unsafe { let _ = CloseHandle(event); }; } unsafe { let _ = CloseHandle(pipe); }; return Err("等待响应超时".to_string()); } // 尝试读取数据(非阻塞轮询) let result = unsafe { ReadFile( pipe, Some(&mut buf), Some(&mut bytes_read), None, ) }; if result.is_ok() && bytes_read > 0 { resp_buf.extend_from_slice(&buf[..bytes_read as usize]); // 检查是否包含换行符(完整消息) if resp_buf.contains(&b'\n') { break; } } // 短暂等待后重试 std::thread::sleep(std::time::Duration::from_millis(50)); } if event != INVALID_HANDLE_VALUE { unsafe { let _ = CloseHandle(event); }; } unsafe { let _ = CloseHandle(pipe); }; // 解析响应 JSON let response = String::from_utf8_lossy(&resp_buf) .trim_end_matches(|c| c == '\r' || c == '\n') .to_string(); // 调试:打印原始响应 { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [升级确认] 收到原始响应: {:?}", ts, response); } // 解析 {"Type":"UpgradeResponse","Approved":true/false} if let Some(start) = response.find("\"Approved\":") { // "Approved": 共11个字符,true/false 从 start+11 开始 let rest = &response[start + 11..]; let approved = rest.starts_with("true"); return Ok(approved); } Err(format!("无法解析响应: {}", response)) } /// 通知所有升级了的应用(EasyTest 等) /// app_upgrades: Vec<(app_name, current_ver, latest_ver, exe_path)> fn notify_all_app_upgrades(app_upgrades: &[(String, String, String, String)], _debug: bool) { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); if app_upgrades.is_empty() { log_print!("{} [升级确认] 没有应用需要升级通知(upgraded_apps 为空)", ts); return; } log_print!("{} [升级确认] 准备通知 {} 个应用: {:?}", ts, app_upgrades.len(), app_upgrades); for (app_name, current_ver, latest_ver, exe_path) in app_upgrades { // 发送升级确认消息,等待用户响应 let approved = notify_app_upgrade(app_name, current_ver, latest_ver, true); // 用户批准升级后,等待应用退出,然后复制文件并重启 if approved { // 等待应用退出(最多 15 秒) log_print!("{} [升级确认] 等待 {} 退出...", ts, app_name); let start = std::time::Instant::now(); let mut exited = false; loop { let check = Command::new("powershell") .args(["-NoProfile", "-NonInteractive", "-Command", &format!("if ((Get-Process -Name '{}' -ErrorAction SilentlyContinue) -eq $null) {{ 'exit' }} else {{ 'running' }}", app_name)]) .output(); if let Ok(out) = check { if String::from_utf8_lossy(&out.stdout).contains("exit") { exited = true; break; } } if start.elapsed().as_secs() > 15 { log_print!("{} [升级确认] 等待退出超时,跳过", ts); break; } std::thread::sleep(std::time::Duration::from_millis(500)); } // 应用退出后,复制文件并重启 if exited { upgrade_app_files(app_name, exe_path); restart_app(app_name, exe_path); } } } } /// 主动断连信号(用于在消息回调中请求优雅退出) /// - shutdown_tx: 从同步回调中发送,通知主循环主动断开 /// - disconnect_rx: 在主循环中等待 fn make_shutdown_channel() -> ( std::sync::Arc>>>, tokio::sync::oneshot::Receiver<()>, ) { let (tx, rx) = tokio::sync::oneshot::channel(); (std::sync::Arc::new(std::sync::Mutex::new(Some(tx))), rx) } /// 运行 Updater(使用 CubeLib 内置的自动重连) /// 返回本次运行是否更新了 Updater 自身(只有 Updater 更新了才需要退出重启) async fn run_updater(debug_mode: bool) -> bool { // 创建本次运行的上下文 let ctx = Arc::new(UpdateContext::default()); // 标记本次运行是否执行了更新(下载了文件) let update_performed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let update_performed_clone = update_performed.clone(); // 标记 BootLoader 是否已下载(或无需下载) let bootloader_downloaded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let bootloader_downloaded_clone = bootloader_downloaded.clone(); // 标记 Updater 是否已下载 let updater_downloaded = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let updater_downloaded_clone = updater_downloaded.clone(); // 标记一次更新检查是否已完成 let update_check_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); let update_check_done_clone = update_check_done.clone(); // 主动断连通道 let (shutdown_tx_arc, mut disconnect_rx) = make_shutdown_channel(); // 加载初始 URL let server_url = resolve_ws_url(); // 扫描候选应用(阶段3使用) let app_candidates = get_app_candidates(debug_mode); // 将候选应用保存到 ctx *ctx.candidates.lock().unwrap() = app_candidates.clone(); // 初始化阶段为 BootLoader *ctx.current_phase.lock().unwrap() = UpdatePhase::BootLoader; if debug_mode { println!("========================================"); println!("Updater 启动 (调试模式)"); println!("服务器地址: {}", server_url); println!("自动重连: 启用 (指数退避: 1s - 30s)"); println!("更新阶段: BootLoader -> Updater -> Apps -> Complete"); if !app_candidates.is_empty() { println!("候选应用: {:?}", app_candidates); } println!("========================================"); } // 创建 WebSocket 配置(启用自动重连和心跳) let config = WebSocketConfig::new(&server_url) .with_client_type("Updater") .with_debug(debug_mode) .with_reconnect(true) // 启用自动重连 .with_reconnect_delay(1000) // 初始延迟 1s .with_max_reconnect_delay(30000) // 最大延迟 30s .with_heartbeat(30000); // 30秒心跳间隔 // 创建 WebSocket 客户端 let mut client = WebSocketClient::new(config); // 设置连接成功回调 let debug_connected = debug_mode; client.on_connected(move |url| { if debug_connected { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} 收到消息:{}", ts, url); } }); // 设置消息接收回调 let debug_msg = debug_mode; let device_number = resolve_device_number(); let shutdown_tx_arc_clone = shutdown_tx_arc.clone(); let ctx_clone = ctx.clone(); let update_performed_clone2 = update_performed_clone.clone(); let bootloader_downloaded_clone2 = bootloader_downloaded_clone.clone(); let updater_downloaded_clone2 = updater_downloaded_clone.clone(); let update_check_done_clone2 = update_check_done_clone.clone(); client.on_message(move |msg_type, data, sender| { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); // 收到消息日志(始终记录) // FileChunk 跳过完整 Data 序列化(Data 中含约40KB base64,同步阻塞tokio runtime导致30秒延迟) if msg_type == "FileChunk" { let filename = data.get("Data") .and_then(|v| v.get("filename")) .and_then(|v| v.as_str()) .unwrap_or("?"); let offset = data.get("Data") .and_then(|v| v.get("offset")) .and_then(|v| v.as_u64()) .unwrap_or(0); let is_last = data.get("Data") .and_then(|v| v.get("is_last")) .and_then(|v| v.as_bool()) .unwrap_or(false); log_print!("{} 收到消息:{{\"Type\":\"FileChunk\",\"filename\":\"{}\",\"offset\":{},\"is_last\":{}}}", ts, filename, offset, is_last); } else { let actual_data = data.get("Data").unwrap_or(&serde_json::Value::Null); let data_str = serde_json::to_string(actual_data).unwrap_or_else(|_| "{}".to_string()); log_print!("{} 收到消息:{{\"Type\":{},\"Data\":{}}}", ts, serde_json::to_string(&msg_type).unwrap_or_default(), data_str ); } // 收到 welcome 后,重置状态并从头开始执行流程 if msg_type == "welcome" { // 重连后强制从头开始 *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::BootLoader; let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [重连] 重置状态,从头开始执行流程", ts); // 等待 device_number 生效(无次数限制,每10秒重试一次) let wait_seconds = 10; let mut current_device = device_number.clone(); let mut attempt = 0; log_print!("{} [调试] 初始 DeviceNumber: '{}'", ts, current_device); loop { if !current_device.is_empty() && current_device != "UNKNOWN" { // 阶段1:只请求 BootLoader.exe log_print!("{} [阶段1] 检查 BootLoader (DeviceNumber: '{}')...", ts, current_device); let msg_str = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":["BootLoader.exe"]}}}}"#, current_device ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); log_print!("{} [调试] GetFileVer 消息已发送", ts); break; } attempt += 1; log_print!("{} [警告] DeviceNumber 为空或 UNKNOWN (尝试 {}),等待 {} 秒后重试...", ts, attempt, wait_seconds); std::thread::sleep(std::time::Duration::from_secs(wait_seconds)); // 重新读取配置文件(可能刚被写入) current_device = resolve_device_number(); log_print!("{} [调试] 重新读取 DeviceNumber: '{}'", ts, current_device); } } // 处理 FileVer 响应(根据当前阶段分发) if msg_type == "FileVer" { if let Some(file_versions) = data.get("Data").and_then(|d| d.get("file_versions")).and_then(|v| v.as_object()) { let current_phase = *ctx_clone.current_phase.lock().unwrap(); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); match current_phase { UpdatePhase::AppsWaitAllFile => { // 等待 AllFile 响应,忽略 FileVer } UpdatePhase::BootLoader => { // ========== 阶段1:处理 BootLoader ========== if let Some(server_ver) = file_versions.get("BootLoader.exe") { let server_version = server_ver.as_str().unwrap_or("0.0.0"); let local_version = get_local_file_version("BootLoader.exe"); let tmp_size = get_tmp_file_size("BootLoader.exe"); log_print!("{} [版本] BootLoader.exe: 服务端={}, 本地={}", ts, server_version, local_version); let need_update = local_version == "0.0.0" || version_less_than(&local_version, server_version); if need_update { if tmp_size > 0 { log_print!("{} [续传] BootLoader.exe 发现未完成下载,请求 hash 校验...", ts); request_file_md5(&sender, "BootLoader.exe", tmp_size); } else { log_print!("{} [升级] BootLoader.exe 需要更新,开始下载...", ts); request_download(&sender, "BootLoader.exe", 0); } update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst); } else { log_print!("{} [版本] BootLoader.exe 版本已是最新", ts); // 推进到阶段2 let next_phase = UpdatePhase::Updater; *ctx_clone.current_phase.lock().unwrap() = next_phase; log_print!("{} [阶段2] 检查 Updater...", ts); let msg_str = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":["Updater.exe"]}}}}"#, device_number ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } } else { // BootLoader 不在响应中,推进到阶段2 *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Updater; log_print!("{} [阶段2] 检查 Updater...", ts); let msg_str = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":["Updater.exe"]}}}}"#, device_number ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } } UpdatePhase::Updater => { // ========== 阶段2:处理 Updater ========== if let Some(server_ver) = file_versions.get("Updater.exe") { let server_version = server_ver.as_str().unwrap_or("0.0.0"); let local_version = get_local_file_version("Updater.exe"); let tmp_size = get_tmp_file_size("Updater.exe"); // 保存 Updater.exe 的服务端版本号 *ctx_clone.server_updater_version.lock().unwrap() = Some(server_version.to_string()); log_print!("{} [版本] Updater.exe: 服务端={}, 本地={}", ts, server_version, local_version); let need_update = local_version == "0.0.0" || version_less_than(&local_version, server_version); if need_update { if tmp_size > 0 { log_print!("{} [续传] Updater.exe 发现未完成下载,请求 hash 校验...", ts); request_file_md5(&sender, "Updater.exe", tmp_size); } else { log_print!("{} [升级] Updater.exe 需要更新,开始下载...", ts); request_download(&sender, "Updater.exe", 0); } update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst); } else { log_print!("{} [版本] Updater.exe 版本已是最新", ts); // 检查是否有旧的 Updater.new.exe if has_updater_new_exe() { let updater_new_path = get_updater_data_dir().join("Updater.new.exe"); let _ = fs::remove_file(&updater_new_path); log_print!("{} [清理] 删除旧的 Updater.new.exe", ts); } // 推进到阶段3 let candidates = ctx_clone.candidates.lock().unwrap().clone(); if candidates.is_empty() { // 没有候选应用,所有阶段完成 *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Complete; log_print!("{} [阶段3] 没有候选应用,所有阶段完成", ts); update_check_done_clone2.store(true, std::sync::atomic::Ordering::SeqCst); // 发送断连信号 if let Some(tx) = shutdown_tx_arc_clone.lock().unwrap().take() { let _ = tx.send(()); } } else { *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Apps; // 打印候选应用及其预存版本 let app_names: Vec = candidates.iter().map(|(n, v, _)| format!("{}(v{})", n, v)).collect(); log_print!("{} [阶段3] 检查应用: {:?}", ts, app_names); // 构建应用版本查询 let mut file_list = Vec::new(); for (app_name, _, _) in &candidates { file_list.push(format!("{}\\{}.exe", app_name, app_name)); } let file_list_json = serde_json::to_string(&file_list).unwrap_or_else(|_| "[]".to_string()); let msg_str = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":{}}}}}"#, device_number, file_list_json ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } } } else { // Updater 不在响应中,推进到阶段3 let candidates = ctx_clone.candidates.lock().unwrap().clone(); if candidates.is_empty() { *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Complete; println!("{} [阶段3] 没有候选应用,所有阶段完成", ts); update_check_done_clone2.store(true, std::sync::atomic::Ordering::SeqCst); if let Some(tx) = shutdown_tx_arc_clone.lock().unwrap().take() { let _ = tx.send(()); } } else { *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Apps; // 打印候选应用及其预存版本 let app_names: Vec = candidates.iter().map(|(n, v, _)| format!("{}(v{})", n, v)).collect(); println!("{} [阶段3] 检查应用: {:?}", ts, app_names); let mut file_list = Vec::new(); for (app_name, _, _) in &candidates { file_list.push(format!("{}\\{}.exe", app_name, app_name)); } let file_list_json = serde_json::to_string(&file_list).unwrap_or_else(|_| "[]".to_string()); let msg_str = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":{}}}}}"#, device_number, file_list_json ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } } } UpdatePhase::Apps => { // ========== 阶段3:处理应用版本,决定是否需要升级 ========== // 使用预存的版本(在候选发现阶段已从运行中进程读取) let candidates = ctx_clone.candidates.lock().unwrap().clone(); let mut apps_to_update = Vec::new(); let mut upgraded_apps = Vec::new(); // (app_name, current_ver, latest_ver, exe_path) for (app_name, local_version, exe_path) in candidates { let exe_name = format!("{}\\{}.exe", app_name, app_name); if let Some(server_ver) = file_versions.get(&exe_name) { let server_version = server_ver.as_str().unwrap_or(""); // 服务端版本为空,说明没有该应用,跳过 if server_version.is_empty() { log_print!("{} [应用] {} 服务端版本为空,跳过升级", ts, app_name); continue; } log_print!("{} [应用] {}: 服务端={}, 本地={}", ts, app_name, server_version, local_version); if local_version == "0.0.0" || version_less_than(&local_version, server_version) { apps_to_update.push(app_name.clone()); // 记录需要升级的应用信息(用于后续发送通知) upgraded_apps.push((app_name.clone(), local_version.clone(), server_version.to_string(), exe_path.clone())); } } } // 保存升级应用列表 *ctx_clone.upgraded_apps.lock().unwrap() = upgraded_apps; if apps_to_update.is_empty() { // 没有需要更新的应用,所有阶段完成 log_print!("{} [阶段完成] 所有应用已是最新版本,等待下次检查...", ts); *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Complete; update_check_done_clone2.store(true, std::sync::atomic::Ordering::SeqCst); if let Some(tx) = shutdown_tx_arc_clone.lock().unwrap().take() { let _ = tx.send(()); } } else { // 存储待处理的应用列表,发送 GetAllFile 请求 *ctx_clone.pending_allfile_apps.lock().unwrap() = apps_to_update.clone(); *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::AppsWaitAllFile; log_print!("{} [阶段3.5] 等待 GetAllFile 响应 ({} 个应用)", ts, apps_to_update.len()); for app_name in &apps_to_update { let tmp_files = scan_local_tmp_files(app_name); send_get_all_file(&sender, app_name, &tmp_files); } } } UpdatePhase::Complete => { // 已在 Complete 阶段,不再处理 } } } } // 处理 Md5 响应 if msg_type == "Md5" { if let Some(md5_data) = data.get("Data").and_then(|v| v.as_object()) { let filename = md5_data.get("filename").and_then(|v| v.as_str()).unwrap_or(""); if filename.is_empty() { return; } // ========== BootLoader/Updater 的 Md5 处理 ========== let server_md5 = md5_data.get("md5").and_then(|v| v.as_str()).unwrap_or(""); let bytes = md5_data.get("bytes").and_then(|v| v.as_u64()).unwrap_or(0); // Updater.exe 的临时文件名是 Updater.new.exe.tmp let tmp_filename = if filename == "Updater.exe" { "Updater.new.exe.tmp".to_string() } else { format!("{}.tmp", filename) }; // 计算本地 tmp 文件前 bytes 字节的 md5 let local_md5 = compute_file_hash(&tmp_filename, bytes, debug_msg); if debug_msg { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); if let Some(ref lm) = local_md5 { println!("{} [续传] {} md5对比: 本地={}, 服务端={}", ts, filename, lm, server_md5); } else { println!("{} [续传] {} 无法计算本地md5,重新下载", ts, filename); } } // 比较 md5 if local_md5.as_deref() == Some(server_md5) { // md5 相同,从 offset bytes 处续传 let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [续传] {} md5匹配,从 {} 字节处续传", ts, filename, bytes); request_download(&sender, filename, bytes); } else { // md5 不同,删除临时文件,重新下载 let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [续传] {} md5不匹配,重新下载", ts, filename); let tmp_path = get_updater_data_dir().join(&tmp_filename); let _ = fs::remove_file(&tmp_path); request_download(&sender, filename, 0); } } } // 处理 AllFile 响应 if msg_type == "AllFile" { if let Some(allfile_data) = data.get("Data").and_then(|v| v.as_object()) { let app_name = allfile_data.get("app_name").and_then(|v| v.as_str()).unwrap_or(""); if app_name.is_empty() { return; } // 检查是否在等待此应用的响应 let mut pending_apps = ctx_clone.pending_allfile_apps.lock().unwrap(); if !pending_apps.contains(&app_name.to_string()) { return; } pending_apps.retain(|a| a != app_name); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [AllFile] 处理 {} 的文件列表", ts, app_name); // 获取服务端文件列表 let server_files: Vec<(String, u64, String)> = allfile_data .get("files") .and_then(|v| v.as_array()) .map(|arr| { arr.iter().filter_map(|f| { let filename = f.get("filename").and_then(|v| v.as_str())?; let size = f.get("size").and_then(|v| v.as_u64()).unwrap_or(0); let md5 = f.get("md5").and_then(|v| v.as_str()).unwrap_or(""); Some((filename.to_string(), size, md5.to_string())) }).collect() }) .unwrap_or_default(); // 获取服务端临时文件列表 let _server_tmp_files: Vec<(String, Option, Option)> = allfile_data .get("tmp_files") .and_then(|v| v.as_array()) .map(|arr| { arr.iter().filter_map(|f| { let filename = f.get("filename").and_then(|v| v.as_str())?; let size = f.get("size").and_then(|v| v.as_u64()); let md5 = f.get("md5").and_then(|v| v.as_str()); Some(( filename.to_string(), size, md5.map(|s| s.to_string()), )) }).collect() }) .unwrap_or_default(); // 升级目录 let upgrade_base = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name); // 1. 扫描本地文件(分别记录正式文件和 tmp 文件) let mut local_files: std::collections::HashSet = std::collections::HashSet::new(); #[allow(dead_code)] let mut local_tmp_files: std::collections::HashSet = std::collections::HashSet::new(); if upgrade_base.exists() { if let Ok(entries) = fs::read_dir(&upgrade_base) { for entry in entries.flatten() { let path = entry.path(); if path.is_file() { if let Some(name) = path.file_name().and_then(|n| n.to_str()) { if name.ends_with(".tmp") { // tmp 文件:去掉 .tmp 后缀存储 let official_name = name.trim_end_matches(".tmp"); local_tmp_files.insert(official_name.to_string()); } else { local_files.insert(name.to_string()); } } } } } } // 2. 以服务端为准,删除本地多余的正式文件和临时文件 let server_official_names: std::collections::HashSet = server_files .iter() .map(|(name, _, _)| name.clone()) .collect(); for local_file in &local_files { if !server_official_names.contains(local_file) { // 删除本地正式文件 let file_path = upgrade_base.join(local_file); if file_path.exists() { log_print!("{} [AllFile] 删除多余正式文件: {}", ts, local_file); let _ = fs::remove_file(&file_path); } // 同时删除对应的临时文件(如有) let tmp_file_with_ext = format!("{}.tmp", local_file); let tmp_path = upgrade_base.join(&tmp_file_with_ext); if tmp_path.exists() { log_print!("{} [AllFile] 删除多余临时文件: {}", ts, tmp_file_with_ext); let _ = fs::remove_file(&tmp_path); } } } // 3. 处理服务端文件列表:比较 MD5,将需要下载的文件加入队列 let mut downloads_to_queue: Vec<(String, String, u64, String)> = Vec::new(); for (filename, _, server_md5) in &server_files { let file_path = upgrade_base.join(filename); let tmp_filename = format!("{}.tmp", filename); let tmp_path = upgrade_base.join(&tmp_filename); let has_tmp = tmp_path.exists(); let has_official = file_path.exists(); let tmp_is_zero = has_tmp && tmp_path.metadata().map(|m| m.len() == 0).unwrap_or(false); if has_tmp { if tmp_is_zero { // 情况0:本地有 0 字节临时文件,直接从头下载 downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone())); update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst); } else { // 情况A:本地有临时文件(>0字节)→ 比较临时文件 let local_md5 = compute_file_md5(&tmp_path); if let Some(lm) = local_md5 { if &lm != server_md5 { log_print!("{} [AllFile] tmp {} MD5不一致 (本地={}, 服务端={}),清空", ts, tmp_filename, lm, server_md5); if let Ok(file) = fs::File::create(&tmp_path) { let _ = file.set_len(0); } downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone())); update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst); } else { log_print!("{} [AllFile] tmp {} MD5一致,续传", ts, tmp_filename); let file_size = fs::metadata(&tmp_path).map(|m| m.len()).unwrap_or(0); downloads_to_queue.push((app_name.to_string(), filename.to_string(), file_size, server_md5.clone())); } } } } else if has_official { // 情况B:本地没有临时文件但有正式文件 → 比较正式文件 let local_md5 = compute_file_md5(&file_path); if let Some(lm) = local_md5 { if &lm != server_md5 { log_print!("{} [AllFile] {} MD5不一致 (本地={}, 服务端={}),创建 tmp", ts, filename, lm, server_md5); if let Some(parent) = tmp_path.parent() { let _ = fs::create_dir_all(parent); } if let Ok(_) = File::create(&tmp_path) { log_print!("{} [AllFile] 创建空 tmp 文件: {}", ts, tmp_filename); downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone())); update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst); } } else { log_print!("{} [AllFile] {} MD5一致,无需更新", ts, filename); } } } else { // 情况C:本地连正式文件都没有 → 直接创建空的临时文件 log_print!("{} [AllFile] {} 不存在,创建 tmp", ts, filename); if let Some(parent) = tmp_path.parent() { let _ = fs::create_dir_all(parent); } if let Ok(_) = File::create(&tmp_path) { log_print!("{} [AllFile] 创建空 tmp 文件: {}", ts, tmp_filename); downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone())); update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst); } } } // 4. 将所有待下载文件加入队列,按顺序发送第一个 if !downloads_to_queue.is_empty() { let mut queue = ctx_clone.download_queue.lock().unwrap(); for item in downloads_to_queue { queue.push_back(item); } drop(queue); // 如果当前没有在下载,发第一个 let should_start = { let downloading = ctx_clone.is_downloading.lock().unwrap(); !*downloading }; if should_start { *ctx_clone.is_downloading.lock().unwrap() = true; send_next_download(&ctx_clone, &sender, &shutdown_tx_arc_clone); } } // 6. 检查是否所有应用都处理完成(AllFile 收完 且 队列清空) if pending_apps.is_empty() && ctx_clone.download_queue.lock().unwrap().is_empty() { log_print!("{} [AllFile] 所有应用文件同步完成,进入 Complete 阶段", ts); *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Complete; update_check_done_clone2.store(true, std::sync::atomic::Ordering::SeqCst); if let Some(tx) = shutdown_tx_arc_clone.lock().unwrap().take() { let _ = tx.send(()); } } } } // 处理文件块 if msg_type == "FileChunk" { if let Some(data_obj) = data.get("Data").and_then(|v| v.as_object()) { let filename = data_obj.get("filename").and_then(|v| v.as_str()).unwrap_or(""); let current_phase = *ctx_clone.current_phase.lock().unwrap(); // ========== 阶段3.5:处理应用 FileChunk ========== if current_phase == UpdatePhase::AppsWaitAllFile && !filename.is_empty() { let is_last = data_obj.get("is_last").and_then(|v| v.as_bool()).unwrap_or(false); let chunk_offset = data_obj.get("offset").and_then(|v| v.as_u64()).unwrap_or(0); let file_size = data_obj.get("file_size").and_then(|v| v.as_u64()).unwrap_or(0); let chunk_size = 4096u64; // 调试:打印队列状态 let queue_len = ctx_clone.download_queue.lock().unwrap().len(); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [FileChunk] is_last={}, queue_len={}", ts, is_last, queue_len); let candidates = ctx_clone.candidates.lock().unwrap().clone(); for (app_name, _, _) in &candidates { let result = handle_app_file_chunk(&ctx_clone, app_name, data_obj, debug_msg); if let Some((_, relative_path)) = result { if is_last { // 文件下完,标记并请求下一个文件 if debug_msg { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [应用] {} / {} 下载完成", ts, app_name, filename); } ctx_clone.app_completed_set.lock().unwrap().insert(format!("{}/{}", app_name, filename)); log_print!("{} [FileChunk] 调用 send_next_download, queue_len={}", ts, queue_len); send_next_download(&ctx_clone, &sender, &shutdown_tx_arc_clone); } else { // 非最后一块,立即请求下一块(用 relative_path,不带 app_name/ 前缀) let next_offset = chunk_offset + chunk_size; if next_offset < file_size { request_download_for_app(&sender, app_name, &relative_path, next_offset); } } } break; } } else { // ========== BootLoader/Updater 的 FileChunk 处理 ========== let completed = handle_file_chunk(&ctx_clone, data_obj, debug_msg); if let Some(ref completed_file) = completed { // BootLoader 下载完成:推进到阶段2 if completed_file == "BootLoader.exe" { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Updater; log_print!("{} [阶段2] 检查 Updater...", ts); let msg_str = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":["Updater.exe"]}}}}"#, device_number ); log_print!("{} 发送消息:{}", ts, msg_str); sender.send(msg_str); } // Updater 下载完成:启动 BootLoader if completed_file == "Updater.new.exe" { if !updater_downloaded_clone2.load(std::sync::atomic::Ordering::SeqCst) { updater_downloaded_clone2.store(true, std::sync::atomic::Ordering::SeqCst); if debug_msg { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [升级] Updater.new.exe 下载完成(FileChunk),准备启动 BootLoader...", ts); } schedule_bootloader_launch(debug_msg, shutdown_tx_arc_clone.clone()); } } } else { // 非最后一块:主动请求下一块(BootLoader/Updater 流式下载需客户端驱动) let is_last = data_obj.get("is_last").and_then(|v| v.as_bool()).unwrap_or(false); let chunk_offset = data_obj.get("offset").and_then(|v| v.as_u64()).unwrap_or(0); let file_size = data_obj.get("file_size").and_then(|v| v.as_u64()).unwrap_or(0); let chunk_size = 4096u64; if !is_last { let next_offset = chunk_offset + chunk_size; if next_offset < file_size { request_download(&sender, filename, next_offset); } } } } } } // 处理下载完成(统一处理,避免 FileChunk 和 DownloadComplete 重复处理) if msg_type == "DownloadComplete" { if let Some(data_obj) = data.get("Data").and_then(|v| v.as_object()) { let filename = data_obj.get("filename").and_then(|v| v.as_str()).unwrap_or(""); if filename.is_empty() { return; } let current_phase = *ctx_clone.current_phase.lock().unwrap(); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); // 调试:所有 DownloadComplete 都打印阶段信息 let phase_name = match current_phase { UpdatePhase::BootLoader => "BootLoader", UpdatePhase::Updater => "Updater", UpdatePhase::Apps => "Apps", UpdatePhase::AppsWaitAllFile => "AppsWaitAllFile", UpdatePhase::Complete => "Complete", }; log_print!("{} [DownloadComplete] 收到文件: {}, 当前阶段: {}", ts, filename, phase_name); // ========== 阶段3.5:处理应用 DownloadComplete ========== if current_phase == UpdatePhase::AppsWaitAllFile { let candidates = ctx_clone.candidates.lock().unwrap().clone(); let candidate_names: Vec<&str> = candidates.iter().map(|(n, _, _)| n.as_str()).collect(); log_print!("{} [DownloadComplete] 文件: {}, 候选应用: {:?}", ts, filename, candidate_names); for (app_name, _, _) in &candidates { // filename 可能含路径分隔符(如 "EasyTest/Audio.wav"),只取纯文件名部分 let tmp_filename = std::path::Path::new(filename) .file_name() .and_then(|n| n.to_str()) .unwrap_or(filename); let upgrade_dir = get_updater_data_dir() .join("Updater") .join("UpGrade") .join(app_name); let tmp_path = upgrade_dir.join(format!("{}.tmp", tmp_filename)); // rename 后的最终路径(与 handle_app_file_chunk 的 is_last 块保持一致) let final_path = upgrade_dir.join(tmp_filename); // tmp 或 final 文件存在才处理 log_print!("{} [DownloadComplete] 检查路径: tmp={:?}, final={:?}, tmp_exists={}, final_exists={}", ts, tmp_path, final_path, tmp_path.exists(), final_path.exists()); if !tmp_path.exists() && !final_path.exists() { continue; } let size = data_obj.get("size").and_then(|v| v.as_u64()).unwrap_or(0); handle_app_download_complete(&ctx_clone, app_name, filename, size, debug_msg); ctx_clone.app_completed_set.lock().unwrap().insert(format!("{}/{}", app_name, filename)); // 校验下载文件的 MD5 let expected_md5 = ctx_clone.current_download_md5.lock().unwrap().clone(); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); if let Some(expected) = expected_md5 { let local_md5 = compute_file_md5(&final_path); if let Some(lm) = local_md5 { if &lm == &expected { log_print!("{} [应用] {} MD5校验通过 ({}), 继续下一个文件", ts, filename, expected); *ctx_clone.current_download_md5.lock().unwrap() = None; send_next_download(&ctx_clone, &sender, &shutdown_tx_arc_clone); } else { log_print!("{} [应用] {} MD5校验失败 (本地={}, 期望={}),重新下载", ts, filename, lm, expected); // 删除损坏文件,加入队列末尾重新下载 let _ = fs::remove_file(&final_path); let mut queue = ctx_clone.download_queue.lock().unwrap(); queue.push_back((app_name.to_string(), filename.to_string(), 0, expected)); *ctx_clone.current_download_md5.lock().unwrap() = None; drop(queue); send_next_download(&ctx_clone, &sender, &shutdown_tx_arc_clone); } } else { log_print!("{} [应用] {} 无法计算MD5,继续下一个文件", ts, filename); *ctx_clone.current_download_md5.lock().unwrap() = None; send_next_download(&ctx_clone, &sender, &shutdown_tx_arc_clone); } } else { // 没有期望MD5(理论上不应发生),直接继续 send_next_download(&ctx_clone, &sender, &shutdown_tx_arc_clone); } break; } } else { // ========== BootLoader/Updater 的 DownloadComplete 处理 ========== let completed = handle_download_complete(&ctx_clone, data_obj, debug_msg); if let Some(ref completed_file) = completed { // BootLoader.exe 下载完成,推进到阶段2检查 Updater 版本 if completed_file == "BootLoader.exe" { if !bootloader_downloaded_clone2.load(std::sync::atomic::Ordering::SeqCst) { bootloader_downloaded_clone2.store(true, std::sync::atomic::Ordering::SeqCst); if debug_msg { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [升级] BootLoader.exe 下载完成,推进到阶段2检查 Updater...", ts); } // 推进到阶段2,由阶段2决定是否需要下载 Updater *ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Updater; let msg = format!( r#"{{"Type":"GetFileVer","Data":{{"DeviceNumber":"{}","file_list":["Updater.exe"]}}}}"#, device_number ); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} 发送消息:{}", ts, msg); sender.send(msg); } } // Updater.new.exe 下载完成 else if completed_file == "Updater.new.exe" { if !updater_downloaded_clone2.load(std::sync::atomic::Ordering::SeqCst) { updater_downloaded_clone2.store(true, std::sync::atomic::Ordering::SeqCst); let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); log_print!("{} [升级] Updater.new.exe 下载完成(DownloadComplete),准备启动 BootLoader...", ts); schedule_bootloader_launch(debug_msg, shutdown_tx_arc.clone()); } } } } } } }); // 设置断开连接回调 let debug_disconnect = debug_mode; client.on_disconnected(move || { if debug_disconnect { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [断开] 连接已断开", ts); } }); // 设置错误回调 client.on_error(|error| { log_eprintln!("[错误] WebSocket: {}", error); }); // 设置首次连接回调 let debug_first = debug_mode; let device_number_first = resolve_device_number(); client.on_first_connect(move |_url, _sender| { let device_number = device_number_first.clone(); Box::pin(async move { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); if device_number.is_empty() || device_number == "UNKNOWN" { if debug_first { println!("{} [连接] 已连接,未配置设备号,仅维持心跳", ts); } } else { if debug_first { println!("{} [连接] 已连接,等待服务器欢迎消息...", ts); } } }) as Pin + Send + Sync>> }); // 设置重连回调 let debug_reconnect = debug_mode; client.on_reconnecting(move |attempt, url_arc| { Box::pin(async move { if debug_reconnect { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [重连] 第 {} 次重连中...", ts, attempt); } let new_url = resolve_ws_url(); *url_arc.lock().await = new_url.clone(); if debug_reconnect { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [重连] 配置已更新,服务器: {}", ts, new_url); } }) as Pin + Send + Sync>> }); // 设置重连成功回调 let debug_reconnected = debug_mode; let device_number_reconn = resolve_device_number(); client.on_reconnected(move |_url, _sender| { let device_number = device_number_reconn.clone(); Box::pin(async move { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); if device_number.is_empty() || device_number == "UNKNOWN" { if debug_reconnected { println!("{} [重连] 已重连,未配置设备号", ts); } } else { if debug_reconnected { println!("{} [重连] 已重连,等待服务器欢迎消息...", ts); } } }) as Pin + Send + Sync>> }); // 设置心跳确认回调 let debug_heartbeat = debug_mode; client.on_heartbeat_ack(move |latency_ms, server_timestamp| { if debug_heartbeat { let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f"); println!("{} [心跳] pong响应延迟: {}ms, 服务端时间: {}", ts, latency_ms, server_timestamp); } }); // 连接(CubeLib 会自动处理重连) if debug_mode { println!("[启动] 开始连接..."); } // 等待断连信号或连接正常结束 tokio::select! { _ = &mut disconnect_rx => { // 收到断连信号,主动断开 if debug_mode { println!("[启动] 收到断连信号,主动断开..."); } client.disconnect().await; } _ = client.connect() => { // 连接正常结束(CubeLib 自动重连后的最终退出) if debug_mode { println!("[启动] 连接已结束"); } } } if debug_mode { println!("Updater 本次运行结束"); } // 在退出前,通知所有升级的应用(发送升级确认消息) let upgraded_apps = ctx.upgraded_apps.lock().unwrap().clone(); drop(ctx); // 释放 ctx 引用 if !upgraded_apps.is_empty() { notify_all_app_upgrades(&upgraded_apps, debug_mode); } // 返回本次运行是否更新了 Updater.exe 自身(只有 Updater 自身更新了才需要退出重启) updater_downloaded.load(std::sync::atomic::Ordering::SeqCst) } #[tokio::main] async fn main() { // 检查是否已有 Updater 进程在运行 if is_process_running("Updater.exe") { return; } // 加载 Updater 自身配置(debug_mode) let config = load_updater_config(); // 非 debug 模式下释放控制台,后台静默运行 if !config.debug_mode { #[cfg(windows)] { use windows::Win32::System::Console; use windows::Win32::Foundation::HWND; unsafe { let console = Console::GetConsoleWindow(); if console != HWND::default() { let _ = Console::FreeConsole(); } } } } // 启动命名管道服务端(长生命周期 async task) let (pipe_tx, mut pipe_rx) = tokio::sync::mpsc::channel::(64); let _pipe_server_handle = start_named_pipe_server(pipe_tx, config.debug_mode); // 主循环:常驻运行,定期检查更新 loop { // 运行一次 Updater(连接、检查、下载) let updater_updated = run_updater(config.debug_mode).await; if updater_updated { if config.debug_mode { println!("Updater 自身已更新,即将退出(等待 BootLoader 重启)"); } std::process::exit(0); } // 无更新 → 随机等待 5-10 分钟后再次检查 let wait_seconds = { let mut rng = rand::thread_rng(); let minutes: f64 = rng.gen_range(5.0..=10.0); (minutes * 60.0) as u64 }; // 打印等待信息(log_print! 会同时写入控制台和日志文件) let wait_mins = wait_seconds as f64 / 60.0; log_print!("Updater 本次检查完成,无更新,等待 {:.1} 分钟后再次检查...", wait_mins); // 分段等待(每 30 秒为一个周期),期间可响应管道消息 let mut remaining = wait_seconds; let interval_secs = 30; while remaining > 0 { let chunk = std::cmp::min(remaining, interval_secs); let sleep = tokio::time::sleep(std::time::Duration::from_secs(chunk)); tokio::pin!(sleep); tokio::select! { _ = &mut sleep => { remaining -= chunk; } // 收到管道消息(由 EasyTest 等客户端发送) msg = pipe_rx.recv() => { if let Some(m) = msg { if config.debug_mode { println!("[Pipe] 主循环收到消息:{}", m); } // quit 命令 → 退出 if m == "quit" { if config.debug_mode { println!("[Pipe] 收到 quit 命令,Updater 退出"); } std::process::exit(0); } } } } } } }