Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 138 additions & 49 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ pub enum BitcoinDaFinalityMode {
Confirmations,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobFinalityState {
Finalized,
Confirmed { confirmations: Option<u64> },
Unconfirmed,
Missing,
}

impl BlobFinalityState {
pub fn is_final(self) -> bool {
matches!(self, Self::Finalized)
}

pub fn should_republish_after_timeout(self) -> bool {
matches!(self, Self::Unconfirmed | Self::Missing)
}
}

/// Response structure for JSON-RPC calls
#[derive(Deserialize, Debug)]
struct JsonRpcResponse<T> {
Expand Down Expand Up @@ -285,7 +303,7 @@ impl RealRpcClient {
pub async fn create_or_load_wallet(&self, wallet_name: &str) -> Result<(), SyscoinError> {
info!("create_or_load_wallet");
match self.call("loadwallet", &[json!(wallet_name)]).await {
Ok(_) => return Ok(()),
Ok(_) => Ok(()),
Err(e) => {
info!("wallet error");
let s = e.to_string();
Expand All @@ -294,15 +312,15 @@ impl RealRpcClient {
if s.contains("failed") {
info!("wallet not found, creating new one");
self.call("createwallet", &[json!(wallet_name)]).await?;
return Ok(());
}
// -4 = wallet already loaded → ignore
if s.contains("already loaded") {
Ok(())
} else if s.contains("already loaded") {
// -4 = wallet already loaded → ignore
info!("wallet already loaded, continuing");
return Ok(());
Ok(())
} else {
// any other error is fatal
Err(e)
}
// any other error is fatal
return Err(e);
}
}
}
Expand Down Expand Up @@ -367,6 +385,11 @@ impl SyscoinClient {
blob_id.strip_prefix("0x").unwrap_or(blob_id)
}

fn is_missing_blob_error(message: &str) -> bool {
message.contains("Could not find blob information for versionhash")
|| message.contains("\"code\":-32602")
}

fn poda_blob_url(&self, version_hash: &str) -> String {
let normalized_hash = self.normalized_blob_id(version_hash);
let base = self.poda_url.trim_end_matches('/');
Expand Down Expand Up @@ -514,8 +537,11 @@ impl SyscoinClient {
})
}

/// Create a blob in BitcoinDA(FKA Poda) storage
pub async fn create_blob(&self, data: &[u8]) -> Result<String, SyscoinError> {
async fn create_blob_with_overwrite(
&self,
data: &[u8],
overwrite_existing: bool,
) -> Result<String, SyscoinError> {
if data.len() > MAX_BLOB_SIZE {
return Err(format!(
"blob size ({}) exceeds maximum allowed ({})",
Expand All @@ -527,9 +553,8 @@ impl SyscoinClient {

let data_hex = hex::encode(data);
// pass positional args: data hex, overwrite_existing, hash type.
// Keep overwrite_existing=false to make repeated calls idempotent for identical data.
// Force blake2s to keep blob IDs aligned with Syscoin / OS expectations.
let params = vec![json!(data_hex), json!(false), json!("blake2s")];
let params = vec![json!(data_hex), json!(overwrite_existing), json!("blake2s")];
// SYSCOIN
let response = self
.rpc_client
Expand All @@ -542,6 +567,23 @@ impl SyscoinClient {
Ok(hash.to_string())
}

/// Create a blob in BitcoinDA(FKA Poda) storage.
///
/// Repeated calls are idempotent for identical data: if local metadata already
/// exists, Syscoin returns the existing version hash instead of publishing a
/// fresh transaction.
pub async fn create_blob(&self, data: &[u8]) -> Result<String, SyscoinError> {
self.create_blob_with_overwrite(data, false).await
}

/// Force-publish a blob even when local metadata for the same hash exists.
///
/// Use this for healing stale DA refs whose data is retrievable but whose
/// original Syscoin publication is not making confirmation/finality progress.
pub async fn force_create_blob(&self, data: &[u8]) -> Result<String, SyscoinError> {
self.create_blob_with_overwrite(data, true).await
}

/// Ensure there is a receive address for the provided label.
/// If none exists, a new address is created and returned.
pub async fn ensure_address_by_label(
Expand Down Expand Up @@ -787,50 +829,62 @@ impl SyscoinClient {
Ok(exists)
}

/// Check if a blob is final
pub async fn check_blob_finality(&self, blob_id: &str) -> Result<bool, SyscoinError> {
// Strip any 0x prefix
let actual_blob_id = if let Some(stripped) = blob_id.strip_prefix("0x") {
stripped
} else {
blob_id
};

async fn blob_finality_response(&self, blob_id: &str) -> Result<Option<Value>, SyscoinError> {
let actual_blob_id = self.normalized_blob_id(blob_id);
// Use positional parameter: (versionhash_or_txid: String)
let params = vec![json!(actual_blob_id)];

// If the node does not know the blob yet, it may return an HTTP 500 with
// a JSON-RPC error body like:
// {"result":null,"error":{"code":-32602,"message":"Could not find blob information for versionhash ..."},"id":1}
// Treat this as "not final yet" instead of a hard error so that the
// dispatcher keeps polling for finality.
let response = match self.rpc_client.call("getnevmblobdata", &params).await {
Ok(v) => v,
match self.rpc_client.call("getnevmblobdata", &params).await {
Ok(v) => Ok(Some(v)),
Err(e) => {
let msg = e.to_string();
if msg.contains("Could not find blob information for versionhash")
|| msg.contains("\"code\":-32602")
{
if self.blob_exists_in_cloud(actual_blob_id).await {
warn!(
"RPC finality lookup could not find blob {}; accepting PODA archive presence as finalized",
actual_blob_id
);
return Ok(true);
}
return Ok(false);
if Self::is_missing_blob_error(&msg) {
return Ok(None);
}
return Err(e);
Err(e)
}
}
}

/// Check if a blob is final
pub async fn check_blob_finality(&self, blob_id: &str) -> Result<bool, SyscoinError> {
Ok(self.blob_finality_state(blob_id).await?.is_final())
}

pub async fn blob_finality_state(
&self,
blob_id: &str,
) -> Result<BlobFinalityState, SyscoinError> {
let actual_blob_id = self.normalized_blob_id(blob_id);
let Some(response) = self.blob_finality_response(actual_blob_id).await? else {
return if self.blob_exists_in_cloud(actual_blob_id).await {
warn!(
"RPC finality lookup could not find blob {}; accepting PODA archive presence as finalized",
actual_blob_id
);
Ok(BlobFinalityState::Finalized)
} else {
Ok(BlobFinalityState::Missing)
};
};

// Extract finality status from response
let is_final = response
if response
.get("chainlock")
.and_then(|v| v.as_bool())
.unwrap_or(false);

Ok(is_final)
.unwrap_or(false)
{
return Ok(BlobFinalityState::Finalized);
}
if response.get("height").and_then(|v| v.as_u64()).is_none() {
return Ok(BlobFinalityState::Unconfirmed);
}
Ok(BlobFinalityState::Confirmed {
confirmations: None,
})
}

pub async fn check_blob_finality_with_mode(
Expand All @@ -848,30 +902,54 @@ impl SyscoinClient {
}
}

pub async fn blob_finality_state_with_mode(
&self,
blob_id: &str,
mode: BitcoinDaFinalityMode,
confirmations: u64,
) -> Result<BlobFinalityState, SyscoinError> {
match mode {
BitcoinDaFinalityMode::Chainlock => self.blob_finality_state(blob_id).await,
BitcoinDaFinalityMode::Confirmations => {
self.blob_confirmation_finality_state(blob_id, confirmations)
.await
}
}
}

/// Check if a blob is final based on a required number of confirmations.
pub async fn check_blob_finality_by_confirmations(
&self,
blob_id: &str,
confirmations: u64,
) -> Result<bool, SyscoinError> {
let actual_blob_id = blob_id.strip_prefix("0x").unwrap_or(blob_id);
Ok(self
.blob_confirmation_finality_state(blob_id, confirmations)
.await?
.is_final())
}

pub async fn blob_confirmation_finality_state(
&self,
blob_id: &str,
confirmations: u64,
) -> Result<BlobFinalityState, SyscoinError> {
let actual_blob_id = self.normalized_blob_id(blob_id);
let params = vec![json!(actual_blob_id)];

let response = match self.rpc_client.call("getnevmblobdata", &params).await {
Ok(v) => v,
Err(e) => {
let msg = e.to_string();
if msg.contains("Could not find blob information for versionhash")
|| msg.contains("\"code\":-32602")
{
if Self::is_missing_blob_error(&msg) {
if self.blob_exists_in_cloud(actual_blob_id).await {
warn!(
"RPC confirmation lookup could not find blob {}; accepting PODA archive presence as finalized",
actual_blob_id
);
return Ok(true);
return Ok(BlobFinalityState::Finalized);
}
return Ok(false);
return Ok(BlobFinalityState::Missing);
}
return Err(e);
}
Expand All @@ -880,17 +958,28 @@ impl SyscoinClient {
let Some(blob_height) = response.get("height").and_then(|v| v.as_u64()) else {
// Unconfirmed blobs may not have a mined height yet; treat this as "not final"
// so callers keep polling instead of crashing.
return Ok(false);
return Ok(BlobFinalityState::Unconfirmed);
};

let actual_confirmations = self.current_confirmations(blob_height).await?;
if actual_confirmations >= confirmations {
Ok(BlobFinalityState::Finalized)
} else {
Ok(BlobFinalityState::Confirmed {
confirmations: Some(actual_confirmations),
})
}
}

async fn current_confirmations(&self, blob_height: u64) -> Result<u64, SyscoinError> {
let current_height = self
.rpc_client
.call("getblockcount", &[])
.await?
.as_u64()
.ok_or("getblockcount returned non-u64 result")?;

Ok(current_height.saturating_sub(blob_height) + 1 >= confirmations)
Ok(current_height.saturating_sub(blob_height) + 1)
}

/// Create or load a wallet by name
Expand Down
Loading
Loading