Skip to content
4 changes: 3 additions & 1 deletion src/services/api/models/agent/status.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use crate::utils::deserializer::deserialize_snake_case;
use crate::utils::deserializer::{deserialize_snake_case, string_or_number_to_string};
use serde::{Deserialize, Serialize};
use toml::Value;

Expand Down Expand Up @@ -54,4 +54,6 @@ pub struct RestoreInfo {
pub file: Option<String>,
#[serde(rename = "metaFile")]
pub meta_file: Option<String>,
#[serde(default, deserialize_with = "string_or_number_to_string")]
pub size: Option<String>,
}
7 changes: 6 additions & 1 deletion src/services/restore/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,19 @@ impl RestoreService {
return;
};

let expected_size = db.data.restore.size.clone();

let service = Self {
ctx: self.ctx.clone(),
};

let db_cfg = cfg.clone();

tokio::spawn(async move {
if let Err(e) = service.execute_restore(db_cfg, file_to_restore).await {
if let Err(e) = service
.execute_restore(db_cfg, file_to_restore, expected_size)
.await
{
error!("Restore failed: {}", e);
}
});
Expand Down
85 changes: 80 additions & 5 deletions src/services/restore/downloader.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,40 @@
use super::service::RestoreService;

use anyhow::Result;
use futures::StreamExt;
use reqwest::{Client, Url};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::io::AsyncWriteExt;
use crate::services::backup::logger::JobLogger;

fn human_size(bytes: u64) -> String {
if bytes >= 1024 * 1024 {
format!("{} MB", bytes / 1024 / 1024)
} else if bytes >= 1024 {
format!("{} KB", bytes / 1024)
} else {
format!("{bytes} B")
}
}

impl RestoreService {
pub async fn download_backup(&self, file_url: &str, tmp_path: &Path, logger: Arc<JobLogger>) -> Result<PathBuf> {
pub async fn download_backup(
&self,
file_url: &str,
tmp_path: &Path,
logger: Arc<JobLogger>,
expected_size: Option<String>,
) -> Result<PathBuf> {
logger.log("info", "Start downloading backup archive".to_string());

let client = Client::new();

let response = client.get(file_url).send().await?;
Comment thread
RambokDev marked this conversation as resolved.
let status = response.status();

if !response.status().is_success() {
if !status.is_success() {
logger.log("error", "Failed to download".to_string());
anyhow::bail!("download failed");
}
Expand All @@ -39,11 +59,66 @@ impl RestoreService {

let path = tmp_path.join(&filename);

let bytes = response.bytes().await?;
let total = expected_size
.as_deref()
.and_then(|s| s.trim().parse::<u64>().ok())
.filter(|&n| n > 0);

logger.log(
"info",
format!(
"Downloading backup '{}' ({})",
filename,
total.map(human_size).unwrap_or_else(|| "unknown size".to_string())
),
);

let start = Instant::now();
let mut file = tokio::fs::File::create(&path).await?;
let mut stream = response.bytes_stream();
let mut downloaded: u64 = 0;
let mut next_pct: u64 = 10;

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
file.write_all(&chunk).await?;
downloaded += chunk.len() as u64;

if let Some(total) = total {
let pct = (downloaded.saturating_mul(100) / total).min(100);
let milestone = pct / 10 * 10;
if milestone >= next_pct {
logger.log(
"info",
format!(
"Download progress: {}% ({} / {} bytes)",
milestone, downloaded, total
),
);
next_pct = milestone + 10;
}
}
}

file.flush().await?;

if downloaded == 0 {
logger.log(
"warn",
format!("Downloaded 0 bytes (status {status}); backup body was empty"),
);
}

tokio::fs::write(&path, &bytes).await?;
logger.log(
"info",
format!(
"Backup downloaded to {} ( {} bytes in {:.1}s)",
path.display(),
downloaded,
start.elapsed().as_secs_f64()
),
);

logger.log("info", format!("Backup downloaded to {}", path.display()));
Ok(path)
Comment thread
RambokDev marked this conversation as resolved.
}
}
11 changes: 9 additions & 2 deletions src/services/restore/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use std::time::Instant;
use tempfile::TempDir;

impl RestoreService {
pub async fn execute_restore(&self, cfg: DatabaseConfig, file_url: String) -> Result<()> {
pub async fn execute_restore(
&self,
cfg: DatabaseConfig,
file_url: String,
expected_size: Option<String>,
) -> Result<()> {
let logger = Arc::new(JobLogger::new());
let start = Instant::now();

Expand All @@ -18,7 +23,9 @@ impl RestoreService {

logger.log("info", format!("Created temp directory {}", tmp_path.display()));

let downloaded = self.download_backup(&file_url, tmp_path, Arc::clone(&logger)).await?;
let downloaded = self
.download_backup(&file_url, tmp_path, Arc::clone(&logger), expected_size)
.await?;

let backup_file = self.prepare_archive(downloaded, tmp_path, Arc::clone(&logger)).await?;

Expand Down
Loading