From 26f82c14a7295304c2365c57ec571d4f9f6919ed Mon Sep 17 00:00:00 2001 From: 3em0 <59153706+3em0@users.noreply.github.com> Date: Wed, 27 May 2026 13:11:29 +0000 Subject: [PATCH 1/3] fix(query): scope http session state by tenant --- .../http/v1/query/http_query_context.rs | 7 ++- .../http/v1/session/client_session_manager.rs | 48 ++++++++++++--- src/query/service/src/sessions/mod.rs | 1 + src/query/service/src/sessions/session.rs | 58 +++++++++++++++---- 4 files changed, 93 insertions(+), 21 deletions(-) diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index bc61dd502911b..b630a70cb0816 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -177,7 +177,12 @@ impl HttpQueryContext { .map_err(|err| ErrorCode::Internal(format!("Failed to upgrade session: {err}")))?; if let Some(cid) = session.get_client_session_id() { - ClientSessionManager::instance().on_query_start(&cid, &self.user_name, &session); + ClientSessionManager::instance().on_query_start( + &session.get_current_tenant(), + &cid, + &self.user_name, + &session, + ); }; if let Some(session_conf) = http_session_conf { session_conf.restore(&session, self).await?; diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index bab53be795c00..7a73da26a9757 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -46,6 +46,7 @@ use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_META; use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_QUERY; use crate::sessions::Session; use crate::sessions::SessionPrivilegeManager; +use crate::sessions::temporary_table_session_prefix; pub struct TokenPair { pub refresh: String, @@ -96,8 +97,8 @@ impl ClientSessionManager { GlobalInstance::get() } - pub fn state_key(client_session_id: &str, user_name: &str) -> String { - format!("{user_name}/{client_session_id}") + pub fn state_key(tenant: &Tenant, client_session_id: &str, user_name: &str) -> String { + temporary_table_session_prefix(tenant, user_name, client_session_id) } fn refresh_token_ttl(&self) -> Duration { @@ -175,7 +176,7 @@ impl ClientSessionManager { sid: &str, user_name: &str, ) -> Result<()> { - if self.refresh_in_memory_states(sid, user_name) { + if self.refresh_in_memory_states(&tenant, sid, user_name) { self.refresh_session_handle(tenant, user_name.to_string(), sid) .await?; info!("refreshing session {}", sid); @@ -349,7 +350,7 @@ impl ClientSessionManager { .drop_client_session_id(session_id, user_name) .await .ok(); - let state_key = Self::state_key(session_id, user_name); + let state_key = Self::state_key(tenant, session_id, user_name); let state = self.session_state.lock().remove(&state_key); if let Some(state) = state { drop_all_temp_tables_with_logging(&state_key, state.temp_tbl_mgr, "closed").await; @@ -386,8 +387,13 @@ impl ClientSessionManager { Ok(()) } - pub fn refresh_in_memory_states(&self, client_session_id: &str, user_name: &str) -> bool { - let key = Self::state_key(client_session_id, user_name); + pub fn refresh_in_memory_states( + &self, + tenant: &Tenant, + client_session_id: &str, + user_name: &str, + ) -> bool { + let key = Self::state_key(tenant, client_session_id, user_name); let mut guard = self.session_state.lock(); if let Entry::Occupied(mut entry) = guard.entry(key) { let now = Instant::now(); @@ -401,8 +407,14 @@ impl ClientSessionManager { false } - pub fn on_query_start(&self, client_session_id: &str, user_name: &str, session: &Arc) { - let key = Self::state_key(client_session_id, user_name); + pub fn on_query_start( + &self, + tenant: &Tenant, + client_session_id: &str, + user_name: &str, + session: &Arc, + ) { + let key = Self::state_key(tenant, client_session_id, user_name); let mut guard = self.session_state.lock(); guard.entry(key).and_modify(|e| { if e.temp_tbl_mgr.lock().is_empty() { @@ -455,6 +467,26 @@ impl ClientSessionManager { } } +#[cfg(test)] +mod tests { + use databend_common_meta_app::tenant::Tenant; + + use super::ClientSessionManager; + + #[test] + fn test_state_key_is_tenant_scoped() { + let tenant_a = Tenant::new_literal("tenant_a"); + let tenant_b = Tenant::new_literal("tenant_b"); + let user_name = "analyst"; + let session_id = "018f2b74-0000-7000-8000-000000000001"; + + assert_ne!( + ClientSessionManager::state_key(&tenant_a, session_id, user_name), + ClientSessionManager::state_key(&tenant_b, session_id, user_name) + ); + } +} + pub async fn drop_all_temp_tables_with_logging( user_name_session_id: &str, mgr: TempTblMgrRef, diff --git a/src/query/service/src/sessions/mod.rs b/src/query/service/src/sessions/mod.rs index 21c93a1ac5589..62929827904a8 100644 --- a/src/query/service/src/sessions/mod.rs +++ b/src/query/service/src/sessions/mod.rs @@ -41,6 +41,7 @@ pub use queue_mgr::QueryEntry; pub use queue_mgr::QueueData; pub use queue_mgr::QueueManager; pub use session::Session; +pub(crate) use session::temporary_table_session_prefix; pub use session_ctx::SessionContext; pub use session_info::ProcessInfo; pub use session_mgr::SessionManager; diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index 8eef01313d36b..698ef17c1def5 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -453,24 +453,26 @@ impl Session { } pub fn get_temp_table_prefix(&self) -> Result { let typ = self.typ.read().clone(); - let session_id = match typ { - SessionType::MySQL => self.id.clone(), + let user_name = self.get_current_user()?.name; + match typ { + SessionType::MySQL => Ok(format!("{}/{}", user_name, self.id.as_str())), SessionType::HTTPQuery => { if let Some(id) = self.get_client_session_id() { - id + Ok(temporary_table_session_prefix( + &self.get_current_tenant(), + &user_name, + &id, + )) } else { - return Err(ErrorCode::BadArguments( + Err(ErrorCode::BadArguments( "can not use temp table in http handler if cookie is not enabled", - )); + )) } } - t => { - return Err(ErrorCode::BadArguments(format!( - "can not use temp table in session type {t}" - ))); - } - }; - Ok(format!("{}/{session_id}", self.get_current_user()?.name)) + t => Err(ErrorCode::BadArguments(format!( + "can not use temp table in session type {t}" + ))), + } } pub fn get_current_workload_group(&self) -> Option { @@ -482,6 +484,38 @@ impl Session { } } +pub(crate) fn temporary_table_session_prefix( + tenant: &Tenant, + user_name: &str, + session_id: &str, +) -> String { + format!("{}/{user_name}/{session_id}", tenant.tenant_name()) +} + +#[cfg(test)] +mod tests { + use databend_common_meta_app::tenant::Tenant; + + use super::temporary_table_session_prefix; + + #[test] + fn test_temporary_table_session_prefix_is_tenant_scoped() { + let tenant_a = Tenant::new_literal("tenant_a"); + let tenant_b = Tenant::new_literal("tenant_b"); + let user_name = "analyst"; + let session_id = "018f2b74-0000-7000-8000-000000000001"; + + assert_ne!( + temporary_table_session_prefix(&tenant_a, user_name, session_id), + temporary_table_session_prefix(&tenant_b, user_name, session_id) + ); + assert_eq!( + "tenant_a/analyst/018f2b74-0000-7000-8000-000000000001", + temporary_table_session_prefix(&tenant_a, user_name, session_id) + ); + } +} + impl Drop for Session { fn drop(&mut self) { drop_guard(move || { From 375810cf32a1f78f9001581894fc769bbfec3503 Mon Sep 17 00:00:00 2001 From: 3em0 <59153706+3em0@users.noreply.github.com> Date: Sat, 20 Jun 2026 02:11:18 -0700 Subject: [PATCH 2/3] fix(query): move items before test module to satisfy clippy Resolve `clippy::items_after_test_module` errors that failed the linux/check and mac_check CI jobs by moving the items declared after the `#[cfg(test)] mod tests` blocks to before them, so each test module is the last item in its file. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../http/v1/session/client_session_manager.rs | 40 +++++++++---------- src/query/service/src/sessions/session.rs | 18 ++++----- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index 7a73da26a9757..e3b59e1bfe3c6 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -467,26 +467,6 @@ impl ClientSessionManager { } } -#[cfg(test)] -mod tests { - use databend_common_meta_app::tenant::Tenant; - - use super::ClientSessionManager; - - #[test] - fn test_state_key_is_tenant_scoped() { - let tenant_a = Tenant::new_literal("tenant_a"); - let tenant_b = Tenant::new_literal("tenant_b"); - let user_name = "analyst"; - let session_id = "018f2b74-0000-7000-8000-000000000001"; - - assert_ne!( - ClientSessionManager::state_key(&tenant_a, session_id, user_name), - ClientSessionManager::state_key(&tenant_b, session_id, user_name) - ); - } -} - pub async fn drop_all_temp_tables_with_logging( user_name_session_id: &str, mgr: TempTblMgrRef, @@ -511,3 +491,23 @@ pub async fn drop_all_temp_tables_with_logging( } } } + +#[cfg(test)] +mod tests { + use databend_common_meta_app::tenant::Tenant; + + use super::ClientSessionManager; + + #[test] + fn test_state_key_is_tenant_scoped() { + let tenant_a = Tenant::new_literal("tenant_a"); + let tenant_b = Tenant::new_literal("tenant_b"); + let user_name = "analyst"; + let session_id = "018f2b74-0000-7000-8000-000000000001"; + + assert_ne!( + ClientSessionManager::state_key(&tenant_a, session_id, user_name), + ClientSessionManager::state_key(&tenant_b, session_id, user_name) + ); + } +} diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index 698ef17c1def5..8fb78885c46e0 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -492,6 +492,15 @@ pub(crate) fn temporary_table_session_prefix( format!("{}/{user_name}/{session_id}", tenant.tenant_name()) } +impl Drop for Session { + fn drop(&mut self) { + drop_guard(move || { + debug!("Drop session {}", self.id.clone()); + SessionManager::instance().destroy_session(&self.id.clone()); + }) + } +} + #[cfg(test)] mod tests { use databend_common_meta_app::tenant::Tenant; @@ -515,12 +524,3 @@ mod tests { ); } } - -impl Drop for Session { - fn drop(&mut self) { - drop_guard(move || { - debug!("Drop session {}", self.id.clone()); - SessionManager::instance().destroy_session(&self.id.clone()); - }) - } -} From dbc5074320773d27775a85828f997dc247e1cab1 Mon Sep 17 00:00:00 2001 From: 3em0 <59153706+3em0@users.noreply.github.com> Date: Mon, 22 Jun 2026 16:12:41 +0000 Subject: [PATCH 3/3] fix(query): parse tenant-scoped temp table sessions --- .../table_functions/temporary_tables_table.rs | 78 +++++++++++++++++-- 1 file changed, 72 insertions(+), 6 deletions(-) diff --git a/src/query/service/src/table_functions/temporary_tables_table.rs b/src/query/service/src/table_functions/temporary_tables_table.rs index 9c84757b8e51f..5132e6ebddec7 100644 --- a/src/query/service/src/table_functions/temporary_tables_table.rs +++ b/src/query/service/src/table_functions/temporary_tables_table.rs @@ -117,16 +117,15 @@ impl SyncSystemTable for TemporaryTablesTable { }) .ok_or_else(|| format!("Invalid table desc: {}", desc))?; - let user = session_key.split('/').next().unwrap().to_string(); - let session_id = session_key.split('/').nth(1).unwrap().to_string(); + let (user, session_id) = parse_temp_table_session_key(&session_key, &typ)?; let is_current_session = { if current_session_type == SessionType::HTTPQuery { current_client_session_id .as_ref() - .map(|id| id == &session_id) + .map(|id| id.as_str() == session_id) .unwrap_or(false) } else { - current_session_id == session_id + current_session_id.as_str() == session_id } }; @@ -135,8 +134,8 @@ impl SyncSystemTable for TemporaryTablesTable { names.push(table.name); table_ids.push(table.ident.table_id); engines.push(meta.engine); - users.push(user); - session_ids.push(session_id); + users.push(user.to_string()); + session_ids.push(session_id.to_string()); session_types.push(typ.to_string()); is_current_sessions.push(is_current_session); created_ons.push(meta.created_on.timestamp_micros()); @@ -171,6 +170,31 @@ impl SyncSystemTable for TemporaryTablesTable { } } +fn parse_temp_table_session_key<'a>( + session_key: &'a str, + typ: &SessionType, +) -> Result<(&'a str, &'a str)> { + let mut parts = session_key.split('/'); + match typ { + SessionType::MySQL => match (parts.next(), parts.next(), parts.next()) { + (Some(user), Some(session_id), None) => Ok((user, session_id)), + _ => invalid_temp_table_session_key(session_key, typ), + }, + SessionType::HTTPQuery => match (parts.next(), parts.next(), parts.next(), parts.next()) { + (Some(_tenant), Some(user), Some(session_id), None) => Ok((user, session_id)), + _ => invalid_temp_table_session_key(session_key, typ), + }, + _ => invalid_temp_table_session_key(session_key, typ), + } +} + +fn invalid_temp_table_session_key(session_key: &str, typ: &SessionType) -> Result { + Err(format!( + "Invalid temporary table session key: {session_key} for session type {typ}" + ) + .into()) +} + impl TemporaryTablesTable { pub fn create(table_id: u64) -> Arc { let schema = TableSchemaRefExt::create(vec![ @@ -217,3 +241,45 @@ impl TemporaryTablesTable { SyncOneBlockSystemTable::create(Self { table_info }) } } + +#[cfg(test)] +mod tests { + use databend_common_catalog::session_type::SessionType; + + use super::parse_temp_table_session_key; + + #[test] + fn test_parse_mysql_temp_table_session_key() { + assert_eq!( + ("analyst", "mysql-session"), + parse_temp_table_session_key("analyst/mysql-session", &SessionType::MySQL).unwrap() + ); + } + + #[test] + fn test_parse_http_temp_table_session_key() { + assert_eq!( + ("analyst", "http-client-session"), + parse_temp_table_session_key( + "tenant_a/analyst/http-client-session", + &SessionType::HTTPQuery + ) + .unwrap() + ); + } + + #[test] + fn test_rejects_wrong_temp_table_session_key_shape() { + assert!( + parse_temp_table_session_key( + "tenant_a/analyst/http-client-session", + &SessionType::MySQL + ) + .is_err() + ); + assert!( + parse_temp_table_session_key("analyst/http-client-session", &SessionType::HTTPQuery) + .is_err() + ); + } +}