逐个文件地下载

This commit is contained in:
zqm
2026-04-10 10:22:58 +08:00
parent 9c10e08f90
commit acb5760f38

View File

@@ -98,10 +98,16 @@ struct UpdateContext {
server_updater_version: Mutex<Option<String>>,
/// 当前更新阶段
current_phase: Mutex<UpdatePhase>,
/// 候选应用列表阶段3使用
candidates: Mutex<Vec<String>>,
/// 候选应用列表 (app_name, local_version)阶段3使用
candidates: Mutex<Vec<(String, String)>>,
/// 等待 GetAllFile 响应的应用列表阶段3.5使用)
pending_allfile_apps: Mutex<Vec<String>>,
/// 待下载队列 (app_name, filename, offset, expected_md5),顺序下载
download_queue: Mutex<std::collections::VecDeque<(String, String, u64, String)>>,
/// 是否正在下载中(队列中有待处理项)
is_downloading: Mutex<bool>,
/// 当前正在下载的文件的期望 MD5DownloadComplete 校验用)
current_download_md5: Mutex<Option<String>>,
}
impl Default for UpdateContext {
@@ -114,6 +120,9 @@ impl Default for UpdateContext {
current_phase: Mutex::new(UpdatePhase::BootLoader),
candidates: Mutex::new(Vec::new()),
pending_allfile_apps: Mutex::new(Vec::new()),
download_queue: Mutex::new(std::collections::VecDeque::new()),
is_downloading: Mutex::new(false),
current_download_md5: Mutex::new(None),
}
}
}
@@ -430,10 +439,17 @@ fn compute_file_hash(filename: &str, bytes: u64, _debug: bool) -> Option<String>
/// 计算文件的完整 MD5 hash
fn compute_file_md5(file_path: &PathBuf) -> Option<String> {
if !file_path.exists() {
log_print!("[MD5] 文件不存在: {:?}", file_path);
return None;
}
let file_data = fs::read(file_path).ok()?;
let file_data = match fs::read(file_path) {
Ok(data) => data,
Err(e) => {
log_print!("[MD5] 读取文件失败: {:?}, error: {}", file_path, e);
return None;
}
};
let hash = md5::compute(&file_data);
Some(format!("{:x}", hash))
}
@@ -749,54 +765,7 @@ fn get_app_tmp_file_size(app_name: &str, relative_path: &str) -> u64 {
tmp_path.metadata().map(|m| m.len()).unwrap_or(0)
}
/// 获取其他应用的本地版本号(通过 PowerShell 读取 PE 版本)
fn get_local_app_version(app_name: &str, relative_path: &str) -> String {
// 应用主 exe 一般在相对路径的顶级,如 app.exe 或 bin/app.exe
// 取第一段目录名或文件名作为 exe 查找起点
let file_path = get_updater_data_dir()
.join(app_name)
.join(relative_path);
if !file_path.exists() {
return "0.0.0".to_string();
}
#[cfg(windows)]
{
let path_str = file_path.to_string_lossy().to_string();
let ps_script = format!(
"(Get-Item -LiteralPath '{}' -ErrorAction SilentlyContinue).VersionInfo.FileVersion",
path_str.replace("'", "''")
);
let output = Command::new("powershell")
.args(["-NoProfile", "-NonInteractive", "-Command", &ps_script])
.output();
if let Ok(output) = output {
let stdout = String::from_utf8_lossy(&output.stdout);
let version = stdout.trim();
if !version.is_empty() && version != "0" && !version.contains("没有版本") {
return version.to_string();
}
}
if let Ok(metadata) = fs::metadata(&file_path) {
use std::time::UNIX_EPOCH;
if let Ok(modified) = metadata.modified() {
let duration = modified.duration_since(UNIX_EPOCH).unwrap_or_default();
return format!("{}.{}", duration.as_secs(), duration.subsec_nanos() / 1_000_000);
}
}
"0.0.0".to_string()
}
#[cfg(not(windows))]
{
"0.0.0".to_string()
}
}
/// 向服务器查询指定文件的 md5用于断点续传校验
#[allow(dead_code)]
@@ -835,6 +804,19 @@ fn request_download_for_app(
sender.send(msg_str);
}
/// 从待下载队列取出一个文件发送(顺序下载)
fn send_next_download(ctx: &Arc<UpdateContext>, sender: &cube_lib::websocket::MessageSender) {
let mut queue = ctx.download_queue.lock().unwrap();
if let Some((app_name, filename, offset, expected_md5)) = queue.pop_front() {
// 记录当前下载的期望 MD5供 DownloadComplete 校验
*ctx.current_download_md5.lock().unwrap() = Some(expected_md5);
request_download_for_app(sender, &app_name, &filename, offset);
} else {
*ctx.is_downloading.lock().unwrap() = false;
}
}
/// 处理其他应用的文件块(写入 AppData/{app_name}/{relative_path}
fn handle_app_file_chunk(
ctx: &Arc<UpdateContext>,
@@ -855,8 +837,12 @@ fn handle_app_file_chunk(
let key = format!("{}/{}", app_name, filename);
let mut app_map = ctx.app_download_map.lock().unwrap();
// 新文件或 offset=0重置状态
if !app_map.contains_key(&key) || offset == 0 {
// 获取当前正在下载的文件名(用于判断是否是新文件)
let current_filename = app_map.values().next().map(|s| s.filename.clone());
// 新文件(非续传):清空旧 entry等待 DownloadComplete 处理旧文件
// 续传(同一文件 + 非零 offset追加数据不清 entry
if current_filename.as_ref() != Some(&filename.to_string()) && (!app_map.contains_key(&key) || offset == 0) {
if let Some(f) = app_map.get_mut(&key) {
if let Some(file) = f.file.take() {
drop(file);
@@ -867,16 +853,22 @@ fn handle_app_file_chunk(
}
app_map.remove(&key);
// filename 可能含路径分隔符,取纯文件名部分
let tmp_filename = std::path::Path::new(filename)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(filename);
let app_data_dir = get_updater_data_dir().join(app_name);
let _ = fs::create_dir_all(&app_data_dir);
let final_path = app_data_dir.join(filename);
let final_path = app_data_dir.join(tmp_filename);
let _ = fs::create_dir_all(final_path.parent().unwrap());
let temp_path = get_updater_data_dir()
.join("Updater")
.join("UpGrade")
.join(app_name)
.join(format!("{}.tmp", filename));
.join(format!("{}.tmp", tmp_filename));
match File::create(&temp_path) {
Ok(file) => {
@@ -934,8 +926,15 @@ fn handle_app_file_chunk(
let final_offset = app_map.get(&key).map(|s| s.offset).unwrap_or(0);
if let Some(ref temp) = temp_path {
let app_data_dir = get_updater_data_dir().join(app_name);
let final_path = app_data_dir.join(filename);
// filename 来自服务端消息,可能含路径前缀(如 "sticker/app_config.json"
// 只取纯文件名,与 DownloadComplete 处理器保持路径一致
let final_filename = std::path::Path::new(filename)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(filename);
// final_path 与 tmp 文件同一目录(升级目录),不是 app_data_dir
let upgrade_dir = temp.parent().unwrap();
let final_path = upgrade_dir.join(final_filename);
let _ = fs::create_dir_all(final_path.parent().unwrap());
if let Err(e) = fs::rename(temp, &final_path) {
@@ -976,15 +975,22 @@ fn handle_app_download_complete(
if let Some(ref temp) = temp_path {
let app_data_dir = get_updater_data_dir().join(app_name);
let final_path = app_data_dir.join(filename);
// filename 可能含路径分隔符,取纯文件名部分以避免路径重复
let tmp_filename = std::path::Path::new(filename)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(filename);
let final_path = app_data_dir.join(tmp_filename);
let _ = fs::create_dir_all(final_path.parent().unwrap());
if let Err(e) = fs::rename(temp, &final_path) {
if debug {
eprintln!("[应用] 重命名失败 {}: {}", filename, e);
}
let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
log_print!("{} [应用] 重命名失败 {} -> {:?}: {},改用 copy", ts, filename, final_path, e);
let _ = fs::copy(temp, &final_path);
let _ = fs::remove_file(temp);
} else {
let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
log_print!("{} [应用] 重命名成功 {} -> {:?}", ts, filename, final_path);
}
if debug {
@@ -1120,8 +1126,41 @@ fn is_process_running_ex(process_name: &str, exclude_pid: Option<u32>) -> bool {
count > 0
}
/// 获取 AppData 目录下所有候选应用(排除 Updater返回应用名称列表
fn get_app_candidates(debug: bool) -> Vec<String> {
/// 从运行中进程获取版本号(通过进程的可执行文件路径
#[cfg(windows)]
fn get_version_from_process(app_name: &str) -> String {
let ps_script = format!(
"$p = Get-Process -Name '{}' -ErrorAction SilentlyContinue | Select-Object -First 1; \
if ($p -and $p.Path) {{ (Get-Item $p.Path -ErrorAction SilentlyContinue).VersionInfo.FileVersion }} \
elseif ($p -and $p.Id) {{ \
$wmi = Get-CimInstance Win32_Process -Filter \"ProcessId=$($p.Id)\" -ErrorAction SilentlyContinue; \
if ($wmi.ExecutablePath) {{ (Get-Item $wmi.ExecutablePath -ErrorAction SilentlyContinue).VersionInfo.FileVersion }} \
}}",
app_name.replace("'", "''")
);
let output = Command::new("powershell")
.args(["-NoProfile", "-NonInteractive", "-Command", &ps_script])
.output();
if let Ok(output) = output {
let stdout = String::from_utf8_lossy(&output.stdout);
let version = stdout.trim();
if !version.is_empty() && version != "0" && !version.contains("没有文件") {
return version.to_string();
}
}
"0.0.0".to_string()
}
#[cfg(not(windows))]
fn get_version_from_process(_app_name: &str) -> String {
"0.0.0".to_string()
}
/// 获取 AppData 目录下所有候选应用(排除 Updater返回 (app_name, local_version) 列表)
/// 版本号在进程运行期间直接从进程路径读取,保证版本与进程实际加载的一致
fn get_app_candidates(debug: bool) -> Vec<(String, String)> {
let appdata = get_updater_data_dir(); // X:\AppData\
if !appdata.exists() {
return Vec::new();
@@ -1138,10 +1177,11 @@ fn get_app_candidates(debug: bool) -> Vec<String> {
// 检查该目录是否对应一个运行中的进程
let exe_name = format!("{}.exe", name);
if is_process_running(&exe_name) {
let local_version = get_version_from_process(name);
if debug {
println!("[应用] 候选应用: {} (进程运行中)", name);
println!("[应用] 候选应用: {} v{} (进程运行中)", name, local_version);
}
candidates.push(name.to_string());
candidates.push((name.to_string(), local_version));
} else if debug {
println!("[应用] 跳过 {} (进程未运行)", name);
}
@@ -1509,12 +1549,14 @@ async fn run_updater(debug_mode: bool) -> bool {
}
} else {
*ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Apps;
log_print!("{} [阶段3] 检查应用: {:?}", ts, candidates);
// 打印候选应用及其预存版本
let app_names: Vec<String> = candidates.iter().map(|(n, v)| format!("{}(v{})", n, v)).collect();
log_print!("{} [阶段3] 检查应用: {:?}", ts, app_names);
// 构建应用版本查询
let mut file_list = Vec::new();
for app in &candidates {
file_list.push(format!("{}\\{}.exe", app, app));
for (app_name, _) in &candidates {
file_list.push(format!("{}\\{}.exe", app_name, app_name));
}
let file_list_json = serde_json::to_string(&file_list).unwrap_or_else(|_| "[]".to_string());
let msg_str = format!(
@@ -1537,11 +1579,13 @@ async fn run_updater(debug_mode: bool) -> bool {
}
} else {
*ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Apps;
println!("{} [阶段3] 检查应用: {:?}", ts, candidates);
// 打印候选应用及其预存版本
let app_names: Vec<String> = candidates.iter().map(|(n, v)| format!("{}(v{})", n, v)).collect();
println!("{} [阶段3] 检查应用: {:?}", ts, app_names);
let mut file_list = Vec::new();
for app in &candidates {
file_list.push(format!("{}\\{}.exe", app, app));
for (app_name, _) in &candidates {
file_list.push(format!("{}\\{}.exe", app_name, app_name));
}
let file_list_json = serde_json::to_string(&file_list).unwrap_or_else(|_| "[]".to_string());
let msg_str = format!(
@@ -1556,10 +1600,11 @@ async fn run_updater(debug_mode: bool) -> bool {
UpdatePhase::Apps => {
// ========== 阶段3处理应用版本决定是否需要升级 ==========
// 使用预存的版本(在候选发现阶段已从运行中进程读取)
let candidates = ctx_clone.candidates.lock().unwrap().clone();
let mut apps_to_update = Vec::new();
for app_name in &candidates {
for (app_name, local_version) in candidates {
let exe_name = format!("{}\\{}.exe", app_name, app_name);
if let Some(server_ver) = file_versions.get(&exe_name) {
let server_version = server_ver.as_str().unwrap_or("");
@@ -1568,7 +1613,6 @@ async fn run_updater(debug_mode: bool) -> bool {
log_print!("{} [应用] {} 服务端版本为空,跳过升级", ts, app_name);
continue;
}
let local_version = get_local_app_version(app_name, &exe_name);
log_print!("{} [应用] {}: 服务端={}, 本地={}", ts, app_name, server_version, local_version);
@@ -1756,7 +1800,8 @@ async fn run_updater(debug_mode: bool) -> bool {
}
}
// 3. 处理服务端文件列表:比较 MD5决定如何处理
// 3. 处理服务端文件列表:比较 MD5将需要下载的文件加入队列
let mut downloads_to_queue: Vec<(String, String, u64, String)> = Vec::new();
for (filename, _, server_md5) in &server_files {
let file_path = upgrade_base.join(filename);
let tmp_filename = format!("{}.tmp", filename);
@@ -1767,9 +1812,8 @@ async fn run_updater(debug_mode: bool) -> bool {
if has_tmp {
if tmp_is_zero {
// 情况0本地有 0 字节临时文件0 字节未上报服务端,无法比对 MD5
// 直接从 offset 0 断点续传
request_download_for_app(&sender, &app_name, filename, 0);
// 情况0本地有 0 字节临时文件,直接从头下载
downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone()));
update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
// 情况A本地有临时文件>0字节→ 比较临时文件
@@ -1780,10 +1824,12 @@ async fn run_updater(debug_mode: bool) -> bool {
if let Ok(file) = fs::File::create(&tmp_path) {
let _ = file.set_len(0);
}
request_download_for_app(&sender, &app_name, filename, 0);
downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone()));
update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst);
} else {
log_print!("{} [AllFile] tmp {} MD5一致续传", ts, tmp_filename);
let file_size = fs::metadata(&tmp_path).map(|m| m.len()).unwrap_or(0);
downloads_to_queue.push((app_name.to_string(), filename.to_string(), file_size, server_md5.clone()));
}
}
}
@@ -1798,7 +1844,7 @@ async fn run_updater(debug_mode: bool) -> bool {
}
if let Ok(_) = File::create(&tmp_path) {
log_print!("{} [AllFile] 创建空 tmp 文件: {}", ts, tmp_filename);
request_download_for_app(&sender, &app_name, filename, 0);
downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone()));
update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst);
}
} else {
@@ -1813,28 +1859,33 @@ async fn run_updater(debug_mode: bool) -> bool {
}
if let Ok(_) = File::create(&tmp_path) {
log_print!("{} [AllFile] 创建空 tmp 文件: {}", ts, tmp_filename);
request_download_for_app(&sender, &app_name, filename, 0);
downloads_to_queue.push((app_name.to_string(), filename.to_string(), 0, server_md5.clone()));
update_performed_clone2.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
}
// 4. 为有效的临时文件(>0字节发送续传请求
for (filename, _, _) in &server_files {
let tmp_filename = format!("{}.tmp", filename);
let tmp_path = upgrade_base.join(&tmp_filename);
if tmp_path.exists() {
let file_size = fs::metadata(&tmp_path).map(|m| m.len()).unwrap_or(0);
// 0 字节 tmp 已在步骤3中处理过跳过
if file_size == 0 {
continue;
}
request_download_for_app(&sender, &app_name, filename, file_size);
// 4. 将所有待下载文件加入队列,按顺序发送第一个
if !downloads_to_queue.is_empty() {
let mut queue = ctx_clone.download_queue.lock().unwrap();
for item in downloads_to_queue {
queue.push_back(item);
}
drop(queue);
// 如果当前没有在下载,发第一个
let should_start = {
let downloading = ctx_clone.is_downloading.lock().unwrap();
!*downloading
};
if should_start {
*ctx_clone.is_downloading.lock().unwrap() = true;
send_next_download(&ctx_clone, &sender);
}
}
// 6. 检查是否所有应用都处理完成
if pending_apps.is_empty() {
// 6. 检查是否所有应用都处理完成AllFile 收完 且 队列清空)
if pending_apps.is_empty() && ctx_clone.download_queue.lock().unwrap().is_empty() {
log_print!("{} [AllFile] 所有应用文件同步完成,进入 Complete 阶段", ts);
*ctx_clone.current_phase.lock().unwrap() = UpdatePhase::Complete;
update_check_done_clone2.store(true, std::sync::atomic::Ordering::SeqCst);
@@ -1855,12 +1906,17 @@ async fn run_updater(debug_mode: bool) -> bool {
// ========== 阶段3.5:处理应用 FileChunk ==========
if current_phase == UpdatePhase::AppsWaitAllFile && !filename.is_empty() {
let candidates = ctx_clone.candidates.lock().unwrap().clone();
for app_name in &candidates {
for (app_name, _) in &candidates {
// filename 可能含路径分隔符,取纯文件名部分来构造 tmp 路径
let tmp_filename = std::path::Path::new(filename)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(filename);
let upgrade_path = get_updater_data_dir()
.join("Updater")
.join("UpGrade")
.join(app_name)
.join(filename);
.join(format!("{}.tmp", tmp_filename));
if !upgrade_path.exists() {
continue;
}
@@ -1872,6 +1928,8 @@ async fn run_updater(debug_mode: bool) -> bool {
println!("{} [应用] {} / {} 下载完成", ts, app, file);
}
ctx_clone.app_completed_set.lock().unwrap().insert(format!("{}/{}", app, file));
// 一个文件下完,发下一个
send_next_download(&ctx_clone, &sender);
}
break;
}
@@ -1904,23 +1962,76 @@ async fn run_updater(debug_mode: bool) -> bool {
}
let current_phase = *ctx_clone.current_phase.lock().unwrap();
let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
// 调试:所有 DownloadComplete 都打印阶段信息
let phase_name = match current_phase {
UpdatePhase::BootLoader => "BootLoader",
UpdatePhase::Updater => "Updater",
UpdatePhase::Apps => "Apps",
UpdatePhase::AppsWaitAllFile => "AppsWaitAllFile",
UpdatePhase::Complete => "Complete",
};
log_print!("{} [DownloadComplete] 收到文件: {}, 当前阶段: {}", ts, filename, phase_name);
// ========== 阶段3.5:处理应用 DownloadComplete ==========
if current_phase == UpdatePhase::AppsWaitAllFile {
let candidates = ctx_clone.candidates.lock().unwrap().clone();
for app_name in &candidates {
let upgrade_path = get_updater_data_dir()
let candidate_names: Vec<&str> = candidates.iter().map(|(n, _)| n.as_str()).collect();
log_print!("{} [DownloadComplete] 文件: {}, 候选应用: {:?}", ts, filename, candidate_names);
for (app_name, _) in &candidates {
// filename 可能含路径分隔符(如 "EasyTest/Audio.wav"),只取纯文件名部分
let tmp_filename = std::path::Path::new(filename)
.file_name()
.and_then(|n| n.to_str())
.unwrap_or(filename);
let tmp_path = get_updater_data_dir()
.join("Updater")
.join("UpGrade")
.join(app_name)
.join(filename);
if !upgrade_path.exists() {
.join(format!("{}.tmp", tmp_filename));
// rename 后的最终路径(与 handle_app_download_complete 内部保持一致)
let final_path = get_updater_data_dir().join(app_name).join(tmp_filename);
// tmp 或 final 文件存在才处理
log_print!("{} [DownloadComplete] 检查路径: tmp={:?}, final={:?}, tmp_exists={}, final_exists={}",
ts, tmp_path, final_path, tmp_path.exists(), final_path.exists());
if !tmp_path.exists() && !final_path.exists() {
continue;
}
let size = data_obj.get("size").and_then(|v| v.as_u64()).unwrap_or(0);
handle_app_download_complete(&ctx_clone, app_name, filename, size, debug_msg);
ctx_clone.app_completed_set.lock().unwrap().insert(format!("{}/{}", app_name, filename));
// 校验下载文件的 MD5
let expected_md5 = ctx_clone.current_download_md5.lock().unwrap().clone();
let ts = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.3f");
if let Some(expected) = expected_md5 {
let local_md5 = compute_file_md5(&final_path);
if let Some(lm) = local_md5 {
if &lm == &expected {
log_print!("{} [应用] {} MD5校验通过 ({}), 继续下一个文件", ts, filename, expected);
*ctx_clone.current_download_md5.lock().unwrap() = None;
send_next_download(&ctx_clone, &sender);
} else {
log_print!("{} [应用] {} MD5校验失败 (本地={}, 期望={}),重新下载", ts, filename, lm, expected);
// 删除损坏文件,加入队列末尾重新下载
let _ = fs::remove_file(&final_path);
let mut queue = ctx_clone.download_queue.lock().unwrap();
queue.push_back((app_name.to_string(), filename.to_string(), 0, expected));
*ctx_clone.current_download_md5.lock().unwrap() = None;
drop(queue);
send_next_download(&ctx_clone, &sender);
}
} else {
log_print!("{} [应用] {} 无法计算MD5继续下一个文件", ts, filename);
*ctx_clone.current_download_md5.lock().unwrap() = None;
send_next_download(&ctx_clone, &sender);
}
} else {
// 没有期望MD5理论上不应发生直接继续
send_next_download(&ctx_clone, &sender);
}
break;
}
} else {