From c4ae9701d5286089f909e59e008498b99e8f3c13 Mon Sep 17 00:00:00 2001 From: Peter Rekdal Khan-Sunde Date: Sun, 28 Jun 2026 10:17:43 +0200 Subject: [PATCH] Speed up sparse delta base materialization --- crates/surge-core/src/update/manager.rs | 271 +++++++++++- crates/surge-core/src/update/manager/apply.rs | 386 ++++-------------- .../src/update/manager/apply/base.rs | 256 ++++++++++++ .../src/update/manager/apply/delta.rs | 170 ++++++++ .../src/update/manager/apply/installed_app.rs | 120 ++++++ .../src/update/manager/progress_substep.rs | 28 +- 6 files changed, 898 insertions(+), 333 deletions(-) create mode 100644 crates/surge-core/src/update/manager/apply/base.rs create mode 100644 crates/surge-core/src/update/manager/apply/delta.rs create mode 100644 crates/surge-core/src/update/manager/apply/installed_app.rs diff --git a/crates/surge-core/src/update/manager.rs b/crates/surge-core/src/update/manager.rs index 429b17b..212f8e2 100644 --- a/crates/surge-core/src/update/manager.rs +++ b/crates/surge-core/src/update/manager.rs @@ -372,22 +372,17 @@ impl UpdateManager { let extract_dir = staging_dir.join("extracted"); tokio::fs::create_dir_all(&extract_dir).await?; - let extracted_final_dir = progress_emitter - .run_with_heartbeat( - 5, - update_phase::PACKAGE_APPLY_STARTED, - 60, - progress_substep::HEARTBEAT_INTERVAL, - materialize_update_payload( - self, - info, - &staging_dir, - &artifact_cache_dir, - &extract_dir, - progress.as_ref(), - ), - ) - .await?; + progress_emitter.emit_substep(5, update_phase::PACKAGE_APPLY_STARTED, 60); + let extracted_final_dir = materialize_update_payload( + self, + info, + &staging_dir, + &artifact_cache_dir, + &extract_dir, + progress.as_ref(), + &progress_emitter, + ) + .await?; progress_emitter.emit_completed_phase(update_phase::PACKAGE_APPLY_COMPLETED); // Phase 6: Finalize @@ -2335,6 +2330,15 @@ echo started > new-child-started write_app_scoped_release_index(&store_root, app_id, &index); + let active_app_dir = install_root.join("app"); + std::fs::create_dir_all(active_app_dir.join(".surge")).unwrap(); + std::fs::write(active_app_dir.join("payload.txt"), "v2 payload").unwrap(); + std::fs::write( + active_app_dir.join(crate::install::RUNTIME_MANIFEST_RELATIVE_PATH), + format!("id: {app_id}\nversion: 1.1.0\n"), + ) + .unwrap(); + let ctx = Arc::new(Context::new()); ctx.set_storage( StorageProvider::Filesystem, @@ -2348,11 +2352,40 @@ echo started > new-child-started let mut manager = UpdateManager::new(ctx, app_id, "1.1.0", "stable", install_root.to_str().unwrap()).unwrap(); let info = manager.check_for_updates().await.unwrap().unwrap(); assert_eq!(info.apply_strategy, ApplyStrategy::Delta); + + let observed = Arc::new(Mutex::new(Vec::::new())); + let observed_for_progress = Arc::clone(&observed); manager - .download_and_apply(&info, None::) + .download_and_apply( + &info, + Some(move |progress: ProgressInfo| { + observed_for_progress + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(progress); + }), + ) .await .unwrap(); + let observed = observed + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone(); + let apply_labels: Vec<&'static str> = observed + .iter() + .filter(|progress| progress.phase == 5 && !progress.phase_label.is_empty()) + .map(|progress| progress.phase_label) + .collect(); + assert!( + apply_labels.contains(&finalize_phase::RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH), + "expected release-graph base restore label in {apply_labels:?}" + ); + assert!( + !apply_labels.contains(&finalize_phase::RESTORING_CURRENT_PACKAGE_FROM_INSTALLED_APP), + "byte-exact deltas should not prefer installed-app synthesis: {apply_labels:?}" + ); + let installed = std::fs::read_to_string(install_root.join("app").join("payload.txt")).unwrap(); assert_eq!(installed, "v3 payload"); } @@ -2760,11 +2793,40 @@ echo started > new-child-started assert_eq!(info.apply_releases.len(), 1); assert_eq!(info.apply_releases[0].version, "1.2.0"); assert_eq!(info.apply_releases[0].full_sha256, sha256_hex(&full_v3)); + + let observed = Arc::new(Mutex::new(Vec::::new())); + let observed_for_progress = Arc::clone(&observed); manager - .download_and_apply(&info, None::) + .download_and_apply( + &info, + Some(move |progress: ProgressInfo| { + observed_for_progress + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(progress); + }), + ) .await .unwrap(); + let observed = observed + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone(); + let apply_labels: Vec<&'static str> = observed + .iter() + .filter(|progress| progress.phase == 5 && !progress.phase_label.is_empty()) + .map(|progress| progress.phase_label) + .collect(); + assert!( + apply_labels.contains(&finalize_phase::RESTORING_CURRENT_PACKAGE_FROM_INSTALLED_APP), + "expected installed-app base restore label in {apply_labels:?}" + ); + assert!( + !apply_labels.contains(&finalize_phase::RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH), + "sparse deltas should not rebuild the base through the release graph first: {apply_labels:?}" + ); + let installed = std::fs::read_to_string(install_root.join("app").join("payload.txt")).unwrap(); assert_eq!(installed, "v3 payload"); @@ -2772,6 +2834,179 @@ echo started > new-child-started assert!(!cached_current_full.exists()); } + #[tokio::test] + async fn test_download_and_apply_delta_retries_release_graph_when_installed_app_base_drifts() { + let tmp = tempfile::tempdir().unwrap(); + let store_root = tmp.path().join("store"); + let install_root = tmp.path().join("install"); + let app_id = "test-app"; + std::fs::create_dir_all(&store_root).unwrap(); + std::fs::create_dir_all(&install_root).unwrap(); + let app_store = app_scoped_store_root(&store_root, app_id); + + let rid = current_rid(); + let os = current_os_label_for_tests(); + + let source_v2 = tmp.path().join("source-v2"); + let source_v3 = tmp.path().join("source-v3"); + std::fs::create_dir_all(&source_v2).unwrap(); + std::fs::create_dir_all(&source_v3).unwrap(); + std::fs::write(source_v2.join("payload.txt"), "v2 payload").unwrap(); + std::fs::write(source_v2.join("stable.txt"), "stable payload").unwrap(); + std::fs::write(source_v3.join("payload.txt"), "v3 payload").unwrap(); + std::fs::write(source_v3.join("stable.txt"), "stable payload").unwrap(); + + let mut packer_v2 = ArchivePacker::new(3).unwrap(); + packer_v2.add_directory(&source_v2, "").unwrap(); + let full_v2 = packer_v2.finalize().unwrap(); + + let mut packer_v3 = ArchivePacker::new(3).unwrap(); + packer_v3.add_directory(&source_v3, "").unwrap(); + let full_v3 = packer_v3.finalize().unwrap(); + + let patch_v3 = build_sparse_file_patch(&full_v2, &full_v3, 3, 0, &ChunkedDiffOptions::default()).unwrap(); + let delta_v3 = zstd::encode_all(patch_v3.as_slice(), 3).unwrap(); + + let full_v2_name = format!("{app_id}-1.1.0-{rid}-full.tar.zst"); + let full_v3_name = format!("{app_id}-1.2.0-{rid}-full.tar.zst"); + let delta_v3_name = format!("{app_id}-1.2.0-{rid}-delta.tar.zst"); + + std::fs::write(app_store.join(&full_v2_name), &full_v2).unwrap(); + std::fs::write(app_store.join(&delta_v3_name), &delta_v3).unwrap(); + + let index = ReleaseIndex { + app_id: app_id.to_string(), + releases: vec![ + ReleaseEntry { + version: "1.1.0".to_string(), + channels: vec!["stable".to_string()], + os: os.clone(), + rid: rid.clone(), + is_genesis: true, + full_filename: full_v2_name, + full_size: full_v2.len() as i64, + full_sha256: sha256_hex(&full_v2), + full_compression_level: 0, + full_zstd_workers: 0, + deltas: Vec::new(), + preferred_delta_id: String::new(), + created_utc: chrono::Utc::now().to_rfc3339(), + release_notes: String::new(), + name: String::new(), + main_exe: app_id.to_string(), + install_directory: app_id.to_string(), + supervisor_id: String::new(), + icon: String::new(), + shortcuts: Vec::new(), + persistent_assets: Vec::new(), + installers: Vec::new(), + environment: std::collections::BTreeMap::new(), + }, + ReleaseEntry { + version: "1.2.0".to_string(), + channels: vec!["stable".to_string()], + os, + rid: rid.clone(), + is_genesis: false, + full_filename: full_v3_name, + full_size: full_v3.len() as i64, + full_sha256: sha256_hex(&full_v3), + full_compression_level: 0, + full_zstd_workers: 0, + deltas: vec![DeltaArtifact::sparse_file_ops_zstd( + "primary", + "1.1.0", + &delta_v3_name, + delta_v3.len() as i64, + &sha256_hex(&delta_v3), + )], + preferred_delta_id: "primary".to_string(), + created_utc: chrono::Utc::now().to_rfc3339(), + release_notes: String::new(), + name: String::new(), + main_exe: app_id.to_string(), + install_directory: app_id.to_string(), + supervisor_id: String::new(), + icon: String::new(), + shortcuts: Vec::new(), + persistent_assets: Vec::new(), + installers: Vec::new(), + environment: std::collections::BTreeMap::new(), + }, + ], + ..ReleaseIndex::default() + }; + + write_app_scoped_release_index(&store_root, app_id, &index); + + let active_app_dir = install_root.join("app"); + std::fs::create_dir_all(active_app_dir.join(".surge")).unwrap(); + std::fs::write(active_app_dir.join("payload.txt"), "v2 payload").unwrap(); + std::fs::write(active_app_dir.join("stable.txt"), "locally drifted stable payload").unwrap(); + std::fs::write( + active_app_dir.join(crate::install::RUNTIME_MANIFEST_RELATIVE_PATH), + format!("id: {app_id}\nversion: 1.1.0\n"), + ) + .unwrap(); + std::fs::write( + active_app_dir.join(crate::install::LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH), + format!("id: {app_id}\nversion: 1.1.0\n"), + ) + .unwrap(); + + let ctx = Arc::new(Context::new()); + ctx.set_storage( + StorageProvider::Filesystem, + store_root.to_str().unwrap(), + "", + "", + "", + "", + ); + + let mut manager = UpdateManager::new(ctx, app_id, "1.1.0", "stable", install_root.to_str().unwrap()).unwrap(); + let info = manager.check_for_updates().await.unwrap().unwrap(); + assert_eq!(info.apply_strategy, ApplyStrategy::Delta); + + let observed = Arc::new(Mutex::new(Vec::::new())); + let observed_for_progress = Arc::clone(&observed); + manager + .download_and_apply( + &info, + Some(move |progress: ProgressInfo| { + observed_for_progress + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(progress); + }), + ) + .await + .unwrap(); + + let observed = observed + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone(); + let apply_labels: Vec<&'static str> = observed + .iter() + .filter(|progress| progress.phase == 5 && !progress.phase_label.is_empty()) + .map(|progress| progress.phase_label) + .collect(); + assert!( + apply_labels.contains(&finalize_phase::RESTORING_CURRENT_PACKAGE_FROM_INSTALLED_APP), + "expected installed-app base attempt in {apply_labels:?}" + ); + assert!( + apply_labels.contains(&finalize_phase::RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH), + "expected release-graph retry after installed-app drift in {apply_labels:?}" + ); + + let installed = std::fs::read_to_string(install_root.join("app").join("payload.txt")).unwrap(); + assert_eq!(installed, "v3 payload"); + let stable = std::fs::read_to_string(install_root.join("app").join("stable.txt")).unwrap(); + assert_eq!(stable, "stable payload"); + } + #[tokio::test] async fn test_download_and_apply_delta_prefers_app_scoped_release_index_lineage() { let tmp = tempfile::tempdir().unwrap(); diff --git a/crates/surge-core/src/update/manager/apply.rs b/crates/surge-core/src/update/manager/apply.rs index 8f0eab2..51c5501 100644 --- a/crates/surge-core/src/update/manager/apply.rs +++ b/crates/surge-core/src/update/manager/apply.rs @@ -1,34 +1,31 @@ -use std::collections::BTreeSet; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; -use tracing::{debug, warn}; +use tracing::warn; use crate::archive::extractor::extract_file_to_with_progress; -use crate::config::constants::RELEASES_FILE_COMPRESSED; -use crate::context::Context; -use crate::crypto::sha256::sha256_hex; use crate::error::{Result, SurgeError}; -use crate::pack::builder::build_canonical_archive_from_directory; -use crate::platform::detect::current_rid; -use crate::platform::fs::write_file_atomic; use crate::releases::artifact_cache::{cache_path_for_key, fetch_or_reuse_file}; -use crate::releases::delta::{ - DeltaApplyProgress, apply_delta_patch_with_progress, decode_delta_patch, is_supported_delta, -}; -use crate::releases::manifest::{ReleaseEntry, decompress_release_index}; -use crate::releases::restore::{ - RebuiltFullCachePolicy, RestoreOptions, find_release_for_version_rid, restore_full_archive_for_version_with_options, -}; -use crate::supervisor::stub::find_latest_app_dir; +use crate::releases::manifest::ReleaseEntry; use super::progress::{ - ProgressInfo, average_speed_bytes_per_sec, clamp_progress_percent, clamp_progress_percent_u64, emit_progress, - phase_total_percent, saturating_i64_from_u64, + ProgressInfo, average_speed_bytes_per_sec, clamp_progress_percent_u64, emit_progress, phase_total_percent, + saturating_i64_from_u64, }; +use super::progress_substep::{HEARTBEAT_INTERVAL, PhaseProgressEmitter, labels as apply_phase}; use super::{ApplyStrategy, UpdateInfo, UpdateManager}; +mod base; +mod delta; +mod installed_app; + +use self::base::{BaseFullArchiveSource, restore_base_full_archive, restore_release_graph_base_full_archive}; +use self::delta::apply_target_deltas; +pub(super) use self::installed_app::find_previous_app_dir; +#[cfg(test)] +pub(super) use self::installed_app::synthesize_current_full_archive_from_installed_app; + pub(super) async fn materialize_update_payload( manager: &UpdateManager, info: &UpdateInfo, @@ -36,12 +33,23 @@ pub(super) async fn materialize_update_payload( artifact_cache_dir: &Path, extract_dir: &Path, progress: Option<&Arc>, + progress_emitter: &PhaseProgressEmitter<'_, F>, ) -> Result where F: Fn(ProgressInfo) + Send + Sync, { if matches!(info.apply_strategy, ApplyStrategy::Delta) { - match materialize_delta_payload(manager, info, staging_dir, artifact_cache_dir, extract_dir, progress).await { + match materialize_delta_payload( + manager, + info, + staging_dir, + artifact_cache_dir, + extract_dir, + progress, + progress_emitter, + ) + .await + { Ok(path) => Ok(path), Err(SurgeError::Cancelled) => Err(SurgeError::Cancelled), Err(delta_error) => { @@ -212,11 +220,11 @@ async fn materialize_delta_payload( artifact_cache_dir: &Path, extract_dir: &Path, progress: Option<&Arc>, + progress_emitter: &PhaseProgressEmitter<'_, F>, ) -> Result where F: Fn(ProgressInfo) + Send + Sync, { - let apply_delta_started_at = Instant::now(); let apply_delta_total_items = i64::try_from(info.apply_releases.len()).unwrap_or(i64::MAX); let apply_delta_total_bytes = info .apply_releases @@ -235,125 +243,62 @@ where }, ); - let mut rebuilt_archive = restore_base_full_archive(manager, artifact_cache_dir).await?; - let mut apply_delta_items_done = 0i64; - let mut apply_delta_bytes_done = 0i64; - - for release in &info.apply_releases { - manager.ctx.check_cancelled()?; - - let Some(delta) = release.selected_delta() else { - return Err(SurgeError::Update(format!( - "Delta update path is missing delta filename for {}", - release.version - ))); - }; - - if !is_supported_delta(&delta) { - return Err(SurgeError::Update(format!( - "Delta {} for {} uses unsupported descriptor (algorithm='{}', format='{}', compression='{}')", - delta.filename, release.version, delta.algorithm, delta.patch_format, delta.compression - ))); - } - - let delta_path = staging_dir.join(&delta.filename); - let delta_compressed = tokio::fs::read(&delta_path).await?; - let patch = decode_delta_patch(delta_compressed.as_slice(), &delta) - .map_err(|e| SurgeError::Archive(format!("Failed to decompress delta {}: {e}", delta.filename)))?; - let progress_for_delta = progress.cloned(); - let completed_bytes_before_delta = apply_delta_bytes_done; - let completed_items_before_delta = apply_delta_items_done; - let current_delta_bytes = delta.size.max(0); - let delta_progress = move |delta_progress: DeltaApplyProgress| { - let bytes_done = completed_bytes_before_delta.saturating_add(scale_progress_units_i64( - current_delta_bytes, - delta_progress.units_done, - delta_progress.units_total, - )); - let phase_percent = if apply_delta_total_bytes > 0 { - clamp_progress_percent(bytes_done, apply_delta_total_bytes) - } else { - scale_apply_delta_items_percent( - completed_items_before_delta, - apply_delta_total_items, - delta_progress.units_done, - delta_progress.units_total, - ) - }; - emit_progress( - progress_for_delta.as_ref(), - ProgressInfo { - phase: 5, - phase_percent, - total_percent: phase_total_percent(60, 20, phase_percent), - bytes_done, - bytes_total: apply_delta_total_bytes, - items_done: completed_items_before_delta, - items_total: apply_delta_total_items, - speed_bytes_per_sec: average_speed_bytes_per_sec( - u64::try_from(bytes_done.max(0)).unwrap_or(u64::MAX), - apply_delta_started_at, - ), - ..ProgressInfo::default() - }, + let base_archive = restore_base_full_archive(manager, info, artifact_cache_dir, progress, progress_emitter).await?; + let rebuilt_archive = match apply_target_deltas( + manager, + info, + staging_dir, + base_archive.archive, + progress, + progress_emitter, + apply_delta_total_items, + apply_delta_total_bytes, + ) + .await + { + Ok(archive) => archive, + Err(delta_error) + if base_archive.source == BaseFullArchiveSource::InstalledApp + && should_retry_delta_with_release_graph(&delta_error) => + { + warn!( + error = %delta_error, + "Installed app base did not produce a valid delta result; retrying with release graph base" ); - }; - - rebuilt_archive = apply_delta_patch_with_progress(&rebuilt_archive, &patch, &delta, Some(&delta_progress)) - .map_err(|e| SurgeError::Update(format!("Failed to apply delta {}: {e}", delta.filename)))?; - - if !release.full_sha256.is_empty() { - let hash = sha256_hex(&rebuilt_archive); - if hash != release.full_sha256 { - return Err(SurgeError::Update(format!( - "SHA-256 mismatch for rebuilt full archive {}: expected {}, got {hash}", - release.version, release.full_sha256 - ))); - } + let release_graph_base = + restore_release_graph_base_full_archive(manager, artifact_cache_dir, progress, progress_emitter) + .await?; + apply_target_deltas( + manager, + info, + staging_dir, + release_graph_base.archive, + progress, + progress_emitter, + apply_delta_total_items, + apply_delta_total_bytes, + ) + .await + .map_err(|retry_error| { + SurgeError::Update(format!( + "Installed-app delta application failed: {delta_error}; release-graph retry failed: {retry_error}" + )) + })? } - - apply_delta_items_done = apply_delta_items_done.saturating_add(1); - apply_delta_bytes_done = apply_delta_bytes_done.saturating_add(delta.size.max(0)); - let phase_percent = clamp_progress_percent(apply_delta_items_done, apply_delta_total_items.max(1)); - emit_progress( - progress, - ProgressInfo { - phase: 5, - phase_percent, - total_percent: phase_total_percent(60, 20, phase_percent), - bytes_done: apply_delta_bytes_done, - bytes_total: apply_delta_total_bytes, - items_done: apply_delta_items_done, - items_total: apply_delta_total_items, - speed_bytes_per_sec: average_speed_bytes_per_sec( - u64::try_from(apply_delta_bytes_done.max(0)).unwrap_or(u64::MAX), - apply_delta_started_at, - ), - ..ProgressInfo::default() - }, - ); - } - - emit_progress( - progress, - ProgressInfo { - phase: 5, - phase_percent: 100, - total_percent: 80, - bytes_done: apply_delta_total_bytes, - bytes_total: apply_delta_total_bytes, - items_done: apply_delta_total_items, - items_total: apply_delta_total_items, - speed_bytes_per_sec: average_speed_bytes_per_sec( - u64::try_from(apply_delta_total_bytes.max(0)).unwrap_or(u64::MAX), - apply_delta_started_at, - ), - ..ProgressInfo::default() - }, - ); + Err(delta_error) => return Err(delta_error), + }; let rebuilt_archive_path = staging_dir.join("rebuilt-full.tar.zst"); - tokio::fs::write(&rebuilt_archive_path, &rebuilt_archive).await?; + progress_emitter + .run_with_heartbeat( + 5, + apply_phase::WRITING_REBUILT_PACKAGE, + 80, + HEARTBEAT_INTERVAL, + tokio::fs::write(&rebuilt_archive_path, &rebuilt_archive), + ) + .await?; + progress_emitter.emit_substep(5, apply_phase::EXTRACTING_REBUILT_PACKAGE, 80); extract_archive_with_progress(&rebuilt_archive_path, extract_dir, progress, 80, 90)?; let source = extract_dir.join(&info.latest_version); @@ -364,68 +309,11 @@ where } } -fn scale_progress_units_i64(total: i64, done: u64, units_total: u64) -> i64 { - if total <= 0 || units_total == 0 { - return 0; - } - let total = u64::try_from(total).unwrap_or(u64::MAX); - let scaled = total.saturating_mul(done.min(units_total)) / units_total; - i64::try_from(scaled).unwrap_or(i64::MAX) -} - -fn scale_apply_delta_items_percent(completed_items: i64, total_items: i64, done: u64, units_total: u64) -> i32 { - let total_items = u64::try_from(total_items.max(1)).unwrap_or(u64::MAX); - let completed_items = u64::try_from(completed_items.max(0)).unwrap_or(u64::MAX); - let units_total = units_total.max(1); - let done = done.min(units_total); - let scaled_done = completed_items.saturating_mul(units_total).saturating_add(done); - let scaled_total = total_items.saturating_mul(units_total); - clamp_progress_percent_u64(scaled_done, scaled_total) -} - -async fn restore_base_full_archive(manager: &UpdateManager, artifact_cache_dir: &Path) -> Result> { - let index = if let Some(cached) = &manager.cached_index { - cached.clone() - } else { - let data = manager.storage.get_object(RELEASES_FILE_COMPRESSED).await?; - decompress_release_index(&data)? - }; - let rid = current_rid(); - let current_release = find_release_for_version_rid(&index, &rid, &manager.current_version).ok_or_else(|| { - SurgeError::Update(format!( - "Current release {} ({rid}) was not found in the release index", - manager.current_version - )) - })?; - - match restore_full_archive_for_version_with_options( - manager.storage.as_ref(), - &index, - &rid, - &manager.current_version, - RestoreOptions { - cache_dir: Some(artifact_cache_dir), - progress: None, - rebuilt_full_cache_policy: RebuiltFullCachePolicy::TargetOnly, - }, +fn should_retry_delta_with_release_graph(error: &SurgeError) -> bool { + matches!( + error, + SurgeError::Update(_) | SurgeError::Integrity(_) | SurgeError::Diff(_) | SurgeError::Archive(_) ) - .await - { - Ok(archive) => Ok(archive), - Err(restore_err) => synthesize_current_full_archive_from_installed_app( - &manager.install_dir, - &manager.current_version, - current_release, - artifact_cache_dir, - &manager.ctx, - ) - .map_err(|fallback_err| { - SurgeError::Update(format!( - "Failed to restore base full archive for {}: {restore_err}; installed-app fallback failed: {fallback_err}", - manager.current_version - )) - }), - } } fn extract_archive_with_progress( @@ -489,109 +377,3 @@ where Ok(()) } - -pub(super) fn synthesize_current_full_archive_from_installed_app( - install_dir: &Path, - current_version: &str, - current_release: &ReleaseEntry, - artifact_cache_dir: &Path, - ctx: &Arc, -) -> Result> { - let app_dir = find_previous_app_dir(install_dir, current_version).ok_or_else(|| { - SurgeError::NotFound(format!( - "No active installed app directory was found for current version {current_version}" - )) - })?; - - let mut excluded_relative_paths = BTreeSet::new(); - excluded_relative_paths.insert(crate::install::RUNTIME_MANIFEST_RELATIVE_PATH.to_string()); - excluded_relative_paths.insert(crate::install::LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH.to_string()); - if runtime_state_dir_contains_only_manifests(&app_dir)? { - excluded_relative_paths.insert(".surge".to_string()); - } - - let budget = ctx.resource_budget(); - let archive = build_canonical_archive_from_directory( - &app_dir, - budget.zstd_compression_level, - budget.effective_zstd_workers(), - &excluded_relative_paths, - )?; - - let mut cache_path = None; - if !current_release.full_sha256.trim().is_empty() { - let actual_sha256 = sha256_hex(&archive); - if actual_sha256 == current_release.full_sha256 { - cache_path = Some(cache_path_for_key(artifact_cache_dir, ¤t_release.full_filename)?); - } else { - warn!( - version = %current_release.version, - expected_sha256 = %current_release.full_sha256, - actual_sha256 = %actual_sha256, - "Installed app content reproduced the current package payload but not the original compressed full archive bytes; using synthesized archive for in-flight delta application without caching it" - ); - } - } - - if let Some(cache_path) = cache_path { - write_file_atomic(&cache_path, &archive)?; - debug!( - version = %current_release.version, - app_dir = %app_dir.display(), - cache_path = %cache_path.display(), - "Rebuilt current full archive from installed app content" - ); - } - Ok(archive) -} - -pub(super) fn find_previous_app_dir(install_dir: &Path, current_version: &str) -> Option { - let active = install_dir.join("app"); - if active.is_dir() { - return Some(active); - } - - let explicit = install_dir.join(format!("app-{current_version}")); - if explicit.is_dir() { - return Some(explicit); - } - - find_latest_app_dir(install_dir).ok() -} - -fn runtime_state_dir_contains_only_manifests(app_dir: &Path) -> Result { - let surge_dir = app_dir.join(".surge"); - if !surge_dir.exists() { - return Ok(false); - } - if !surge_dir.is_dir() { - return Ok(false); - } - - let allowed = BTreeSet::from([ - crate::install::RUNTIME_MANIFEST_RELATIVE_PATH.to_string(), - crate::install::LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH.to_string(), - ]); - let mut stack = vec![surge_dir]; - while let Some(dir) = stack.pop() { - let entries = std::fs::read_dir(&dir)?.collect::, std::io::Error>>()?; - for entry in entries { - let path = entry.path(); - let metadata = std::fs::symlink_metadata(&path)?; - if metadata.is_dir() { - stack.push(path); - continue; - } - - let relative = path - .strip_prefix(app_dir) - .map_err(|e| SurgeError::Update(format!("Failed to relativize installed app path: {e}")))?; - let relative = relative.to_string_lossy().replace('\\', "/"); - if !allowed.contains(&relative) { - return Ok(false); - } - } - } - - Ok(true) -} diff --git a/crates/surge-core/src/update/manager/apply/base.rs b/crates/surge-core/src/update/manager/apply/base.rs new file mode 100644 index 0000000..2ab512f --- /dev/null +++ b/crates/surge-core/src/update/manager/apply/base.rs @@ -0,0 +1,256 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Instant; + +use tracing::debug; + +use crate::config::constants::RELEASES_FILE_COMPRESSED; +use crate::error::{Result, SurgeError}; +use crate::platform::detect::current_rid; +use crate::releases::manifest::{ + PATCH_FORMAT_SPARSE_FILE_OPS_V1, ReleaseEntry, ReleaseIndex, decompress_release_index, +}; +use crate::releases::restore::{ + RebuiltFullCachePolicy, RestoreOptions, RestoreProgress, find_release_for_version_rid, + restore_full_archive_for_version_with_options, +}; + +use super::super::progress::{ + ProgressInfo, average_speed_bytes_per_sec, clamp_progress_percent, emit_progress, phase_total_percent, +}; +use super::super::progress_substep::{HEARTBEAT_INTERVAL, PhaseProgressEmitter, labels as apply_phase}; +use super::super::{UpdateInfo, UpdateManager}; +use super::installed_app::synthesize_current_full_archive_from_installed_app; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(super) enum BaseFullArchiveSource { + InstalledApp, + ReleaseGraph, +} + +pub(super) struct BaseFullArchive { + pub(super) archive: Vec, + pub(super) source: BaseFullArchiveSource, +} + +pub(super) async fn restore_base_full_archive( + manager: &UpdateManager, + info: &UpdateInfo, + artifact_cache_dir: &Path, + progress: Option<&Arc>, + progress_emitter: &PhaseProgressEmitter<'_, F>, +) -> Result +where + F: Fn(ProgressInfo) + Send + Sync, +{ + let (index, rid, current_release) = load_current_release_context(manager).await?; + + if should_prefer_installed_app_base(info) { + match restore_base_full_archive_from_installed_app( + manager, + ¤t_release, + artifact_cache_dir, + progress_emitter, + ) + .await + { + Ok(archive) => { + return Ok(BaseFullArchive { + archive, + source: BaseFullArchiveSource::InstalledApp, + }); + } + Err(installed_app_err) => { + debug!( + version = %manager.current_version, + error = %installed_app_err, + "Installed app current package restoration failed; falling back to release graph" + ); + } + } + } + + restore_base_full_archive_from_release_graph( + manager, + &index, + &rid, + ¤t_release, + artifact_cache_dir, + progress, + progress_emitter, + ) + .await + .map(|archive| BaseFullArchive { + archive, + source: BaseFullArchiveSource::ReleaseGraph, + }) +} + +pub(super) async fn restore_release_graph_base_full_archive( + manager: &UpdateManager, + artifact_cache_dir: &Path, + progress: Option<&Arc>, + progress_emitter: &PhaseProgressEmitter<'_, F>, +) -> Result +where + F: Fn(ProgressInfo) + Send + Sync, +{ + let (index, rid, current_release) = load_current_release_context(manager).await?; + restore_base_full_archive_from_release_graph( + manager, + &index, + &rid, + ¤t_release, + artifact_cache_dir, + progress, + progress_emitter, + ) + .await + .map(|archive| BaseFullArchive { + archive, + source: BaseFullArchiveSource::ReleaseGraph, + }) +} + +async fn load_current_release_context(manager: &UpdateManager) -> Result<(ReleaseIndex, String, ReleaseEntry)> { + let index = if let Some(cached) = &manager.cached_index { + cached.clone() + } else { + let data = manager.storage.get_object(RELEASES_FILE_COMPRESSED).await?; + decompress_release_index(&data)? + }; + let rid = current_rid(); + let current_release = find_release_for_version_rid(&index, &rid, &manager.current_version) + .cloned() + .ok_or_else(|| { + SurgeError::Update(format!( + "Current release {} ({rid}) was not found in the release index", + manager.current_version + )) + })?; + Ok((index, rid, current_release)) +} + +async fn restore_base_full_archive_from_release_graph( + manager: &UpdateManager, + index: &ReleaseIndex, + rid: &str, + current_release: &ReleaseEntry, + artifact_cache_dir: &Path, + progress: Option<&Arc>, + progress_emitter: &PhaseProgressEmitter<'_, F>, +) -> Result> +where + F: Fn(ProgressInfo) + Send + Sync, +{ + let restore_started_at = Instant::now(); + let progress_for_restore = progress.cloned(); + let restore_progress = |restore: RestoreProgress| { + let phase_percent = if restore.bytes_total > 0 { + clamp_progress_percent(restore.bytes_done, restore.bytes_total) + } else { + clamp_progress_percent(restore.items_done, restore.items_total) + }; + emit_progress( + progress_for_restore.as_ref(), + ProgressInfo { + phase: 5, + phase_label: apply_phase::RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH, + phase_percent, + total_percent: phase_total_percent(60, 10, phase_percent), + bytes_done: restore.bytes_done, + bytes_total: restore.bytes_total, + items_done: restore.items_done, + items_total: restore.items_total, + speed_bytes_per_sec: average_speed_bytes_per_sec( + u64::try_from(restore.bytes_done.max(0)).unwrap_or(u64::MAX), + restore_started_at, + ), + }, + ); + progress_emitter.persist_current_phase(apply_phase::RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH); + }; + + let restore_future = restore_full_archive_for_version_with_options( + manager.storage.as_ref(), + index, + rid, + &manager.current_version, + RestoreOptions { + cache_dir: Some(artifact_cache_dir), + progress: Some(&restore_progress), + rebuilt_full_cache_policy: RebuiltFullCachePolicy::TargetOnly, + }, + ); + match progress_emitter + .run_with_heartbeat( + 5, + apply_phase::RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH, + 60, + HEARTBEAT_INTERVAL, + restore_future, + ) + .await + { + Ok(archive) => Ok(archive), + Err(restore_err) => synthesize_current_full_archive_from_installed_app( + &manager.install_dir, + &manager.current_version, + current_release, + artifact_cache_dir, + &manager.ctx, + ) + .map_err(|fallback_err| { + SurgeError::Update(format!( + "Failed to restore base full archive for {}: {restore_err}; installed-app fallback failed: {fallback_err}", + manager.current_version + )) + }), + } +} + +async fn restore_base_full_archive_from_installed_app( + manager: &UpdateManager, + current_release: &ReleaseEntry, + artifact_cache_dir: &Path, + progress_emitter: &PhaseProgressEmitter<'_, F>, +) -> Result> +where + F: Fn(ProgressInfo) + Send + Sync, +{ + let install_dir = manager.install_dir.clone(); + let current_version = manager.current_version.clone(); + let current_release = current_release.clone(); + let artifact_cache_dir = artifact_cache_dir.to_path_buf(); + let ctx = Arc::clone(&manager.ctx); + + progress_emitter + .run_with_heartbeat( + 5, + apply_phase::RESTORING_CURRENT_PACKAGE_FROM_INSTALLED_APP, + 60, + HEARTBEAT_INTERVAL, + tokio::task::spawn_blocking(move || { + synthesize_current_full_archive_from_installed_app( + &install_dir, + ¤t_version, + ¤t_release, + &artifact_cache_dir, + &ctx, + ) + }), + ) + .await + .map_err(|e| SurgeError::Update(format!("Failed to join installed app archive synthesis task: {e}")))? +} + +fn should_prefer_installed_app_base(info: &UpdateInfo) -> bool { + info.apply_releases.iter().all(|release| { + release.selected_delta().is_some_and(|delta| { + delta + .patch_format + .trim() + .eq_ignore_ascii_case(PATCH_FORMAT_SPARSE_FILE_OPS_V1) + }) + }) +} diff --git a/crates/surge-core/src/update/manager/apply/delta.rs b/crates/surge-core/src/update/manager/apply/delta.rs new file mode 100644 index 0000000..1a47691 --- /dev/null +++ b/crates/surge-core/src/update/manager/apply/delta.rs @@ -0,0 +1,170 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Instant; + +use crate::crypto::sha256::sha256_hex; +use crate::error::{Result, SurgeError}; +use crate::releases::delta::{ + DeltaApplyProgress, apply_delta_patch_with_progress, decode_delta_patch, is_supported_delta, +}; + +use super::super::progress::{ + ProgressInfo, average_speed_bytes_per_sec, clamp_progress_percent, clamp_progress_percent_u64, emit_progress, + phase_total_percent, +}; +use super::super::progress_substep::{PhaseProgressEmitter, labels as apply_phase}; +use super::super::{UpdateInfo, UpdateManager}; + +pub(super) async fn apply_target_deltas( + manager: &UpdateManager, + info: &UpdateInfo, + staging_dir: &Path, + mut rebuilt_archive: Vec, + progress: Option<&Arc>, + progress_emitter: &PhaseProgressEmitter<'_, F>, + apply_delta_total_items: i64, + apply_delta_total_bytes: i64, +) -> Result> +where + F: Fn(ProgressInfo) + Send + Sync, +{ + let apply_delta_started_at = Instant::now(); + let mut apply_delta_items_done = 0i64; + let mut apply_delta_bytes_done = 0i64; + + progress_emitter.emit_substep(5, apply_phase::APPLYING_TARGET_DELTAS, 60); + for release in &info.apply_releases { + manager.ctx.check_cancelled()?; + + let Some(delta) = release.selected_delta() else { + return Err(SurgeError::Update(format!( + "Delta update path is missing delta filename for {}", + release.version + ))); + }; + + if !is_supported_delta(&delta) { + return Err(SurgeError::Update(format!( + "Delta {} for {} uses unsupported descriptor (algorithm='{}', format='{}', compression='{}')", + delta.filename, release.version, delta.algorithm, delta.patch_format, delta.compression + ))); + } + + let delta_path = staging_dir.join(&delta.filename); + let delta_compressed = tokio::fs::read(&delta_path).await?; + let patch = decode_delta_patch(delta_compressed.as_slice(), &delta) + .map_err(|e| SurgeError::Archive(format!("Failed to decompress delta {}: {e}", delta.filename)))?; + let progress_for_delta = progress.cloned(); + let completed_bytes_before_delta = apply_delta_bytes_done; + let completed_items_before_delta = apply_delta_items_done; + let current_delta_bytes = delta.size.max(0); + let delta_progress = move |delta_progress: DeltaApplyProgress| { + let bytes_done = completed_bytes_before_delta.saturating_add(scale_progress_units_i64( + current_delta_bytes, + delta_progress.units_done, + delta_progress.units_total, + )); + let phase_percent = if apply_delta_total_bytes > 0 { + clamp_progress_percent(bytes_done, apply_delta_total_bytes) + } else { + scale_apply_delta_items_percent( + completed_items_before_delta, + apply_delta_total_items, + delta_progress.units_done, + delta_progress.units_total, + ) + }; + emit_progress( + progress_for_delta.as_ref(), + ProgressInfo { + phase: 5, + phase_label: apply_phase::APPLYING_TARGET_DELTAS, + phase_percent, + total_percent: phase_total_percent(60, 20, phase_percent), + bytes_done, + bytes_total: apply_delta_total_bytes, + items_done: completed_items_before_delta, + items_total: apply_delta_total_items, + speed_bytes_per_sec: average_speed_bytes_per_sec( + u64::try_from(bytes_done.max(0)).unwrap_or(u64::MAX), + apply_delta_started_at, + ), + }, + ); + progress_emitter.persist_current_phase(apply_phase::APPLYING_TARGET_DELTAS); + }; + + rebuilt_archive = apply_delta_patch_with_progress(&rebuilt_archive, &patch, &delta, Some(&delta_progress)) + .map_err(|e| SurgeError::Update(format!("Failed to apply delta {}: {e}", delta.filename)))?; + + if !release.full_sha256.is_empty() { + let hash = sha256_hex(&rebuilt_archive); + if hash != release.full_sha256 { + return Err(SurgeError::Update(format!( + "SHA-256 mismatch for rebuilt full archive {}: expected {}, got {hash}", + release.version, release.full_sha256 + ))); + } + } + + apply_delta_items_done = apply_delta_items_done.saturating_add(1); + apply_delta_bytes_done = apply_delta_bytes_done.saturating_add(delta.size.max(0)); + let phase_percent = clamp_progress_percent(apply_delta_items_done, apply_delta_total_items.max(1)); + emit_progress( + progress, + ProgressInfo { + phase: 5, + phase_label: apply_phase::APPLYING_TARGET_DELTAS, + phase_percent, + total_percent: phase_total_percent(60, 20, phase_percent), + bytes_done: apply_delta_bytes_done, + bytes_total: apply_delta_total_bytes, + items_done: apply_delta_items_done, + items_total: apply_delta_total_items, + speed_bytes_per_sec: average_speed_bytes_per_sec( + u64::try_from(apply_delta_bytes_done.max(0)).unwrap_or(u64::MAX), + apply_delta_started_at, + ), + }, + ); + } + + emit_progress( + progress, + ProgressInfo { + phase: 5, + phase_label: apply_phase::APPLYING_TARGET_DELTAS, + phase_percent: 100, + total_percent: 80, + bytes_done: apply_delta_total_bytes, + bytes_total: apply_delta_total_bytes, + items_done: apply_delta_total_items, + items_total: apply_delta_total_items, + speed_bytes_per_sec: average_speed_bytes_per_sec( + u64::try_from(apply_delta_total_bytes.max(0)).unwrap_or(u64::MAX), + apply_delta_started_at, + ), + }, + ); + + Ok(rebuilt_archive) +} + +fn scale_progress_units_i64(total: i64, done: u64, units_total: u64) -> i64 { + if total <= 0 || units_total == 0 { + return 0; + } + let total = u64::try_from(total).unwrap_or(u64::MAX); + let scaled = total.saturating_mul(done.min(units_total)) / units_total; + i64::try_from(scaled).unwrap_or(i64::MAX) +} + +fn scale_apply_delta_items_percent(completed_items: i64, total_items: i64, done: u64, units_total: u64) -> i32 { + let total_items = u64::try_from(total_items.max(1)).unwrap_or(u64::MAX); + let completed_items = u64::try_from(completed_items.max(0)).unwrap_or(u64::MAX); + let units_total = units_total.max(1); + let done = done.min(units_total); + let scaled_done = completed_items.saturating_mul(units_total).saturating_add(done); + let scaled_total = total_items.saturating_mul(units_total); + clamp_progress_percent_u64(scaled_done, scaled_total) +} diff --git a/crates/surge-core/src/update/manager/apply/installed_app.rs b/crates/surge-core/src/update/manager/apply/installed_app.rs new file mode 100644 index 0000000..6e3d9f1 --- /dev/null +++ b/crates/surge-core/src/update/manager/apply/installed_app.rs @@ -0,0 +1,120 @@ +use std::collections::BTreeSet; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use tracing::{debug, warn}; + +use crate::context::Context; +use crate::crypto::sha256::sha256_hex; +use crate::error::{Result, SurgeError}; +use crate::pack::builder::build_canonical_archive_from_directory; +use crate::platform::fs::write_file_atomic; +use crate::releases::artifact_cache::cache_path_for_key; +use crate::releases::manifest::ReleaseEntry; +use crate::supervisor::stub::find_latest_app_dir; + +pub(in crate::update::manager) fn synthesize_current_full_archive_from_installed_app( + install_dir: &Path, + current_version: &str, + current_release: &ReleaseEntry, + artifact_cache_dir: &Path, + ctx: &Arc, +) -> Result> { + let app_dir = find_previous_app_dir(install_dir, current_version).ok_or_else(|| { + SurgeError::NotFound(format!( + "No active installed app directory was found for current version {current_version}" + )) + })?; + + let mut excluded_relative_paths = BTreeSet::new(); + excluded_relative_paths.insert(crate::install::RUNTIME_MANIFEST_RELATIVE_PATH.to_string()); + excluded_relative_paths.insert(crate::install::LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH.to_string()); + if runtime_state_dir_contains_only_manifests(&app_dir)? { + excluded_relative_paths.insert(".surge".to_string()); + } + + let budget = ctx.resource_budget(); + let archive = build_canonical_archive_from_directory( + &app_dir, + budget.zstd_compression_level, + budget.effective_zstd_workers(), + &excluded_relative_paths, + )?; + + let mut cache_path = None; + if !current_release.full_sha256.trim().is_empty() { + let actual_sha256 = sha256_hex(&archive); + if actual_sha256 == current_release.full_sha256 { + cache_path = Some(cache_path_for_key(artifact_cache_dir, ¤t_release.full_filename)?); + } else { + warn!( + version = %current_release.version, + expected_sha256 = %current_release.full_sha256, + actual_sha256 = %actual_sha256, + "Installed app content reproduced the current package payload but not the original compressed full archive bytes; using synthesized archive for in-flight delta application without caching it" + ); + } + } + + if let Some(cache_path) = cache_path { + write_file_atomic(&cache_path, &archive)?; + debug!( + version = %current_release.version, + app_dir = %app_dir.display(), + cache_path = %cache_path.display(), + "Rebuilt current full archive from installed app content" + ); + } + Ok(archive) +} + +pub(in crate::update::manager) fn find_previous_app_dir(install_dir: &Path, current_version: &str) -> Option { + let active = install_dir.join("app"); + if active.is_dir() { + return Some(active); + } + + let explicit = install_dir.join(format!("app-{current_version}")); + if explicit.is_dir() { + return Some(explicit); + } + + find_latest_app_dir(install_dir).ok() +} + +fn runtime_state_dir_contains_only_manifests(app_dir: &Path) -> Result { + let surge_dir = app_dir.join(".surge"); + if !surge_dir.exists() { + return Ok(false); + } + if !surge_dir.is_dir() { + return Ok(false); + } + + let allowed = BTreeSet::from([ + crate::install::RUNTIME_MANIFEST_RELATIVE_PATH.to_string(), + crate::install::LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH.to_string(), + ]); + let mut stack = vec![surge_dir]; + while let Some(dir) = stack.pop() { + let entries = std::fs::read_dir(&dir)?.collect::, std::io::Error>>()?; + for entry in entries { + let path = entry.path(); + let metadata = std::fs::symlink_metadata(&path)?; + if metadata.is_dir() { + stack.push(path); + continue; + } + + let relative = path + .strip_prefix(app_dir) + .map_err(|e| SurgeError::Update(format!("Failed to relativize installed app path: {e}")))?; + let relative = relative.to_string_lossy().replace('\\', "/"); + if !allowed.contains(&relative) { + return Ok(false); + } + } + } + + Ok(true) +} diff --git a/crates/surge-core/src/update/manager/progress_substep.rs b/crates/surge-core/src/update/manager/progress_substep.rs index 77801f9..43e6a4a 100644 --- a/crates/surge-core/src/update/manager/progress_substep.rs +++ b/crates/surge-core/src/update/manager/progress_substep.rs @@ -1,13 +1,10 @@ -//! Phase-substep progress emission for the finalize phase. +//! Phase-substep progress emission for long-running update phases. //! -//! The finalize phase of an update runs several substeps (supervisor -//! shutdown, atomic directory swaps, persistent asset copy, cache pruning, -//! post-update hook, supervisor restart) that historically reported nothing -//! to the user once the bytes/items counter hit 100%. This module owns the -//! helper that emits `ProgressInfo` with a `phase_label` for each substep, -//! mirrors that label into the persisted in-progress -//! [`UpdateStatusRecord`], and provides a heartbeat helper for substeps -//! that can block silently for many seconds. +//! Some update work historically reported nothing useful once the coarse +//! bytes/items counter hit a phase boundary. This module owns the helper that +//! emits `ProgressInfo` with a `phase_label` for each substep, mirrors that +//! label into the persisted in-progress [`UpdateStatusRecord`], and provides a +//! heartbeat helper for substeps that can block silently for many seconds. //! //! Substep labels live in [`labels`] so the manager and tests reference the //! same canonical strings. @@ -21,15 +18,20 @@ use tracing::warn; use super::progress::{ProgressInfo, emit_progress}; use crate::update::status::{self, UpdateStatusRecord}; -/// Substep labels for the finalize phase. These appear in `ProgressInfo` +/// Substep labels for long-running update work. These appear in `ProgressInfo` /// events and on the persisted `current_phase` field of in-progress update -/// status records so operators can tell a stuck "swapping app directory" -/// apart from a stuck "starting supervisor". +/// status records so operators can distinguish expensive apply/finalize +/// substeps. pub(crate) mod labels { pub const RELEASE_RESOLVED: &str = "release or delta resolved"; pub const PACKAGE_DOWNLOAD_STARTED: &str = "package download started"; pub const PACKAGE_DOWNLOADED: &str = "package downloaded"; pub const PACKAGE_APPLY_STARTED: &str = "package apply started"; + pub const RESTORING_CURRENT_PACKAGE_FROM_INSTALLED_APP: &str = "restoring current package from installed app"; + pub const RESTORING_CURRENT_PACKAGE_FROM_RELEASE_GRAPH: &str = "restoring current package from release graph"; + pub const APPLYING_TARGET_DELTAS: &str = "applying target deltas"; + pub const WRITING_REBUILT_PACKAGE: &str = "writing rebuilt package"; + pub const EXTRACTING_REBUILT_PACKAGE: &str = "extracting rebuilt package"; pub const PACKAGE_APPLY_COMPLETED: &str = "package apply completed"; pub const STOPPING_SUPERVISOR: &str = "stopping supervisor"; pub const PREPARING_SWAP: &str = "preparing app swap"; @@ -127,7 +129,7 @@ where } } - fn persist_current_phase(&self, label: &'static str) { + pub(super) fn persist_current_phase(&self, label: &'static str) { let mut record = self .in_progress_template .clone()