From f7639de0961e2ad75020e50ce70c687c68ea7d38 Mon Sep 17 00:00:00 2001 From: Charles GTE Date: Sat, 20 Jun 2026 17:39:10 +0200 Subject: [PATCH 1/4] build: add azure_storage_blob and azure_core dependencies --- Cargo.lock | 190 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 + 2 files changed, 192 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 56482e1..f498065 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,6 +136,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -679,6 +690,58 @@ dependencies = [ "tower-service", ] +[[package]] +name = "azure_core" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6a26a7d374b440015cbbcbf2d9d8be5a133aa940599f5e5dc569504baa262e" +dependencies = [ + "async-lock", + "async-trait", + "azure_core_macros", + "bytes", + "futures", + "pin-project", + "rustc_version", + "serde", + "serde_json", + "tokio", + "tracing", + "typespec", + "typespec_client_core", +] + +[[package]] +name = "azure_core_macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9b52dba6a345f3ad2d42ff8d0d63df9d0994cfa29657bf18ffdbf149f78a4f5" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "tracing", +] + +[[package]] +name = "azure_storage_blob" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1756febbcca86c862ef718b983b505d08bd65a9bc984a915b0a16af4a4c3fe5b" +dependencies = [ + "async-stream", + "async-trait", + "azure_core", + "bytes", + "futures", + "percent-encoding", + "pin-project", + "serde", + "serde_json", + "time", + "tokio", +] + [[package]] name = "base16ct" version = "0.1.1" @@ -910,6 +973,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.0", +] + [[package]] name = "chrono" version = "0.4.44" @@ -981,6 +1055,15 @@ version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "connection-string" version = "0.2.0" @@ -1501,6 +1584,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -2934,6 +3038,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -3093,6 +3203,8 @@ dependencies = [ "async-trait", "aws-config", "aws-sdk-s3", + "azure_core", + "azure_storage_blob", "base64 0.22.1", "bytes", "chrono", @@ -3294,6 +3406,16 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.39.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdcc8dd4e2f670d309a5f0e83fe36dfdc05af317008fea29144da1a2ac858e5e" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quinn" version = "0.11.9" @@ -3398,6 +3520,17 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.0", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -4752,13 +4885,18 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ + "async-compression", "bitflags 2.11.0", "bytes", + "futures-core", "futures-util", "http 1.4.0", "http-body 1.0.1", + "http-body-util", "iri-string", "pin-project-lite", + "tokio", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -4884,6 +5022,58 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" +[[package]] +name = "typespec" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21666a31293beab8f41d38c2849ddbc342cd9c7cb4d71a9818868287a8934e53" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures", + "quick-xml", + "serde", + "serde_json", + "url", +] + +[[package]] +name = "typespec_client_core" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924f0c734e0ac3b881ab99d032bd28fcc969d2bb73ef1b8dd4772fd8e518a382" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "dyn-clone", + "futures", + "pin-project", + "rand 0.10.1", + "reqwest 0.13.2", + "serde", + "serde_json", + "time", + "tokio", + "tracing", + "typespec", + "typespec_macros", + "url", + "uuid", +] + +[[package]] +name = "typespec_macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c608f4427943f8adb211abc95c87672b1b98847152783507d54e3246e502f60" +dependencies = [ + "proc-macro2", + "quote", + "rustc_version", + "syn", +] + [[package]] name = "unicase" version = "2.9.0" diff --git a/Cargo.toml b/Cargo.toml index 9355bca..ff71bcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,8 @@ tokio-util = { version = "0.7.18", features = ["compat"] } tiberius = { version = "0.12", default-features = false, features = ["rustls", "chrono"] } aws-config = "1.8.13" aws-sdk-s3 = { version = "1.122.0", features = ["behavior-version-latest"] } +azure_core = "1.0.0" +azure_storage_blob = "1.0.0" async-compression = { version = "0.4.37", features = ["tokio", "gzip"] } tokio-tar = "0.3.1" oauth2 = "5.0.0" From 84735ac3990198e659d214a3dd37399f776b8377 Mon Sep 17 00:00:00 2001 From: Charles GTE Date: Sat, 20 Jun 2026 17:49:54 +0200 Subject: [PATCH 2/4] feat(storage): spike Azure Blob SAS auth + block round-trip against Azurite Co-Authored-By: Claude Opus 4.8 --- src/services/mod.rs | 2 +- .../storage/providers/azure_blob/helpers.rs | 171 ++++++++++++++++++ .../storage/providers/azure_blob/mod.rs | 1 + src/services/storage/providers/mod.rs | 1 + src/tests/mod.rs | 1 + src/tests/storage/azure_blob.rs | 74 ++++++++ src/tests/storage/mod.rs | 1 + 7 files changed, 250 insertions(+), 1 deletion(-) create mode 100644 src/services/storage/providers/azure_blob/helpers.rs create mode 100644 src/services/storage/providers/azure_blob/mod.rs create mode 100644 src/tests/storage/azure_blob.rs create mode 100644 src/tests/storage/mod.rs diff --git a/src/services/mod.rs b/src/services/mod.rs index c4f3f04..260c3ab 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -4,4 +4,4 @@ pub mod config; pub mod cron; pub mod restore; pub mod status; -mod storage; +pub mod storage; diff --git a/src/services/storage/providers/azure_blob/helpers.rs b/src/services/storage/providers/azure_blob/helpers.rs new file mode 100644 index 0000000..8d796b5 --- /dev/null +++ b/src/services/storage/providers/azure_blob/helpers.rs @@ -0,0 +1,171 @@ +use anyhow::{Context as _, Result, anyhow}; +use base64::Engine; +use base64::engine::general_purpose::STANDARD; +use chrono::{Duration, Utc}; +use openssl::hash::MessageDigest; +use openssl::pkey::PKey; +use openssl::sign::Signer; +use url::Url; + +#[derive(Debug, Clone)] +pub struct ResolvedAzure { + pub account_name: String, + pub account_key: String, // base64, as Azure presents it + pub blob_endpoint: String, // e.g. http://127.0.0.1:10000/devstoreaccount1 +} + +#[derive(Clone, Copy)] +pub enum SasResource { + Blob, + // Container-scoped Service SAS is exercised by later tasks; kept here as part of the API. + #[allow(dead_code)] + Container, +} + +impl SasResource { + fn code(self) -> &'static str { + match self { SasResource::Blob => "b", SasResource::Container => "c" } + } +} + +const SAS_VERSION: &str = "2022-11-02"; + +fn hmac_sha256_b64(key: &[u8], data: &str) -> Result { + let pkey = PKey::hmac(key).context("hmac key")?; + let mut signer = Signer::new(MessageDigest::sha256(), &pkey).context("signer")?; + signer.update(data.as_bytes()).context("signer update")?; + let sig = signer.sign_to_vec().context("sign")?; + Ok(STANDARD.encode(sig)) +} + +/// Build Service SAS query pairs (raw, un-encoded) for `canonical_resource` +/// e.g. `/blob/{account}/{container}/{blob}`. +pub fn build_service_sas( + resolved: &ResolvedAzure, + canonical_resource: &str, + resource: SasResource, + permissions: &str, +) -> Result> { + let key = STANDARD + .decode(&resolved.account_key) + .map_err(|_| anyhow!("account key is not valid base64"))?; + + let signed_start = String::new(); + let signed_expiry = (Utc::now() + Duration::hours(1)) + .format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let signed_protocol = "https,http"; // Azurite is http + let signed_resource = resource.code(); + + // Service SAS string-to-sign for sv >= 2020-12-06 (16 fields, 15 newlines). + let string_to_sign = format!( + "{sp}\n{st}\n{se}\n{canon}\n{si}\n{sip}\n{spr}\n{sv}\n{sr}\n{snap}\n{enc}\n{rscc}\n{rscd}\n{rsce}\n{rscl}\n{rsct}", + sp = permissions, st = signed_start, se = signed_expiry, canon = canonical_resource, + si = "", sip = "", spr = signed_protocol, sv = SAS_VERSION, sr = signed_resource, + snap = "", enc = "", rscc = "", rscd = "", rsce = "", rscl = "", rsct = "", + ); + + let sig = hmac_sha256_b64(&key, &string_to_sign)?; + + Ok(vec![ + ("sv".into(), SAS_VERSION.into()), + ("sr".into(), signed_resource.into()), + ("sp".into(), permissions.into()), + ("se".into(), signed_expiry), + ("spr".into(), signed_protocol.into()), + ("sig".into(), sig), + ]) +} + +/// Build a SAS-scoped URL for a blob (or container when `blob` is empty). +pub fn build_sas_url( + resolved: &ResolvedAzure, + container: &str, + blob: &str, + resource: SasResource, + permissions: &str, +) -> Result { + let canonical = if blob.is_empty() { + format!("/blob/{}/{}", resolved.account_name, container) + } else { + format!("/blob/{}/{}/{}", resolved.account_name, container, blob) + }; + let pairs = build_service_sas(resolved, &canonical, resource, permissions)?; + + let base = if blob.is_empty() { + format!("{}/{}", resolved.blob_endpoint.trim_end_matches('/'), container) + } else { + format!("{}/{}/{}", resolved.blob_endpoint.trim_end_matches('/'), container, blob) + }; + + let mut url = Url::parse(&base).context("invalid blob endpoint/url")?; + { + let mut qp = url.query_pairs_mut(); + for (k, v) in pairs { qp.append_pair(&k, &v); } + } + Ok(url) +} + +/// Build an Account SAS query set (raw, un-encoded). +/// +/// Required because creating a *container* against Azurite cannot be authorized by a +/// container-scoped Service SAS (Azurite maps `Container_Create` to an empty required +/// permission and its check then always fails); an Account SAS is the supported path. +/// +/// `services` e.g. `"b"` (blob), `resource_types` e.g. `"c"` (container) / `"co"` (container+object), +/// `permissions` e.g. `"cw"` (create+write). +pub fn build_account_sas( + resolved: &ResolvedAzure, + services: &str, + resource_types: &str, + permissions: &str, +) -> Result> { + let key = STANDARD + .decode(&resolved.account_key) + .map_err(|_| anyhow!("account key is not valid base64"))?; + + let signed_start = String::new(); + let signed_expiry = (Utc::now() + Duration::hours(1)) + .format("%Y-%m-%dT%H:%M:%SZ").to_string(); + let signed_protocol = "https,http"; // Azurite is http + let signed_ip = String::new(); + let encryption_scope = String::new(); + + // Account SAS string-to-sign for sv >= 2020-12-06: + // account \n sp \n ss \n srt \n st \n se \n sip \n spr \n sv \n ses \n (trailing newline) + let string_to_sign = format!( + "{acc}\n{sp}\n{ss}\n{srt}\n{st}\n{se}\n{sip}\n{spr}\n{sv}\n{ses}\n", + acc = resolved.account_name, sp = permissions, ss = services, srt = resource_types, + st = signed_start, se = signed_expiry, sip = signed_ip, spr = signed_protocol, + sv = SAS_VERSION, ses = encryption_scope, + ); + + let sig = hmac_sha256_b64(&key, &string_to_sign)?; + + Ok(vec![ + ("sv".into(), SAS_VERSION.into()), + ("ss".into(), services.into()), + ("srt".into(), resource_types.into()), + ("sp".into(), permissions.into()), + ("se".into(), signed_expiry), + ("spr".into(), signed_protocol.into()), + ("sig".into(), sig), + ]) +} + +/// Build an Account-SAS-scoped URL for a container (used for container creation). +pub fn build_account_sas_container_url( + resolved: &ResolvedAzure, + container: &str, + services: &str, + resource_types: &str, + permissions: &str, +) -> Result { + let pairs = build_account_sas(resolved, services, resource_types, permissions)?; + let base = format!("{}/{}", resolved.blob_endpoint.trim_end_matches('/'), container); + let mut url = Url::parse(&base).context("invalid blob endpoint/url")?; + { + let mut qp = url.query_pairs_mut(); + for (k, v) in pairs { qp.append_pair(&k, &v); } + } + Ok(url) +} diff --git a/src/services/storage/providers/azure_blob/mod.rs b/src/services/storage/providers/azure_blob/mod.rs new file mode 100644 index 0000000..1630fab --- /dev/null +++ b/src/services/storage/providers/azure_blob/mod.rs @@ -0,0 +1 @@ +pub mod helpers; diff --git a/src/services/storage/providers/mod.rs b/src/services/storage/providers/mod.rs index 9dd23be..0cc0317 100644 --- a/src/services/storage/providers/mod.rs +++ b/src/services/storage/providers/mod.rs @@ -1,3 +1,4 @@ +pub mod azure_blob; pub mod google_drive; pub mod local; pub mod s3; diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 168c8ef..585da2c 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,5 +1,6 @@ mod domain; mod services; +mod storage; mod utils; use once_cell::sync::Lazy; diff --git a/src/tests/storage/azure_blob.rs b/src/tests/storage/azure_blob.rs new file mode 100644 index 0000000..48ab863 --- /dev/null +++ b/src/tests/storage/azure_blob.rs @@ -0,0 +1,74 @@ +use crate::services::storage::providers::azure_blob::helpers::{ + ResolvedAzure, SasResource, build_account_sas_container_url, build_sas_url, +}; +use crate::tests::init_tracing_for_test; + +use azure_core::http::RequestContent; +use azure_storage_blob::clients::BlobClient; +use azure_storage_blob::models::BlockLookupList; +use bytes::Bytes; +use testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers::runners::AsyncRunner; +use testcontainers::{GenericImage, ImageExt}; + +const AZURITE_ACCOUNT: &str = "devstoreaccount1"; +const AZURITE_KEY: &str = + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="; + +async fn start_azurite() -> (testcontainers::ContainerAsync, ResolvedAzure) { + let container = GenericImage::new("mcr.microsoft.com/azure-storage/azurite", "latest") + .with_exposed_port(10000.tcp()) + // The current `latest` image logs (on stdout): + // "Azurite Blob service successfully listens on http://0.0.0.0:10000" + // Older builds phrased it "...is successfully listening"; this substring matches + // the wording the pulled image actually emits. + .with_wait_for(WaitFor::message_on_stdout( + "Azurite Blob service successfully listens on", + )) + // The GA SDK sends a very recent `x-ms-version`; Azurite 3.35 rejects unknown + // versions unless we tell it to skip that check. + .with_cmd(["azurite-blob", "--blobHost", "0.0.0.0", "--skipApiVersionCheck"]) + .start().await.unwrap(); + let port = container.get_host_port_ipv4(10000).await.unwrap(); + let resolved = ResolvedAzure { + account_name: AZURITE_ACCOUNT.to_string(), + account_key: AZURITE_KEY.to_string(), + blob_endpoint: format!("http://127.0.0.1:{port}/{AZURITE_ACCOUNT}"), + }; + (container, resolved) +} + +#[tokio::test] +async fn spike_sas_block_roundtrip_against_azurite() { + init_tracing_for_test(); + let (_container, resolved) = start_azurite().await; + let container = "portabase"; + let blob = "spike/hello.txt"; + + // Container creation must use an Account SAS (service=blob, resource-type=container, + // perms=create+write). Azurite cannot authorize container-create with a Service SAS. + let container_url = + build_account_sas_container_url(&resolved, container, "b", "c", "cw").unwrap(); + let container_client = + azure_storage_blob::clients::BlobContainerClient::new(container_url, None, None).unwrap(); + container_client.create(None).await.unwrap(); + + let blob_url = build_sas_url(&resolved, container, blob, SasResource::Blob, "cw").unwrap(); + let blob_client = BlobClient::new(blob_url.clone(), None, None).unwrap(); + let bbc = blob_client.block_blob_client(); + + let payload = Bytes::from_static(b"hello azurite"); + let raw_id = format!("{:032}", 0u32).into_bytes(); + bbc.stage_block(&raw_id, payload.len() as u64, RequestContent::from(payload.to_vec()), None) + .await.unwrap(); + + // `BlockLookupList.latest` is `Option>>` and base64-encodes each entry + // internally during XML serialization, exactly as `stage_block` base64-encodes the + // `blockid` query. So `latest` must hold the SAME RAW id bytes passed to `stage_block`. + let block_list = BlockLookupList { latest: Some(vec![raw_id.clone()]), ..Default::default() }; + bbc.commit_block_list(block_list.try_into().unwrap(), None).await.unwrap(); + + let read_url = build_sas_url(&resolved, container, blob, SasResource::Blob, "r").unwrap(); + let read_client = BlobClient::new(read_url, None, None).unwrap(); + assert!(read_client.exists().await.unwrap()); +} diff --git a/src/tests/storage/mod.rs b/src/tests/storage/mod.rs new file mode 100644 index 0000000..48816a4 --- /dev/null +++ b/src/tests/storage/mod.rs @@ -0,0 +1 @@ +mod azure_blob; From 221ed4e7e19f5d7191c57c6892b05bb47886ba50 Mon Sep 17 00:00:00 2001 From: Charles GTE Date: Sat, 20 Jun 2026 19:38:00 +0200 Subject: [PATCH 3/4] feat: add azure blob storage support --- src/services/backup/uploader.rs | 22 +++ src/services/storage/mod.rs | 2 + .../storage/providers/azure_blob/helpers.rs | 141 ++++++++++-------- .../storage/providers/azure_blob/mod.rs | 133 +++++++++++++++++ .../storage/providers/azure_blob/models.rs | 64 ++++++++ src/tests/services/backup_uploader_tests.rs | 95 ++++++++++++ src/tests/services/mod.rs | 1 + src/tests/storage/azure_blob.rs | 107 ++++++++++++- 8 files changed, 501 insertions(+), 64 deletions(-) create mode 100644 src/services/storage/providers/azure_blob/models.rs create mode 100644 src/tests/services/backup_uploader_tests.rs diff --git a/src/services/backup/uploader.rs b/src/services/backup/uploader.rs index 116bf8b..78fa382 100644 --- a/src/services/backup/uploader.rs +++ b/src/services/backup/uploader.rs @@ -115,6 +115,28 @@ impl BackupService { storage_id, upload_result.error.as_deref().unwrap_or("unknown error") )); + + // `backup_upload_init` opened a per-storage record; close it as "failed" + // so the server is notified of the failure (no path/size on this path). + if let Err(err) = ctx_clone + .api + .backup_upload_status( + ctx_clone.edge_key.agent_id.clone(), + generated_id.clone(), + backup_storage_id.clone(), + status, + String::new(), + 0u64, + backup_id, + ) + .await + { + logger_clone.log("error", format!( + "Failed-status update failed for {}: {}", + storage_id, err + )); + } + return upload_result; } diff --git a/src/services/storage/mod.rs b/src/services/storage/mod.rs index 02d3aaf..0fd70a1 100644 --- a/src/services/storage/mod.rs +++ b/src/services/storage/mod.rs @@ -5,6 +5,7 @@ use crate::services::api::models::agent::status::DatabaseStorage; use crate::services::backup::models::{BackupResult, UploadResult}; use crate::utils::common::BackupMethod; use async_trait::async_trait; +use providers::azure_blob; use providers::google_drive; use providers::local; use providers::s3; @@ -31,6 +32,7 @@ pub fn get_provider(storage: &DatabaseStorage) -> Option Some(Box::new(local::LocalProvider {})), "s3" => Some(Box::new(s3::S3Provider {})), + "blob" => Some(Box::new(azure_blob::AzureBlobProvider {})), "google-drive" => Some(Box::new(google_drive::GoogleDriveProvider {})), _ => { error!("Unknown storage provider: {}", storage.provider); diff --git a/src/services/storage/providers/azure_blob/helpers.rs b/src/services/storage/providers/azure_blob/helpers.rs index 8d796b5..8f94ada 100644 --- a/src/services/storage/providers/azure_blob/helpers.rs +++ b/src/services/storage/providers/azure_blob/helpers.rs @@ -6,18 +6,24 @@ use openssl::hash::MessageDigest; use openssl::pkey::PKey; use openssl::sign::Signer; use url::Url; +use azure_core::http::RequestContent; +use azure_storage_blob::clients::{BlobClient, BlockBlobClient}; +use azure_storage_blob::models::BlockLookupList; +use bytes::{Bytes, BytesMut}; +use futures::{Stream, StreamExt}; +use std::pin::Pin; +use tracing::info; #[derive(Debug, Clone)] pub struct ResolvedAzure { pub account_name: String, - pub account_key: String, // base64, as Azure presents it - pub blob_endpoint: String, // e.g. http://127.0.0.1:10000/devstoreaccount1 + pub account_key: String, + pub blob_endpoint: String, } #[derive(Clone, Copy)] pub enum SasResource { Blob, - // Container-scoped Service SAS is exercised by later tasks; kept here as part of the API. #[allow(dead_code)] Container, } @@ -28,9 +34,9 @@ impl SasResource { } } -const SAS_VERSION: &str = "2022-11-02"; +pub(crate) const SAS_VERSION: &str = "2022-11-02"; -fn hmac_sha256_b64(key: &[u8], data: &str) -> Result { +pub(crate) fn hmac_sha256_b64(key: &[u8], data: &str) -> Result { let pkey = PKey::hmac(key).context("hmac key")?; let mut signer = Signer::new(MessageDigest::sha256(), &pkey).context("signer")?; signer.update(data.as_bytes()).context("signer update")?; @@ -56,7 +62,6 @@ pub fn build_service_sas( let signed_protocol = "https,http"; // Azurite is http let signed_resource = resource.code(); - // Service SAS string-to-sign for sv >= 2020-12-06 (16 fields, 15 newlines). let string_to_sign = format!( "{sp}\n{st}\n{se}\n{canon}\n{si}\n{sip}\n{spr}\n{sv}\n{sr}\n{snap}\n{enc}\n{rscc}\n{rscd}\n{rsce}\n{rscl}\n{rsct}", sp = permissions, st = signed_start, se = signed_expiry, canon = canonical_resource, @@ -105,67 +110,77 @@ pub fn build_sas_url( Ok(url) } -/// Build an Account SAS query set (raw, un-encoded). -/// -/// Required because creating a *container* against Azurite cannot be authorized by a -/// container-scoped Service SAS (Azurite maps `Container_Create` to an empty required -/// permission and its check then always fails); an Account SAS is the supported path. +/// Default block size for the provider path (mirrors the S3 provider's PART_SIZE). +pub const BLOCK_SIZE: usize = 100 * 1024 * 1024; + +type ByteStream = Pin> + Send>>; + +/// Stage one block under a zero-padded sequential id; records the RAW id bytes. +async fn stage_block( + bbc: &BlockBlobClient, + index: u32, + block: Bytes, + block_ids: &mut Vec>, +) -> Result<()> { + let raw_id = format!("{index:032}").into_bytes(); + let len = block.len() as u64; + bbc.stage_block(&raw_id, len, RequestContent::from(block.to_vec()), None) + .await + .map_err(|e| anyhow!("stage_block {index} failed: {e}"))?; + block_ids.push(raw_id); + info!("staged azure block {index} ({len} bytes)"); + Ok(()) +} + +/// Stream `body` to `{container}/{blob}` using Azure block upload. Never buffers the +/// full payload: at most one `block_size` block plus one inbound chunk is resident +/// (mirrors the S3 provider's per-part guarantee). /// -/// `services` e.g. `"b"` (blob), `resource_types` e.g. `"c"` (container) / `"co"` (container+object), -/// `permissions` e.g. `"cw"` (create+write). -pub fn build_account_sas( +/// Assumes the container already exists — S3-faithful, no container creation. Uncommitted +/// blocks are garbage-collected by Azure if `commit_block_list` is never reached, so no +/// explicit abort is needed on the error path (unlike S3 multipart). +pub async fn upload_stream_to_azure( resolved: &ResolvedAzure, - services: &str, - resource_types: &str, - permissions: &str, -) -> Result> { - let key = STANDARD - .decode(&resolved.account_key) - .map_err(|_| anyhow!("account key is not valid base64"))?; - - let signed_start = String::new(); - let signed_expiry = (Utc::now() + Duration::hours(1)) - .format("%Y-%m-%dT%H:%M:%SZ").to_string(); - let signed_protocol = "https,http"; // Azurite is http - let signed_ip = String::new(); - let encryption_scope = String::new(); + container: &str, + blob: &str, + mut body: ByteStream, + block_size: usize, +) -> Result<()> { + let url = build_sas_url(resolved, container, blob, SasResource::Blob, "cw")?; + let blob_client = BlobClient::new(url, None, None).context("blob client")?; + let bbc = blob_client.block_blob_client(); + + let mut buffer = BytesMut::with_capacity(block_size); + let mut block_ids: Vec> = Vec::new(); + let mut index: u32 = 0; + + while let Some(item) = body.next().await { + let bytes = item.context("stream error during upload")?; + buffer.extend_from_slice(&bytes); + + while buffer.len() >= block_size { + let block = buffer.split_to(block_size).freeze(); + stage_block(&bbc, index, block, &mut block_ids).await?; + index += 1; + } + } - // Account SAS string-to-sign for sv >= 2020-12-06: - // account \n sp \n ss \n srt \n st \n se \n sip \n spr \n sv \n ses \n (trailing newline) - let string_to_sign = format!( - "{acc}\n{sp}\n{ss}\n{srt}\n{st}\n{se}\n{sip}\n{spr}\n{sv}\n{ses}\n", - acc = resolved.account_name, sp = permissions, ss = services, srt = resource_types, - st = signed_start, se = signed_expiry, sip = signed_ip, spr = signed_protocol, - sv = SAS_VERSION, ses = encryption_scope, - ); + if !buffer.is_empty() { + let block = buffer.split().freeze(); + stage_block(&bbc, index, block, &mut block_ids).await?; + } - let sig = hmac_sha256_b64(&key, &string_to_sign)?; + if block_ids.is_empty() { + stage_block(&bbc, 0, Bytes::new(), &mut block_ids).await?; + } - Ok(vec![ - ("sv".into(), SAS_VERSION.into()), - ("ss".into(), services.into()), - ("srt".into(), resource_types.into()), - ("sp".into(), permissions.into()), - ("se".into(), signed_expiry), - ("spr".into(), signed_protocol.into()), - ("sig".into(), sig), - ]) -} + let block_list = BlockLookupList { + latest: Some(block_ids), + ..Default::default() + }; + bbc.commit_block_list(block_list.try_into()?, None) + .await + .map_err(|e| anyhow!("commit_block_list failed: {e}"))?; -/// Build an Account-SAS-scoped URL for a container (used for container creation). -pub fn build_account_sas_container_url( - resolved: &ResolvedAzure, - container: &str, - services: &str, - resource_types: &str, - permissions: &str, -) -> Result { - let pairs = build_account_sas(resolved, services, resource_types, permissions)?; - let base = format!("{}/{}", resolved.blob_endpoint.trim_end_matches('/'), container); - let mut url = Url::parse(&base).context("invalid blob endpoint/url")?; - { - let mut qp = url.query_pairs_mut(); - for (k, v) in pairs { qp.append_pair(&k, &v); } - } - Ok(url) + Ok(()) } diff --git a/src/services/storage/providers/azure_blob/mod.rs b/src/services/storage/providers/azure_blob/mod.rs index 1630fab..1f9a575 100644 --- a/src/services/storage/providers/azure_blob/mod.rs +++ b/src/services/storage/providers/azure_blob/mod.rs @@ -1 +1,134 @@ pub mod helpers; +mod models; + +use crate::core::context::Context; +use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::backup::models::{BackupResult, UploadResult}; +use crate::services::storage::StorageProvider; +use crate::services::storage::providers::azure_blob::helpers::{BLOCK_SIZE, upload_stream_to_azure}; +use crate::services::storage::providers::azure_blob::models::AzureBlobProviderConfig; +use crate::utils::common::BackupMethod; +use crate::utils::file::{full_file_name, full_file_path}; +use crate::utils::stream::build_stream; +use async_trait::async_trait; +use std::sync::Arc; +use tokio::fs; +use tracing::{error, info}; + +pub struct AzureBlobProvider {} + +#[async_trait] +impl StorageProvider for AzureBlobProvider { + async fn upload( + &self, + ctx: Arc, + result: BackupResult, + _method: BackupMethod, + storage: &DatabaseStorage, + encrypt: Option, + ) -> UploadResult { + let Some(file_path) = result.backup_file else { + return UploadResult { + storage_id: storage.id.clone(), + success: false, + error: Some("Missing backup file path".to_string()), + remote_file_path: None, + total_size: None, + }; + }; + + let total_size = match fs::metadata(&file_path).await { + Ok(meta) => meta.len(), + Err(e) => { + error!("Failed to get file size: {}", e); + return UploadResult { + storage_id: storage.id.clone(), + success: false, + error: Some(e.to_string()), + remote_file_path: None, + total_size: None, + }; + } + }; + + let encrypt = encrypt.unwrap_or(false); + + let upload = match build_stream(&file_path, encrypt, &ctx.edge_key.master_key_b64).await { + Ok(u) => u, + Err(e) => { + error!("Stream build failed: {}", e); + return UploadResult { + storage_id: storage.id.clone(), + success: false, + error: Some(e.to_string()), + remote_file_path: None, + total_size: None, + }; + } + }; + + let config: AzureBlobProviderConfig = match storage.clone().config.try_into() { + Ok(c) => c, + Err(e) => { + return UploadResult { + storage_id: storage.id.clone(), + success: false, + error: Some(e.to_string()), + remote_file_path: None, + total_size: None, + }; + } + }; + + let resolved = match config.resolve() { + Ok(r) => r, + Err(e) => { + return UploadResult { + storage_id: storage.id.clone(), + success: false, + error: Some(e.to_string()), + remote_file_path: None, + total_size: None, + }; + } + }; + + let file_name = full_file_name(encrypt); + let remote_file_path = full_file_path(&file_name); + info!( + "Starting block upload to azure blob {}/{}", + config.container_name, remote_file_path + ); + + match upload_stream_to_azure( + &resolved, + &config.container_name, + &remote_file_path, + upload.stream, + BLOCK_SIZE, + ) + .await + { + Ok(_) => { + info!("Azure blob upload successful: {}", remote_file_path); + UploadResult { + storage_id: storage.id.clone(), + success: true, + error: None, + remote_file_path: Some(remote_file_path), + total_size: Some(total_size), + } + } + Err(e) => { + error!("Azure blob upload failed: {:?}", e); + UploadResult { + storage_id: storage.id.clone(), + success: false, + error: Some(e.to_string()), + remote_file_path: None, + total_size: None, + } + } + } + } +} diff --git a/src/services/storage/providers/azure_blob/models.rs b/src/services/storage/providers/azure_blob/models.rs new file mode 100644 index 0000000..c239bc4 --- /dev/null +++ b/src/services/storage/providers/azure_blob/models.rs @@ -0,0 +1,64 @@ +use crate::services::storage::providers::azure_blob::helpers::ResolvedAzure; +use anyhow::{Result, anyhow}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct AzureBlobProviderConfig { + pub account_name: String, + pub account_key: String, + pub container_name: String, + #[serde(default)] + pub connection_string: String, + #[serde(default)] + pub endpoint_url: Option, +} + +fn parse_connection_string(cs: &str) -> std::collections::HashMap { + cs.split(';') + .filter(|s| !s.trim().is_empty()) + .filter_map(|pair| { + let mut it = pair.splitn(2, '='); + let k = it.next()?.trim().to_string(); + let v = it.next()?.trim().to_string(); + Some((k, v)) + }) + .collect() +} + +impl AzureBlobProviderConfig { + /// Resolve effective connection params, preferring the connection string when non-empty. + pub fn resolve(&self) -> Result { + if !self.connection_string.trim().is_empty() { + let map = parse_connection_string(&self.connection_string); + let account_name = map + .get("AccountName") + .cloned() + .unwrap_or_else(|| self.account_name.clone()); + let account_key = map + .get("AccountKey") + .cloned() + .unwrap_or_else(|| self.account_key.clone()); + let blob_endpoint = map + .get("BlobEndpoint") + .cloned() + .ok_or_else(|| anyhow!("connection string missing BlobEndpoint"))?; + return Ok(ResolvedAzure { + account_name, + account_key, + blob_endpoint, + }); + } + + let blob_endpoint = self + .endpoint_url + .clone() + .filter(|s| !s.trim().is_empty()) + .ok_or_else(|| anyhow!("endpointUrl required when connectionString is empty"))?; + + Ok(ResolvedAzure { + account_name: self.account_name.clone(), + account_key: self.account_key.clone(), + blob_endpoint, + }) + } +} diff --git a/src/tests/services/backup_uploader_tests.rs b/src/tests/services/backup_uploader_tests.rs new file mode 100644 index 0000000..886f578 --- /dev/null +++ b/src/tests/services/backup_uploader_tests.rs @@ -0,0 +1,95 @@ +//! Regression test: a per-storage upload failure must be reported to the server via +//! `backup_upload_status("failed", ...)`. Previously the uploader early-returned on failure +//! and skipped the status call, so `backup_upload_init` opened a record that was never closed. + +use crate::core::context::Context; +use crate::services::api::ApiClient; +use crate::services::api::models::agent::status::DatabaseStorage; +use crate::services::backup::BackupService; +use crate::services::backup::logger::JobLogger; +use crate::services::backup::models::BackupResult; +use crate::services::config::DbType; +use crate::tests::init_tracing_for_test; +use crate::utils::common::BackupMethod; +use crate::utils::edge_key::EdgeKey; + +use serde_json::json; +use std::sync::Arc; +use wiremock::matchers::{body_partial_json, method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; + +fn ctx_pointing_at(base_url: String) -> Context { + Context { + edge_key: EdgeKey { + server_url: String::new(), + agent_id: "agent-1".to_string(), + master_key_b64: String::new(), + }, + api: ApiClient::new(base_url), + } +} + +#[tokio::test] +async fn failed_upload_reports_failed_status_to_server() { + init_tracing_for_test(); + let server = MockServer::start().await; + + // init opens the per-storage record and returns its id. + Mock::given(method("POST")) + .and(path("/agent/agent-1/backup/upload/init")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "message": "ok", + "backupStorage": { "id": "bs-1" } + }))) + .expect(1) + .mount(&server) + .await; + + // The fix: on failure the uploader must PATCH the status as "failed". + Mock::given(method("PATCH")) + .and(path("/agent/agent-1/backup/upload/status")) + .and(body_partial_json(json!({ "status": "failed" }))) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .expect(1) + .mount(&server) + .await; + + let service = BackupService::new(Arc::new(ctx_pointing_at(server.uri()))); + + // backup_file = None makes the provider fail immediately ("Missing backup file path"), + // exercising the failure path without any network/Azure dependency. + let result = BackupResult { + generated_id: "gen-1".to_string(), + db_type: DbType::Postgresql, + status: "success".to_string(), + backup_file: None, + code: None, + }; + + let storage: DatabaseStorage = serde_json::from_value(json!({ + "id": "storage-1", + "provider": "blob", + "config": {} + })) + .unwrap(); + + let backup_id = "backup-1".to_string(); + let logger = Arc::new(JobLogger::new()); + + let results = service + .upload( + result, + BackupMethod::Manual, + vec![storage], + false, + &backup_id, + logger, + ) + .await + .unwrap(); + + assert_eq!(results.len(), 1); + assert!(!results[0].success); + + // MockServer drop verifies both `.expect(1)` mounts were hit — including the "failed" PATCH. +} diff --git a/src/tests/services/mod.rs b/src/tests/services/mod.rs index 4a12318..1780d61 100644 --- a/src/tests/services/mod.rs +++ b/src/tests/services/mod.rs @@ -1 +1,2 @@ mod api_models_tests; +mod backup_uploader_tests; diff --git a/src/tests/storage/azure_blob.rs b/src/tests/storage/azure_blob.rs index 48ab863..4217c94 100644 --- a/src/tests/storage/azure_blob.rs +++ b/src/tests/storage/azure_blob.rs @@ -1,15 +1,83 @@ use crate::services::storage::providers::azure_blob::helpers::{ - ResolvedAzure, SasResource, build_account_sas_container_url, build_sas_url, + ResolvedAzure, SAS_VERSION, SasResource, build_sas_url, hmac_sha256_b64, }; use crate::tests::init_tracing_for_test; +use anyhow::{Context as _, anyhow}; use azure_core::http::RequestContent; use azure_storage_blob::clients::BlobClient; use azure_storage_blob::models::BlockLookupList; +use base64::Engine; +use base64::engine::general_purpose::STANDARD; use bytes::Bytes; +use chrono::{Duration, Utc}; use testcontainers::core::{IntoContainerPort, WaitFor}; use testcontainers::runners::AsyncRunner; use testcontainers::{GenericImage, ImageExt}; +use url::Url; + +/// Build an Account SAS query set (test-only). Azurite cannot authorize container-create with +/// a container-scoped Service SAS, so tests create the target container with an Account SAS. +/// Reuses the production HMAC primitive (`hmac_sha256_b64`) to avoid duplicating signing logic. +fn build_account_sas( + resolved: &ResolvedAzure, + services: &str, + resource_types: &str, + permissions: &str, +) -> anyhow::Result> { + let key = STANDARD + .decode(&resolved.account_key) + .map_err(|_| anyhow!("account key is not valid base64"))?; + + let signed_start = String::new(); + let signed_expiry = (Utc::now() + Duration::hours(1)) + .format("%Y-%m-%dT%H:%M:%SZ") + .to_string(); + let signed_protocol = "https,http"; // Azurite is http + let signed_ip = String::new(); + let encryption_scope = String::new(); + + // Account SAS string-to-sign for sv >= 2020-12-06: + // account \n sp \n ss \n srt \n st \n se \n sip \n spr \n sv \n ses \n (trailing newline) + let string_to_sign = format!( + "{acc}\n{sp}\n{ss}\n{srt}\n{st}\n{se}\n{sip}\n{spr}\n{sv}\n{ses}\n", + acc = resolved.account_name, sp = permissions, ss = services, srt = resource_types, + st = signed_start, se = signed_expiry, sip = signed_ip, spr = signed_protocol, + sv = SAS_VERSION, ses = encryption_scope, + ); + + let sig = hmac_sha256_b64(&key, &string_to_sign)?; + + Ok(vec![ + ("sv".into(), SAS_VERSION.into()), + ("ss".into(), services.into()), + ("srt".into(), resource_types.into()), + ("sp".into(), permissions.into()), + ("se".into(), signed_expiry), + ("spr".into(), signed_protocol.into()), + ("sig".into(), sig), + ]) +} + +/// Build an Account-SAS-scoped URL for a container (test-only container creation). +fn build_account_sas_container_url( + resolved: &ResolvedAzure, + container: &str, + services: &str, + resource_types: &str, + permissions: &str, +) -> anyhow::Result { + let pairs = build_account_sas(resolved, services, resource_types, permissions)?; + let base = format!("{}/{}", resolved.blob_endpoint.trim_end_matches('/'), container); + let mut url = Url::parse(&base).context("invalid blob endpoint/url")?; + { + let mut qp = url.query_pairs_mut(); + for (k, v) in pairs { + qp.append_pair(&k, &v); + } + } + Ok(url) +} const AZURITE_ACCOUNT: &str = "devstoreaccount1"; const AZURITE_KEY: &str = @@ -72,3 +140,40 @@ async fn spike_sas_block_roundtrip_against_azurite() { let read_client = BlobClient::new(read_url, None, None).unwrap(); assert!(read_client.exists().await.unwrap()); } + +#[tokio::test] +async fn upload_stream_multi_block_roundtrip() { + init_tracing_for_test(); + use crate::services::storage::providers::azure_blob::helpers::upload_stream_to_azure; + use futures::stream; + + let (_container, resolved) = start_azurite().await; + let container = "portabase"; + let blob = "backups/multi.bin"; + + // Container setup (provider itself never creates it): Account SAS create. + let container_url = + build_account_sas_container_url(&resolved, container, "b", "c", "cw").unwrap(); + azure_storage_blob::clients::BlobContainerClient::new(container_url, None, None) + .unwrap() + .create(None) + .await + .unwrap(); + + // 10 KiB fed as 1 KiB chunks, forced into 4 KiB blocks => 3 blocks (multi-block path). + let data = vec![7u8; 10 * 1024]; + let chunks: Vec> = data + .chunks(1024) + .map(|c| Ok(Bytes::copy_from_slice(c))) + .collect(); + let body = Box::pin(stream::iter(chunks)); + + upload_stream_to_azure(&resolved, container, blob, body, 4 * 1024) + .await + .unwrap(); + + // Verify the committed blob reassembles to the exact source bytes via a read-SAS GET. + let read_url = build_sas_url(&resolved, container, blob, SasResource::Blob, "r").unwrap(); + let got = reqwest::get(read_url).await.unwrap().bytes().await.unwrap(); + assert_eq!(got.as_ref(), data.as_slice()); +} From 66cad4e12e8211b484a911d71a4349a95fc06c67 Mon Sep 17 00:00:00 2001 From: Charles GTE Date: Sun, 21 Jun 2026 16:22:51 +0200 Subject: [PATCH 4/4] fix: az blog storage tests --- src/tests/storage/azure_blob.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/tests/storage/azure_blob.rs b/src/tests/storage/azure_blob.rs index 4217c94..7409858 100644 --- a/src/tests/storage/azure_blob.rs +++ b/src/tests/storage/azure_blob.rs @@ -97,11 +97,16 @@ async fn start_azurite() -> (testcontainers::ContainerAsync, Resol // versions unless we tell it to skip that check. .with_cmd(["azurite-blob", "--blobHost", "0.0.0.0", "--skipApiVersionCheck"]) .start().await.unwrap(); + // Use the testcontainers-resolved host (not a hardcoded 127.0.0.1): under + // docker-out-of-docker / remote daemons the published port is not on the test + // process's loopback. All other container tests (mssql, valkey, postgres, ...) + // already do this; azure_blob was the only one hardcoding the host. + let host = container.get_host().await.unwrap().to_string(); let port = container.get_host_port_ipv4(10000).await.unwrap(); let resolved = ResolvedAzure { account_name: AZURITE_ACCOUNT.to_string(), account_key: AZURITE_KEY.to_string(), - blob_endpoint: format!("http://127.0.0.1:{port}/{AZURITE_ACCOUNT}"), + blob_endpoint: format!("http://{host}:{port}/{AZURITE_ACCOUNT}"), }; (container, resolved) }