diff --git a/Cargo.lock b/Cargo.lock index d46a27e..0b2569d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,7 +82,7 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitcoin_da_client" -version = "0.1.10" +version = "0.1.12" dependencies = [ "async-trait", "hex", diff --git a/Cargo.toml b/Cargo.toml index 4b76a50..5dac4f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bitcoin_da_client" -version = "0.1.11" +version = "0.1.12" edition = "2021" authors = ["SYS LABS sidhujag@syscoin.org"] description = "Tools for interacting with BitcoinDA by SYS LABS" diff --git a/src/lib.rs b/src/lib.rs index a09630c..2f7e52d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ const NEVM_DATA_SCALE_FACTOR: f64 = 0.01; /// Maximum payload accepted by the Syscoin PoDA endpoint (2 MiB). pub const MAX_BLOB_SIZE: usize = 2 * 1024 * 1024; +pub const MAX_BLOB_EXISTENCE_BATCH: usize = 32; /// Thread-safe error type pub type SyscoinError = Box; @@ -27,6 +28,7 @@ pub enum BitcoinDaFinalityMode { /// Response structure for JSON-RPC calls #[derive(Deserialize, Debug)] struct JsonRpcResponse { + id: Option, result: Option, error: Option, } @@ -145,6 +147,87 @@ impl RealRpcClient { .ok_or_else(|| "missing result in JSON-RPC response".into()) } + async fn rpc_batch_request( + &self, + calls: &[(&str, Vec)], + ) -> Result>, SyscoinError> { + if calls.is_empty() { + return Ok(Vec::new()); + } + + let request_body: Vec<_> = calls + .iter() + .enumerate() + .map(|(id, (method, params))| { + json!({ + "jsonrpc": "2.0", + "id": id, + "method": method, + "params": params, + }) + }) + .collect(); + + let resp = self + .http_client + .post(&self.rpc_url) + .basic_auth(&self.rpc_user, Some(&self.rpc_password)) + .json(&request_body) + .timeout(self.timeout) + .send() + .await?; + + let status = resp.status(); + let body = resp.text().await?; + info!("RPC batch → HTTP {}:\n{}", status, body); + + if !status.is_success() { + return Err(format!("HTTP error: {} returned body: {}", status, body).into()); + } + + let responses: Vec> = serde_json::from_str(&body)?; + let mut results = Vec::with_capacity(calls.len()); + results.resize_with(calls.len(), || None); + + for response in responses { + let Some(id) = response.id.and_then(|value| value.as_u64()) else { + return Err("missing id in JSON-RPC batch response".into()); + }; + let id = usize::try_from(id)?; + if id >= calls.len() { + return Err(format!("unexpected id {id} in JSON-RPC batch response").into()); + } + + let result = if let Some(err) = response.error { + Err(format!("RPC error: {}", err).into()) + } else { + response + .result + .ok_or_else(|| "missing result in JSON-RPC response".into()) + }; + results[id] = Some(result); + } + + let ordered_results = results + .into_iter() + .enumerate() + .map(|(id, result)| { + result.unwrap_or_else(|| Err(format!("missing response for batch id {id}").into())) + }) + .collect(); + Ok(ordered_results) + } + + async fn http_post_json(&self, url: &str, body: &Value) -> Result, SyscoinError> { + let response = self.http_client.post(url).json(body).send().await?; + + if !response.status().is_success() { + return Err(format!("HTTP POST error: {}", response.status()).into()); + } + + Ok(response.bytes().await?.to_vec()) + } + /// Like `rpc_request`, but points at `/wallet/{wallet_name}` on the node async fn wallet_rpc_request( &self, @@ -284,34 +367,132 @@ impl SyscoinClient { blob_id.strip_prefix("0x").unwrap_or(blob_id) } - fn poda_candidate_urls(&self, version_hash: &str) -> Vec { + fn poda_blob_url(&self, version_hash: &str) -> String { let normalized_hash = self.normalized_blob_id(version_hash); let base = self.poda_url.trim_end_matches('/'); - if base.ends_with("/blob") || base.ends_with("/vh") { - return vec![format!("{base}/{normalized_hash}")]; + if base.ends_with("/vh") { + return format!("{base}/{normalized_hash}"); } - vec![ - format!("{base}/blob/{normalized_hash}"), - format!("{base}/vh/{normalized_hash}"), - ] + format!("{base}/vh/{normalized_hash}") } - async fn blob_exists_in_cloud(&self, version_hash: &str) -> bool { - for url in self.poda_candidate_urls(version_hash) { - match self.rpc_client.http_get(&url).await { - Ok(_) => { - info!("PODA fallback located blob at {}", url); - return true; + fn poda_check_vh_batch_url(&self) -> String { + let mut base = self.poda_url.trim_end_matches('/'); + + for suffix in ["/vh", "/check_vh"] { + if let Some(stripped) = base.strip_suffix(suffix) { + base = stripped; + break; + } + } + + format!("{base}/check_vh") + } + + fn truthy_check_vh_response(response: &str) -> bool { + matches!(response.trim().to_ascii_lowercase().as_str(), "1" | "true") + } + + fn check_vh_value_exists(value: &Value) -> bool { + match value { + Value::Bool(exists) => *exists, + Value::Number(number) => number.as_u64().is_some_and(|value| value != 0), + Value::String(value) => Self::truthy_check_vh_response(value), + Value::Object(object) => { + if object.get("error").is_some_and(|error| !error.is_null()) { + return false; } - Err(err) => { - warn!("PODA fallback lookup failed at {}: {}", url, err); + + ["exists", "found", "available", "result"] + .into_iter() + .find_map(|key| object.get(key)) + .is_some_and(Self::check_vh_value_exists) + } + _ => false, + } + } + + fn check_vh_batch_response_exists( + bytes: &[u8], + version_hashes: &[String], + ) -> Result, SyscoinError> { + let value: Value = serde_json::from_slice(bytes)?; + let aggregate_result = + |value: &Value| vec![Self::check_vh_value_exists(value); version_hashes.len()]; + + match value { + Value::Array(values) => { + if values.len() != version_hashes.len() { + return Err(format!( + "check_vh batch response length mismatch: expected {}, got {}", + version_hashes.len(), + values.len() + ) + .into()); } + Ok(values.iter().map(Self::check_vh_value_exists).collect()) } + Value::Object(object) => { + for key in ["results", "result"] { + if let Some(value) = object.get(key) { + if let Value::Array(values) = value { + if values.len() != version_hashes.len() { + return Err(format!( + "check_vh batch response length mismatch: expected {}, got {}", + version_hashes.len(), + values.len() + ) + .into()); + } + return Ok(values.iter().map(Self::check_vh_value_exists).collect()); + } + return Ok(aggregate_result(value)); + } + } + + for key in ["exists", "found", "available"] { + if let Some(value) = object.get(key) { + return Ok(aggregate_result(value)); + } + } + + version_hashes + .iter() + .map(|version_hash| { + Ok(object + .get(version_hash) + .is_some_and(Self::check_vh_value_exists)) + }) + .collect() + } + other => Ok(aggregate_result(&other)), + } + } + + async fn blobs_exist_in_cloud( + &self, + version_hashes: &[String], + ) -> Result, SyscoinError> { + if version_hashes.is_empty() { + return Ok(Vec::new()); } - false + let url = self.poda_check_vh_batch_url(); + let body = json!(version_hashes); + let bytes = self.rpc_client.http_post_json(&url, &body).await?; + Self::check_vh_batch_response_exists(&bytes, version_hashes) + } + + async fn blob_exists_in_cloud(&self, version_hash: &str) -> bool { + match self.blobs_exist_in_cloud(&[version_hash.to_string()]).await { + Ok(mut exists) => exists.pop().unwrap_or(false), + Err(err) => { + warn!("PODA fallback check_vh lookup failed: {}", err); + false + } + } } /// Create a new Syscoin client @@ -524,16 +705,9 @@ impl SyscoinClient { /// Retrieve blob data from PODA cloud storage pub async fn get_blob_from_cloud(&self, version_hash: &str) -> Result, SyscoinError> { - let mut last_err: Option = None; - - for url in self.poda_candidate_urls(version_hash) { - match self.rpc_client.http_get(&url).await { - Ok(bytes) => return Ok(bytes), - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or_else(|| "failed to build PODA URL".into())) + self.rpc_client + .http_get(&self.poda_blob_url(version_hash)) + .await } /// Check whether a blob is retrievable from the Syscoin node or PODA cloud storage. @@ -541,21 +715,76 @@ impl SyscoinClient { /// This is an availability check only. It deliberately does not imply chainlock or /// confirmation finality. pub async fn blob_exists(&self, blob_id: &str) -> Result { - let actual_blob_id = blob_id.strip_prefix("0x").unwrap_or(blob_id); - let params = vec![json!(actual_blob_id)]; + self.blobs_exist([blob_id]) + .await? + .pop() + .ok_or_else(|| "missing blob existence result".into()) + } - match self.rpc_client.call("getnevmblobdata", ¶ms).await { - Ok(_) => Ok(true), - Err(e) => { - let msg = e.to_string(); - if msg.contains("Could not find blob information for versionhash") - || msg.contains("\"code\":-32602") - { - return Ok(self.blob_exists_in_cloud(actual_blob_id).await); + /// Check whether up to 32 blobs are retrievable from the Syscoin node or PODA cloud storage. + /// + /// The Syscoin node lookup is sent as a single JSON-RPC batch with `getdata=false`. + /// Any hashes missing from the node are checked with a single PODA `check_vh` batch call. + pub async fn blobs_exist(&self, blob_ids: I) -> Result, SyscoinError> + where + I: IntoIterator, + S: AsRef, + { + let actual_blob_ids: Vec = blob_ids + .into_iter() + .map(|blob_id| self.normalized_blob_id(blob_id.as_ref()).to_string()) + .collect(); + + if actual_blob_ids.len() > MAX_BLOB_EXISTENCE_BATCH { + return Err(format!( + "blob existence batch exceeds maximum of {}: got {}", + MAX_BLOB_EXISTENCE_BATCH, + actual_blob_ids.len() + ) + .into()); + } + if actual_blob_ids.is_empty() { + return Ok(Vec::new()); + } + + let calls: Vec<_> = actual_blob_ids + .iter() + .map(|blob_id| ("getnevmblobdata", vec![json!(blob_id), json!(false)])) + .collect(); + let rpc_results = self.rpc_client.rpc_batch_request(&calls).await?; + + let mut exists = vec![false; actual_blob_ids.len()]; + let mut missing = Vec::new(); + for (idx, result) in rpc_results.into_iter().enumerate() { + match result { + Ok(_) => exists[idx] = true, + Err(err) => { + let msg = err.to_string(); + if msg.contains("Could not find blob information for versionhash") + || msg.contains("\"code\":-32602") + { + missing.push((idx, actual_blob_ids[idx].clone())); + } else { + return Err(err); + } } - Err(e) } } + + if missing.is_empty() { + return Ok(exists); + } + + let missing_hashes: Vec<_> = missing + .iter() + .map(|(_, version_hash)| version_hash.clone()) + .collect(); + let cloud_results = self.blobs_exist_in_cloud(&missing_hashes).await?; + for ((idx, _), cloud_exists) in missing.into_iter().zip(cloud_results) { + exists[idx] = cloud_exists; + } + + Ok(exists) } /// Check if a blob is final @@ -724,12 +953,26 @@ mod tests { #[tokio::test] async fn blob_exists_does_not_imply_finality() { let mut server = Server::new_async().await; - let _lookup = server + let _availability_lookup = server + .mock("POST", "/") + .match_body(mockito::Matcher::JsonString( + r#"[{"jsonrpc":"2.0","id":0,"method":"getnevmblobdata","params":["abc",false]}]"# + .to_string(), + )) + .with_status(200) + .with_body(r#"[{"jsonrpc":"2.0","id":0,"result":{"data":"00"},"error":null}]"#) + .expect(1) + .create_async() + .await; + let _finality_lookup = server .mock("POST", "/") - .match_body(mockito::Matcher::Any) + .match_body(mockito::Matcher::JsonString( + r#"{"jsonrpc":"2.0","id":1,"method":"getnevmblobdata","params":["abc"]}"# + .to_string(), + )) .with_status(200) .with_body(r#"{"jsonrpc":"2.0","id":1,"result":{"data":"00","chainlock":false}}"#) - .expect(2) + .expect(1) .create_async() .await; let client = SyscoinClient::new(&server.url(), "user", "password", &server.url(), None, "") diff --git a/tests/lib_test.rs b/tests/lib_test.rs index 9f006f2..b583ca5 100644 --- a/tests/lib_test.rs +++ b/tests/lib_test.rs @@ -114,7 +114,7 @@ mod tests { // Mock HTTP GET response let _m = mock_server - .mock("GET", format!("/blob/{}", version_hash).as_str()) + .mock("GET", format!("/vh/{}", version_hash).as_str()) .with_status(200) .with_body(&expected_data) .create(); @@ -145,7 +145,7 @@ mod tests { } #[tokio::test] - async fn test_get_blob_from_cloud_falls_back_to_vh() { + async fn test_get_blob_from_cloud_uses_vh() { let mut mock_server = std::thread::spawn(|| Server::new()) .join() .expect("Failed to create mock server"); @@ -153,11 +153,6 @@ mod tests { let expected_data = Vec::new(); let version_hash = "deadbeef"; - mock_server - .mock("GET", format!("/blob/{}", version_hash).as_str()) - .with_status(404) - .create(); - mock_server .mock("GET", format!("/vh/{}", version_hash).as_str()) .with_status(200) @@ -179,6 +174,335 @@ mod tests { assert_eq!(result.unwrap(), expected_data); } + #[tokio::test] + async fn test_blob_exists_uses_metadata_only_rpc() { + let mut mock_server = std::thread::spawn(|| Server::new()) + .join() + .expect("Failed to create mock server"); + + let blob_id = "deadbeef"; + let mock_response = json!([ + { + "jsonrpc": "2.0", + "id": 0, + "result": { + "versionhash": blob_id + }, + "error": null + } + ]); + + mock_server + .mock("POST", "/") + .match_body(mockito::Matcher::JsonString( + r#"[{"jsonrpc":"2.0","id":0,"method":"getnevmblobdata","params":["deadbeef",false]}]"# + .to_string(), + )) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(mock_response.to_string()) + .create(); + + let client = SyscoinClient::new( + &mock_server.url(), + "user", + "password", + &mock_server.url(), + None, + "test_wallet", + ) + .unwrap(); + + assert!(client.blob_exists("0xdeadbeef").await.unwrap()); + } + + #[tokio::test] + async fn test_blob_exists_cloud_fallback_uses_check_vh() { + let mut mock_server = std::thread::spawn(|| Server::new()) + .join() + .expect("Failed to create mock server"); + + let blob_id = "deadbeef"; + let not_found_response = json!([ + { + "jsonrpc": "2.0", + "id": 0, + "result": null, + "error": { + "code": -32602, + "message": format!("Could not find blob information for versionhash {}", blob_id) + } + } + ]); + + mock_server + .mock("POST", "/") + .match_body(mockito::Matcher::Regex("getnevmblobdata".into())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(not_found_response.to_string()) + .create(); + + mock_server + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(format!(r#"["{blob_id}"]"#))) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(json!([true]).to_string()) + .create(); + + let poda_url = format!("{}/vh", mock_server.url()); + let client = SyscoinClient::new( + &mock_server.url(), + "user", + "password", + &poda_url, + None, + "test_wallet", + ) + .unwrap(); + + assert!(client.blob_exists(blob_id).await.unwrap()); + } + + #[tokio::test] + async fn test_blobs_exist_batches_rpc_and_check_vh_fallback() { + let mut mock_server = std::thread::spawn(|| Server::new()) + .join() + .expect("Failed to create mock server"); + + mock_server + .mock("POST", "/") + .match_body(mockito::Matcher::Regex("getnevmblobdata".into())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + json!([ + { + "jsonrpc": "2.0", + "id": 0, + "result": { + "versionhash": "aaa" + }, + "error": null + }, + { + "jsonrpc": "2.0", + "id": 1, + "result": null, + "error": { + "code": -32602, + "message": "Could not find blob information for versionhash bbb" + } + } + ]) + .to_string(), + ) + .expect(1) + .create(); + + mock_server + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(r#"["bbb"]"#.to_string())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(json!([true]).to_string()) + .expect(1) + .create(); + + let client = SyscoinClient::new( + &mock_server.url(), + "user", + "password", + &mock_server.url(), + None, + "test_wallet", + ) + .unwrap(); + + let result = client.blobs_exist(["0xaaa", "bbb"]).await.unwrap(); + assert_eq!(result, vec![true, true]); + } + + #[tokio::test] + async fn test_blobs_exist_accepts_aggregate_check_vh_response() { + let mut mock_server = std::thread::spawn(|| Server::new()) + .join() + .expect("Failed to create mock server"); + + mock_server + .mock("POST", "/") + .match_body(mockito::Matcher::Regex("getnevmblobdata".into())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + json!([ + { + "jsonrpc": "2.0", + "id": 0, + "result": null, + "error": { + "code": -32602, + "message": "Could not find blob information for versionhash aaa" + } + }, + { + "jsonrpc": "2.0", + "id": 1, + "result": null, + "error": { + "code": -32602, + "message": "Could not find blob information for versionhash bbb" + } + } + ]) + .to_string(), + ) + .expect(1) + .create(); + + mock_server + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(r#"["aaa","bbb"]"#.to_string())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(json!(false).to_string()) + .expect(1) + .create(); + + let client = SyscoinClient::new( + &mock_server.url(), + "user", + "password", + &mock_server.url(), + None, + "test_wallet", + ) + .unwrap(); + + let result = client.blobs_exist(["aaa", "bbb"]).await.unwrap(); + assert_eq!(result, vec![false, false]); + } + + #[tokio::test] + async fn test_blobs_exist_treats_unknown_check_vh_response_as_unavailable() { + let mut mock_server = std::thread::spawn(|| Server::new()) + .join() + .expect("Failed to create mock server"); + + mock_server + .mock("POST", "/") + .match_body(mockito::Matcher::Regex("getnevmblobdata".into())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + json!([ + { + "jsonrpc": "2.0", + "id": 0, + "result": null, + "error": { + "code": -32602, + "message": "Could not find blob information for versionhash aaa" + } + } + ]) + .to_string(), + ) + .expect(1) + .create(); + + mock_server + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(r#"["aaa"]"#.to_string())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(json!({ "unexpected": true }).to_string()) + .expect(1) + .create(); + + let client = SyscoinClient::new( + &mock_server.url(), + "user", + "password", + &mock_server.url(), + None, + "test_wallet", + ) + .unwrap(); + + let result = client.blobs_exist(["aaa"]).await.unwrap(); + assert_eq!(result, vec![false]); + } + + #[tokio::test] + async fn test_blobs_exist_treats_descriptive_check_vh_string_as_unavailable() { + let mut mock_server = std::thread::spawn(|| Server::new()) + .join() + .expect("Failed to create mock server"); + + mock_server + .mock("POST", "/") + .match_body(mockito::Matcher::Regex("getnevmblobdata".into())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body( + json!([ + { + "jsonrpc": "2.0", + "id": 0, + "result": null, + "error": { + "code": -32602, + "message": "Could not find blob information for versionhash aaa" + } + } + ]) + .to_string(), + ) + .expect(1) + .create(); + + mock_server + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(r#"["aaa"]"#.to_string())) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(json!("blob not found").to_string()) + .expect(1) + .create(); + + let client = SyscoinClient::new( + &mock_server.url(), + "user", + "password", + &mock_server.url(), + None, + "test_wallet", + ) + .unwrap(); + + let result = client.blobs_exist(["aaa"]).await.unwrap(); + assert_eq!(result, vec![false]); + } + + #[tokio::test] + async fn test_blobs_exist_rejects_more_than_32_hashes() { + let client = SyscoinClient::new( + "http://localhost:8888", + "user", + "password", + "http://poda.example.com", + None, + "test_wallet", + ) + .unwrap(); + + let hashes: Vec<_> = (0..33).map(|idx| format!("{idx:064x}")).collect(); + let result = client.blobs_exist(hashes.iter()).await; + assert!(result.is_err()); + } + #[tokio::test] async fn test_create_or_load_wallet() { // Create the mock server in a separate thread @@ -527,14 +851,11 @@ mod tests { .create(); mock_server - .mock("GET", format!("/blob/{}", blob_id).as_str()) - .with_status(404) - .create(); - - mock_server - .mock("GET", format!("/vh/{}", blob_id).as_str()) + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(format!(r#"["{blob_id}"]"#))) .with_status(200) - .with_body("") + .with_header("content-type", "application/json") + .with_body(json!([true]).to_string()) .create(); let client = SyscoinClient::new( @@ -553,7 +874,6 @@ mod tests { result.unwrap(), "Expected PODA fallback to mark blob as final" ); - assert!(client.blob_exists(blob_id).await.unwrap()); } #[tokio::test] @@ -735,14 +1055,11 @@ mod tests { .create(); mock_server - .mock("GET", format!("/blob/{}", blob_id).as_str()) - .with_status(404) - .create(); - - mock_server - .mock("GET", format!("/vh/{}", blob_id).as_str()) + .mock("POST", "/check_vh") + .match_body(mockito::Matcher::JsonString(format!(r#"["{blob_id}"]"#))) .with_status(200) - .with_body("") + .with_header("content-type", "application/json") + .with_body(json!([true]).to_string()) .create(); let client = SyscoinClient::new( @@ -763,7 +1080,6 @@ mod tests { result.unwrap(), "Expected PODA fallback to mark blob as final" ); - assert!(client.blob_exists(blob_id).await.unwrap()); } #[tokio::test]