-
Notifications
You must be signed in to change notification settings - Fork 975
fix: stream multipart uploads to avoid OOM on large files #477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7f5d102
4be0e81
e71d69c
c0a881f
f267bee
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@googleworkspace/cli": patch | ||
| --- | ||
|
|
||
| Stream multipart uploads to avoid OOM on large files. File content is now streamed in chunks via `ReaderStream` instead of being read entirely into memory, reducing memory usage from O(file_size) to O(64 KB). |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,6 +22,7 @@ use std::collections::{HashMap, HashSet}; | |||||||||||
| use std::path::PathBuf; | ||||||||||||
|
|
||||||||||||
| use anyhow::Context; | ||||||||||||
| use futures_util::stream::TryStreamExt; | ||||||||||||
| use futures_util::StreamExt; | ||||||||||||
| use serde_json::{json, Map, Value}; | ||||||||||||
| use tokio::io::AsyncWriteExt; | ||||||||||||
|
|
@@ -183,20 +184,22 @@ async fn build_http_request( | |||||||||||
| if input.is_upload { | ||||||||||||
| let upload_path = upload_path.expect("upload_path must be Some when is_upload is true"); | ||||||||||||
|
|
||||||||||||
| let file_bytes = tokio::fs::read(upload_path).await.map_err(|e| { | ||||||||||||
| let file_meta = tokio::fs::metadata(upload_path).await.map_err(|e| { | ||||||||||||
| GwsError::Validation(format!( | ||||||||||||
| "Failed to read upload file '{}': {}", | ||||||||||||
| "Failed to get metadata for upload file '{}': {}", | ||||||||||||
| upload_path, e | ||||||||||||
| )) | ||||||||||||
| })?; | ||||||||||||
| let file_size = file_meta.len(); | ||||||||||||
|
|
||||||||||||
| request = request.query(&[("uploadType", "multipart")]); | ||||||||||||
| let media_mime = | ||||||||||||
| resolve_upload_mime(upload_content_type, Some(upload_path), &input.body); | ||||||||||||
| let (multipart_body, content_type) = | ||||||||||||
| build_multipart_body(&input.body, &file_bytes, &media_mime)?; | ||||||||||||
| let (body, content_type, content_length) = | ||||||||||||
| build_multipart_stream(&input.body, upload_path, file_size, &media_mime)?; | ||||||||||||
jpoehnelt marked this conversation as resolved.
Show resolved
Hide resolved
jpoehnelt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
| request = request.header("Content-Type", content_type); | ||||||||||||
| request = request.body(multipart_body); | ||||||||||||
| request = request.header("Content-Length", content_length); | ||||||||||||
| request = request.body(body); | ||||||||||||
| } else if let Some(ref body_val) = input.body { | ||||||||||||
| request = request.header("Content-Type", "application/json"); | ||||||||||||
| request = request.json(body_val); | ||||||||||||
|
|
@@ -827,9 +830,75 @@ fn mime_from_extension(path: &str) -> Option<&'static str> { | |||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| /// Builds a multipart/related body for media upload requests. | ||||||||||||
| /// Builds a streaming multipart/related body for media upload requests. | ||||||||||||
| /// | ||||||||||||
| /// Instead of reading the entire file into memory, this streams the file in | ||||||||||||
| /// chunks via `ReaderStream`, keeping memory usage at O(64 KB) regardless of | ||||||||||||
| /// file size. The `Content-Length` is pre-computed from file metadata so Google | ||||||||||||
| /// APIs still receive the correct header without buffering. | ||||||||||||
| /// | ||||||||||||
| /// Returns `(body, content_type, content_length)`. | ||||||||||||
| fn build_multipart_stream( | ||||||||||||
| metadata: &Option<Value>, | ||||||||||||
| file_path: &str, | ||||||||||||
jpoehnelt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
| file_size: u64, | ||||||||||||
| media_mime: &str, | ||||||||||||
| ) -> Result<(reqwest::Body, String, u64), GwsError> { | ||||||||||||
| let boundary = format!("gws_boundary_{:016x}", rand::random::<u64>()); | ||||||||||||
|
|
||||||||||||
| let media_mime = media_mime.to_string(); | ||||||||||||
|
|
||||||||||||
| let metadata_json = match metadata { | ||||||||||||
| Some(m) => serde_json::to_string(m).map_err(|e| { | ||||||||||||
| GwsError::Validation(format!("Failed to serialize upload metadata: {e}")) | ||||||||||||
| })?, | ||||||||||||
| None => "{}".to_string(), | ||||||||||||
| }; | ||||||||||||
|
|
||||||||||||
| let preamble = format!( | ||||||||||||
| "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{metadata_json}\r\n\ | ||||||||||||
| --{boundary}\r\nContent-Type: {media_mime}\r\n\r\n" | ||||||||||||
| ); | ||||||||||||
|
Comment on lines
+858
to
+861
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The multi-line
Suggested change
|
||||||||||||
| let postamble = format!("\r\n--{boundary}--\r\n"); | ||||||||||||
|
|
||||||||||||
| let content_length = preamble.len() as u64 + file_size + postamble.len() as u64; | ||||||||||||
| let content_type = format!("multipart/related; boundary={boundary}"); | ||||||||||||
|
|
||||||||||||
| let preamble_bytes: bytes::Bytes = preamble.into_bytes().into(); | ||||||||||||
| let postamble_bytes: bytes::Bytes = postamble.into_bytes().into(); | ||||||||||||
|
|
||||||||||||
| let file_path_owned = file_path.to_owned(); | ||||||||||||
| let file_stream = futures_util::stream::once(async move { | ||||||||||||
| tokio::fs::File::open(&file_path_owned).await.map_err(|e| { | ||||||||||||
| std::io::Error::new( | ||||||||||||
| e.kind(), | ||||||||||||
| format!("failed to open upload file '{}': {}", file_path_owned, e), | ||||||||||||
| ) | ||||||||||||
| }) | ||||||||||||
| }) | ||||||||||||
| .map_ok(tokio_util::io::ReaderStream::new) | ||||||||||||
| .try_flatten(); | ||||||||||||
|
|
||||||||||||
| let stream = futures_util::stream::once(async { Ok::<_, std::io::Error>(preamble_bytes) }) | ||||||||||||
| .chain(file_stream) | ||||||||||||
| .chain(futures_util::stream::once(async { | ||||||||||||
| Ok::<_, std::io::Error>(postamble_bytes) | ||||||||||||
| })); | ||||||||||||
|
|
||||||||||||
| Ok(( | ||||||||||||
| reqwest::Body::wrap_stream(stream), | ||||||||||||
| content_type, | ||||||||||||
| content_length, | ||||||||||||
| )) | ||||||||||||
| } | ||||||||||||
jpoehnelt marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
|
|
||||||||||||
| /// Builds a buffered multipart/related body for media upload requests. | ||||||||||||
| /// | ||||||||||||
| /// This is the legacy implementation retained for unit tests that need | ||||||||||||
| /// a fully materialized body to assert against. | ||||||||||||
| /// | ||||||||||||
| /// Returns the body bytes and the Content-Type header value (with boundary). | ||||||||||||
| #[cfg(test)] | ||||||||||||
| fn build_multipart_body( | ||||||||||||
| metadata: &Option<Value>, | ||||||||||||
| file_bytes: &[u8], | ||||||||||||
|
|
@@ -1369,6 +1438,82 @@ mod tests { | |||||||||||
| ); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[tokio::test] | ||||||||||||
| async fn test_build_multipart_stream_content_length() { | ||||||||||||
| let dir = tempfile::tempdir().unwrap(); | ||||||||||||
| let file_path = dir.path().join("small.txt"); | ||||||||||||
| let file_content = b"Hello stream"; | ||||||||||||
| std::fs::write(&file_path, file_content).unwrap(); | ||||||||||||
|
|
||||||||||||
| let metadata = Some(json!({ "name": "small.txt" })); | ||||||||||||
| let file_size = file_content.len() as u64; | ||||||||||||
|
|
||||||||||||
| let (_body, content_type, declared_len) = build_multipart_stream( | ||||||||||||
| &metadata, | ||||||||||||
| file_path.to_str().unwrap(), | ||||||||||||
| file_size, | ||||||||||||
| "text/plain", | ||||||||||||
| ) | ||||||||||||
| .unwrap(); | ||||||||||||
|
|
||||||||||||
| assert!(content_type.starts_with("multipart/related; boundary=")); | ||||||||||||
| let boundary = content_type.split("boundary=").nth(1).unwrap(); | ||||||||||||
|
|
||||||||||||
| // Manually compute expected content length: | ||||||||||||
| // preamble = "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{json}\r\n--{boundary}\r\nContent-Type: text/plain\r\n\r\n" | ||||||||||||
| // postamble = "\r\n--{boundary}--\r\n" | ||||||||||||
| let metadata_json = serde_json::to_string(&metadata.unwrap()).unwrap(); | ||||||||||||
| let preamble = format!( | ||||||||||||
| "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{metadata_json}\r\n\ | ||||||||||||
| --{boundary}\r\nContent-Type: text/plain\r\n\r\n" | ||||||||||||
| ); | ||||||||||||
|
Comment on lines
+1466
to
+1469
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test calculates the expected preamble length using the same buggy format string as in
Suggested change
|
||||||||||||
| let postamble = format!("\r\n--{boundary}--\r\n"); | ||||||||||||
| let expected = preamble.len() as u64 + file_size + postamble.len() as u64; | ||||||||||||
| assert_eq!( | ||||||||||||
| declared_len, expected, | ||||||||||||
| "declared Content-Length must match expected preamble + file + postamble" | ||||||||||||
| ); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[tokio::test] | ||||||||||||
| async fn test_build_multipart_stream_large_file() { | ||||||||||||
| let dir = tempfile::tempdir().unwrap(); | ||||||||||||
| let file_path = dir.path().join("large.bin"); | ||||||||||||
| // 256 KB — larger than the default 64 KB ReaderStream chunk size | ||||||||||||
| let data = vec![0xABu8; 256 * 1024]; | ||||||||||||
| std::fs::write(&file_path, &data).unwrap(); | ||||||||||||
|
|
||||||||||||
| let metadata = None; | ||||||||||||
| let file_size = data.len() as u64; | ||||||||||||
|
|
||||||||||||
| let (_body, _content_type, declared_len) = build_multipart_stream( | ||||||||||||
| &metadata, | ||||||||||||
| file_path.to_str().unwrap(), | ||||||||||||
| file_size, | ||||||||||||
| "application/octet-stream", | ||||||||||||
| ) | ||||||||||||
| .unwrap(); | ||||||||||||
|
|
||||||||||||
| // Content-Length must account for the empty-metadata preamble + large file + postamble | ||||||||||||
| assert!( | ||||||||||||
| declared_len > file_size, | ||||||||||||
| "Content-Length ({declared_len}) must be larger than file size ({file_size}) due to multipart framing" | ||||||||||||
| ); | ||||||||||||
|
|
||||||||||||
| // Verify exact arithmetic: preamble overhead + file_size + postamble | ||||||||||||
| let boundary = _content_type.split("boundary=").nth(1).unwrap(); | ||||||||||||
| let preamble = format!( | ||||||||||||
| "--{boundary}\r\nContent-Type: application/json; charset=UTF-8\r\n\r\n{{}}\r\n\ | ||||||||||||
| --{boundary}\r\nContent-Type: application/octet-stream\r\n\r\n" | ||||||||||||
| ); | ||||||||||||
|
Comment on lines
+1505
to
+1508
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the other test, the preamble calculation here includes extra whitespace that will not match a correctly formed multipart body. The leading whitespace on the second line of the format string should be removed.
Suggested change
|
||||||||||||
| let postamble = format!("\r\n--{boundary}--\r\n"); | ||||||||||||
| let expected = preamble.len() as u64 + file_size + postamble.len() as u64; | ||||||||||||
| assert_eq!( | ||||||||||||
| declared_len, expected, | ||||||||||||
| "Content-Length must match for multi-chunk files" | ||||||||||||
| ); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| #[test] | ||||||||||||
| fn test_build_url_basic() { | ||||||||||||
| let doc = RestDescription { | ||||||||||||
|
|
||||||||||||
Uh oh!
There was an error while loading. Please reload this page.