From e0d403fdfa764f88f56e2ca143527ff1d6c6eeab Mon Sep 17 00:00:00 2001 From: RoyLin Date: Wed, 10 Jun 2026 16:01:44 +0800 Subject: [PATCH] fix(streaming): use idle read_timeout for SSE, not total request timeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit reqwest's RequestBuilder::timeout caps the WHOLE request including the streamed body, so SSE/chunked responses were hard-killed after the hardcoded 300s regardless of activity — every SSE stream died at 5 min. Move to a client-level read_timeout (idle, reset on every byte): a healthy stream with periodic keep-alive frames (~10s) never trips it and can run indefinitely; only a genuinely silent upstream is reaped. Bump to 1.0.10. --- Cargo.toml | 2 +- src/proxy/streaming.rs | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 85092bd..ff2b77b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "a3s-gateway" -version = "1.0.7" +version = "1.0.10" edition = "2021" rust-version = "1.88" authors = ["A3S Lab"] diff --git a/src/proxy/streaming.rs b/src/proxy/streaming.rs index 3447c2a..018d95c 100644 --- a/src/proxy/streaming.rs +++ b/src/proxy/streaming.rs @@ -12,10 +12,21 @@ use std::time::Duration; /// Shared reqwest client for streaming requests — reuses connection pool across calls static STREAMING_CLIENT: OnceLock<reqwest::Client> = OnceLock::new(); +/// Idle read timeout for streaming/SSE responses. This is the max gap the +/// upstream may go silent — NOT a total-request deadline. The API emits an SSE +/// keep-alive every ~10s, so a healthy stream never trips it and can run +/// indefinitely; only a genuinely dead upstream is reaped after this window. 
+const STREAM_IDLE_TIMEOUT_SECS: u64 = 300; + fn streaming_client() -> &'static reqwest::Client { STREAMING_CLIENT.get_or_init(|| { reqwest::Client::builder() .pool_max_idle_per_host(100) + // read_timeout = per-read (idle) timeout, reset on every byte — + // unlike .timeout()/RequestBuilder::timeout which caps the *whole* + // request including the streamed body and hard-killed every SSE + // stream after 5 minutes regardless of activity. + .read_timeout(Duration::from_secs(STREAM_IDLE_TIMEOUT_SECS)) .build() .unwrap_or_default() }) @@ -90,10 +101,13 @@ pub async fn forward_streaming( let path_and_query = uri.path_and_query().map(|pq| pq.as_str()).unwrap_or("/"); let upstream_url = format!("{}{}", backend_url, path_and_query); - // Reuse shared client — connection pool survives across streaming requests - let mut req_builder = streaming_client() - .request(method.clone(), &upstream_url) - .timeout(Duration::from_secs(timeout_secs)); + // Reuse shared client — its read_timeout (idle, see STREAM_IDLE_TIMEOUT_SECS) + // governs streaming liveness. Deliberately NO per-request .timeout() here: + // reqwest's total-request timeout would cap the whole streamed body and + // hard-kill every SSE/chunked response after `timeout_secs` regardless of + // activity (that was the 5-minute SSE cutoff). `timeout_secs` is still used + // below to label an UpstreamTimeout if the initial response never arrives. + let mut req_builder = streaming_client().request(method.clone(), &upstream_url); // Forward headers (skip hop-by-hop) — eq_ignore_ascii_case avoids to_lowercase() alloc for (key, value) in headers.iter() {