diff --git a/src/services/api/models/agent/status.rs b/src/services/api/models/agent/status.rs index 8b1b145..34ea915 100644 --- a/src/services/api/models/agent/status.rs +++ b/src/services/api/models/agent/status.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] -use crate::utils::deserializer::deserialize_snake_case; +use crate::utils::deserializer::{deserialize_snake_case, string_or_number_to_string}; use serde::{Deserialize, Serialize}; use toml::Value; @@ -54,4 +54,6 @@ pub struct RestoreInfo { pub file: Option, #[serde(rename = "metaFile")] pub meta_file: Option, + #[serde(default, deserialize_with = "string_or_number_to_string")] + pub size: Option, } diff --git a/src/services/restore/dispatcher.rs b/src/services/restore/dispatcher.rs index bf0c9c4..61f35fd 100644 --- a/src/services/restore/dispatcher.rs +++ b/src/services/restore/dispatcher.rs @@ -20,6 +20,8 @@ impl RestoreService { return; }; + let expected_size = db.data.restore.size.clone(); + let service = Self { ctx: self.ctx.clone(), }; @@ -27,7 +29,10 @@ impl RestoreService { let db_cfg = cfg.clone(); tokio::spawn(async move { - if let Err(e) = service.execute_restore(db_cfg, file_to_restore).await { + if let Err(e) = service + .execute_restore(db_cfg, file_to_restore, expected_size) + .await + { error!("Restore failed: {}", e); } }); diff --git a/src/services/restore/downloader.rs b/src/services/restore/downloader.rs index ef10394..a154160 100644 --- a/src/services/restore/downloader.rs +++ b/src/services/restore/downloader.rs @@ -1,20 +1,40 @@ use super::service::RestoreService; use anyhow::Result; +use futures::StreamExt; use reqwest::{Client, Url}; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::time::Instant; +use tokio::io::AsyncWriteExt; use crate::services::backup::logger::JobLogger; +fn human_size(bytes: u64) -> String { + if bytes >= 1024 * 1024 { + format!("{} MB", bytes / 1024 / 1024) + } else if bytes >= 1024 { + format!("{} KB", bytes / 1024) + } else { + format!("{bytes} B") + } +} + impl RestoreService { - pub async fn download_backup(&self, file_url: &str, tmp_path: &Path, logger: Arc) -> Result { + pub async fn download_backup( + &self, + file_url: &str, + tmp_path: &Path, + logger: Arc, + expected_size: Option, + ) -> Result { logger.log("info", "Start downloading backup archive".to_string()); let client = Client::new(); let response = client.get(file_url).send().await?; + let status = response.status(); - if !response.status().is_success() { + if !status.is_success() { logger.log("error", "Failed to download".to_string()); anyhow::bail!("download failed"); } @@ -39,11 +59,66 @@ impl RestoreService { let path = tmp_path.join(&filename); - let bytes = response.bytes().await?; + let total = expected_size + .as_deref() + .and_then(|s| s.trim().parse::().ok()) + .filter(|&n| n > 0); + + logger.log( + "info", + format!( + "Downloading backup '{}' ({})", + filename, + total.map(human_size).unwrap_or_else(|| "unknown size".to_string()) + ), + ); + + let start = Instant::now(); + let mut file = tokio::fs::File::create(&path).await?; + let mut stream = response.bytes_stream(); + let mut downloaded: u64 = 0; + let mut next_pct: u64 = 10; + + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + file.write_all(&chunk).await?; + downloaded += chunk.len() as u64; + + if let Some(total) = total { + let pct = (downloaded.saturating_mul(100) / total).min(100); + let milestone = pct / 10 * 10; + if milestone >= next_pct { + logger.log( + "info", + format!( + "Download progress: {}% ({} / {} bytes)", + milestone, downloaded, total + ), + ); + next_pct = milestone + 10; + } + } + } + + file.flush().await?; + + if downloaded == 0 { + logger.log( + "warn", + format!("Downloaded 0 bytes (status {status}); backup body was empty"), + ); + } - tokio::fs::write(&path, &bytes).await?; + logger.log( + "info", + format!( + "Backup downloaded to {} ( {} bytes in {:.1}s)", + path.display(), + downloaded, + start.elapsed().as_secs_f64() + ), + ); - logger.log("info", format!("Backup downloaded to {}", path.display())); Ok(path) } } diff --git a/src/services/restore/executor.rs b/src/services/restore/executor.rs index 7f1cff1..27ef070 100644 --- a/src/services/restore/executor.rs +++ b/src/services/restore/executor.rs @@ -7,7 +7,12 @@ use std::time::Instant; use tempfile::TempDir; impl RestoreService { - pub async fn execute_restore(&self, cfg: DatabaseConfig, file_url: String) -> Result<()> { + pub async fn execute_restore( + &self, + cfg: DatabaseConfig, + file_url: String, + expected_size: Option, + ) -> Result<()> { let logger = Arc::new(JobLogger::new()); let start = Instant::now(); @@ -18,7 +23,9 @@ impl RestoreService { logger.log("info", format!("Created temp directory {}", tmp_path.display())); - let downloaded = self.download_backup(&file_url, tmp_path, Arc::clone(&logger)).await?; + let downloaded = self + .download_backup(&file_url, tmp_path, Arc::clone(&logger), expected_size) + .await?; let backup_file = self.prepare_archive(downloaded, tmp_path, Arc::clone(&logger)).await?;