diff --git a/s3/src/bucket.rs b/s3/src/bucket.rs index e341d299c1..c025445c03 100644 --- a/s3/src/bucket.rs +++ b/s3/src/bucket.rs @@ -1598,8 +1598,14 @@ impl Bucket { s3_path: &str, content_type: &str, ) -> Result { - self._put_object_stream_with_content_type_and_headers(reader, s3_path, content_type, None) - .await + self._put_object_stream_with_content_type_and_headers( + reader, + s3_path, + content_type, + None, + None, + ) + .await } /// Calculate the maximum number of concurrent chunks based on available memory. @@ -1642,6 +1648,7 @@ impl Bucket { s3_path: &str, content_type: &str, custom_headers: Option, + max_concurrent_chunks: Option, ) -> Result { // If the file is smaller CHUNK_SIZE, just do a regular upload. // Otherwise perform a multi-part upload. @@ -1675,9 +1682,117 @@ impl Bucket { let path = msg.key; let upload_id = &msg.upload_id; - // Determine max concurrent chunks based on available memory - let max_concurrent_chunks = Self::calculate_max_concurrent_chunks(); + // use configured max_concurrent_chunks or determine max concurrent chunks based on available memory + let max_concurrent_chunks = max_concurrent_chunks.map_or_else( + Self::calculate_max_concurrent_chunks, + std::num::NonZeroUsize::get, + ); + + let (total_size, mut etags) = if max_concurrent_chunks == 1 { + self._put_object_stream_chunks_sequential( + reader, + first_chunk, + &path, + upload_id, + content_type, + ) + .await? + } else { + self._put_object_stream_chunks_concurrent( + reader, + first_chunk, + &path, + upload_id, + content_type, + max_concurrent_chunks, + ) + .await? 
+ }; + + // Sort etags by part number to ensure correct order + etags.sort_by_key(|k| k.0); + let etags: Vec = etags.into_iter().map(|(_, etag)| etag).collect(); + + // Finish the upload + let inner_data = etags + .into_iter() + .enumerate() + .map(|(i, x)| Part { + etag: x, + part_number: i as u32 + 1, + }) + .collect::>(); + let response_data = self + .complete_multipart_upload(&path, &msg.upload_id, inner_data) + .await?; + + Ok(PutStreamResponse::new( + response_data.status_code(), + total_size, + )) + } + + #[maybe_async::async_impl] + async fn _put_object_stream_chunks_sequential( + &self, + reader: &mut R, + first_chunk: Vec, + path: &str, + upload_id: &str, + content_type: &str, + ) -> Result<(usize, Vec<(u32, String)>), S3Error> { + let mut chunk = first_chunk; + let mut part_number: u32 = 0; + let mut total_size = 0; + let mut etags = Vec::new(); + + loop { + let chunk_len = chunk.len(); + + if chunk_len == 0 { + break; + } + + part_number += 1; + total_size += chunk_len; + + let current_part = part_number; + let is_last_chunk = chunk_len < CHUNK_SIZE; + + let response_data = self + .make_multipart_request(path, chunk, current_part, upload_id, content_type) + .await?; + + if !(200..300).contains(&response_data.status_code()) { + // if chunk upload failed - abort the upload + return match self.abort_upload(path, upload_id).await { + Ok(_) => Err(error_from_response_data(response_data)?), + Err(error) => Err(error), + }; + } + + etags.push((current_part, response_data.as_str()?.to_string())); + if is_last_chunk { + break; + } + + chunk = crate::utils::read_chunk_async(reader).await?; + } + + Ok((total_size, etags)) + } + + #[maybe_async::async_impl] + async fn _put_object_stream_chunks_concurrent( + &self, + reader: &mut R, + first_chunk: Vec, + path: &str, + upload_id: &str, + content_type: &str, + max_concurrent_chunks: usize, + ) -> Result<(usize, Vec<(u32, String)>), S3Error> { // Use FuturesUnordered for bounded parallelism use futures_util::FutureExt; 
use futures_util::stream::{FuturesUnordered, StreamExt}; @@ -1697,21 +1812,10 @@ impl Bucket { reading_done = true; } - let path_clone = path.clone(); - let upload_id_clone = upload_id.clone(); - let content_type_clone = content_type.to_string(); - let bucket_clone = self.clone(); - active_uploads.push( async move { - let result = bucket_clone - .make_multipart_request( - &path_clone, - first_chunk, - 1, - &upload_id_clone, - &content_type_clone, - ) + let result = self + .make_multipart_request(path, first_chunk, 1, upload_id, content_type) .await; (1, result) } @@ -1738,20 +1842,16 @@ impl Bucket { } let current_part = part_number; - let path_clone = path.clone(); - let upload_id_clone = upload_id.clone(); - let content_type_clone = content_type.to_string(); - let bucket_clone = self.clone(); active_uploads.push( async move { - let result = bucket_clone + let result = self .make_multipart_request( - &path_clone, + path, chunk, current_part, - &upload_id_clone, - &content_type_clone, + upload_id, + content_type, ) .await; (current_part, result) @@ -1765,7 +1865,7 @@ impl Bucket { let response_data = result?; if !(200..300).contains(&response_data.status_code()) { // if chunk upload failed - abort the upload - match self.abort_upload(&path, upload_id).await { + match self.abort_upload(path, upload_id).await { Ok(_) => { return Err(error_from_response_data(response_data)?); } @@ -1781,28 +1881,7 @@ impl Bucket { } } - // Sort etags by part number to ensure correct order - etags.sort_by_key(|k| k.0); - let etags: Vec = etags.into_iter().map(|(_, etag)| etag).collect(); - - // Finish the upload - let inner_data = etags - .clone() - .into_iter() - .enumerate() - .map(|(i, x)| Part { - etag: x, - part_number: i as u32 + 1, - }) - .collect::>(); - let response_data = self - .complete_multipart_upload(&path, &msg.upload_id, inner_data) - .await?; - - Ok(PutStreamResponse::new( - response_data.status_code(), - total_size, - )) + Ok((total_size, etags)) } 
#[maybe_async::sync_impl] diff --git a/s3/src/put_object_request.rs b/s3/src/put_object_request.rs index 1b48227661..b5e0801e71 100644 --- a/s3/src/put_object_request.rs +++ b/s3/src/put_object_request.rs @@ -208,6 +208,7 @@ pub struct PutObjectStreamRequest<'a> { path: String, content_type: String, custom_headers: HeaderMap, + max_concurrent_chunks: Option, } #[cfg(any(feature = "with-tokio", feature = "with-async-std"))] @@ -219,6 +220,7 @@ impl<'a> PutObjectStreamRequest<'a> { path: path.as_ref().to_string(), content_type: "application/octet-stream".to_string(), custom_headers: HeaderMap::new(), + max_concurrent_chunks: None, } } @@ -286,6 +288,12 @@ impl<'a> PutObjectStreamRequest<'a> { Ok(self) } + /// Set the maximum number of concurrent chunks for multipart upload. Setting it to 0 falls back to the default value based on available memory. + pub fn with_max_concurrent_chunks(mut self, max: usize) -> Self { + self.max_concurrent_chunks = std::num::NonZeroUsize::new(max); + self + } + /// Execute the streaming PUT request #[cfg(feature = "with-tokio")] pub async fn execute_stream( @@ -304,6 +312,7 @@ impl<'a> PutObjectStreamRequest<'a> { } else { Some(self.custom_headers) }, + self.max_concurrent_chunks, ) .await } @@ -323,6 +332,7 @@ impl<'a> PutObjectStreamRequest<'a> { } else { Some(self.custom_headers) }, + self.max_concurrent_chunks, ) .await }