diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index 464da192b0c28..1529cd13d2092 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -416,6 +416,9 @@ impl HttpSessionConf { if !state.variables.is_empty() { session.set_all_variables(state.get_variables()?) } + } + http_ctx.restore_client_session_state(session); + if let Some(state) = &self.internal { if let Some(id) = state.last_query_ids.first() { if let Some(last_query) = http_query_manager.get_query(id) { let state = *last_query.client_state.lock(); diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index bc61dd502911b..66a3e7bed811a 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -176,11 +176,10 @@ impl HttpQueryContext { .upgrade_session(session_type) .map_err(|err| ErrorCode::Internal(format!("Failed to upgrade session: {err}")))?; - if let Some(cid) = session.get_client_session_id() { - ClientSessionManager::instance().on_query_start(&cid, &self.user_name, &session); - }; if let Some(session_conf) = http_session_conf { session_conf.restore(&session, self).await?; + } else { + self.restore_client_session_state(&session); }; let user_agent = &self.user_agent; session.set_client_host(self.client_host.clone()); @@ -201,6 +200,17 @@ impl HttpQueryContext { ctx.update_init_query_id(self.query_id.clone()); Ok((session, ctx)) } + + pub(crate) fn restore_client_session_state(&self, session: &Arc) { + if let Some(cid) = session.get_client_session_id() { + ClientSessionManager::instance().on_query_start( + &session.get_current_tenant(), + &cid, + &self.user_name, + session, + ); + }; + } } impl<'a> FromRequest<'a> for &'a HttpQueryContext { diff --git a/src/query/service/src/servers/http/v1/session/client_session_manager.rs b/src/query/service/src/servers/http/v1/session/client_session_manager.rs index bab53be795c00..e3b59e1bfe3c6 100644 --- a/src/query/service/src/servers/http/v1/session/client_session_manager.rs +++ b/src/query/service/src/servers/http/v1/session/client_session_manager.rs @@ -46,6 +46,7 @@ use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_META; use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_QUERY; use crate::sessions::Session; use crate::sessions::SessionPrivilegeManager; +use crate::sessions::temporary_table_session_prefix; pub struct TokenPair { pub refresh: String, @@ -96,8 +97,8 @@ impl ClientSessionManager { GlobalInstance::get() } - pub fn state_key(client_session_id: &str, user_name: &str) -> String { - format!("{user_name}/{client_session_id}") + pub fn state_key(tenant: &Tenant, client_session_id: &str, user_name: &str) -> String { + temporary_table_session_prefix(tenant, user_name, client_session_id) } fn refresh_token_ttl(&self) -> Duration { @@ -175,7 +176,7 @@ impl ClientSessionManager { sid: &str, user_name: &str, ) -> Result<()> { - if self.refresh_in_memory_states(sid, user_name) { + if self.refresh_in_memory_states(&tenant, sid, user_name) { self.refresh_session_handle(tenant, user_name.to_string(), sid) .await?; info!("refreshing session {}", sid); @@ -349,7 +350,7 @@ impl ClientSessionManager { .drop_client_session_id(session_id, user_name) .await .ok(); - let state_key = Self::state_key(session_id, user_name); + let state_key = Self::state_key(tenant, session_id, user_name); let state = self.session_state.lock().remove(&state_key); if let Some(state) = state { drop_all_temp_tables_with_logging(&state_key, state.temp_tbl_mgr, "closed").await; @@ -386,8 +387,13 @@ impl ClientSessionManager { Ok(()) } - pub fn refresh_in_memory_states(&self, client_session_id: &str, user_name: &str) -> bool { - let key = Self::state_key(client_session_id, user_name); + pub fn refresh_in_memory_states( + &self, + tenant: &Tenant, + client_session_id: &str, + user_name: &str, + ) -> bool { + let key = Self::state_key(tenant, client_session_id, user_name); let mut guard = self.session_state.lock(); if let Entry::Occupied(mut entry) = guard.entry(key) { let now = Instant::now(); @@ -401,8 +407,14 @@ impl ClientSessionManager { false } - pub fn on_query_start(&self, client_session_id: &str, user_name: &str, session: &Arc) { - let key = Self::state_key(client_session_id, user_name); + pub fn on_query_start( + &self, + tenant: &Tenant, + client_session_id: &str, + user_name: &str, + session: &Arc, + ) { + let key = Self::state_key(tenant, client_session_id, user_name); let mut guard = self.session_state.lock(); guard.entry(key).and_modify(|e| { if e.temp_tbl_mgr.lock().is_empty() { @@ -479,3 +491,23 @@ pub async fn drop_all_temp_tables_with_logging( } } } + +#[cfg(test)] +mod tests { + use databend_common_meta_app::tenant::Tenant; + + use super::ClientSessionManager; + + #[test] + fn test_state_key_is_tenant_scoped() { + let tenant_a = Tenant::new_literal("tenant_a"); + let tenant_b = Tenant::new_literal("tenant_b"); + let user_name = "analyst"; + let session_id = "018f2b74-0000-7000-8000-000000000001"; + + assert_ne!( + ClientSessionManager::state_key(&tenant_a, session_id, user_name), + ClientSessionManager::state_key(&tenant_b, session_id, user_name) + ); + } +} diff --git a/src/query/service/src/sessions/mod.rs b/src/query/service/src/sessions/mod.rs index 21c93a1ac5589..62929827904a8 100644 --- a/src/query/service/src/sessions/mod.rs +++ b/src/query/service/src/sessions/mod.rs @@ -41,6 +41,7 @@ pub use queue_mgr::QueryEntry; pub use queue_mgr::QueueData; pub use queue_mgr::QueueManager; pub use session::Session; +pub(crate) use session::temporary_table_session_prefix; pub use session_ctx::SessionContext; pub use session_info::ProcessInfo; pub use session_mgr::SessionManager; diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index 8eef01313d36b..8fb78885c46e0 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -453,24 +453,26 @@ impl Session { } pub fn get_temp_table_prefix(&self) -> Result { let typ = self.typ.read().clone(); - let session_id = match typ { - SessionType::MySQL => self.id.clone(), + let user_name = self.get_current_user()?.name; + match typ { + SessionType::MySQL => Ok(format!("{}/{}", user_name, self.id.as_str())), SessionType::HTTPQuery => { if let Some(id) = self.get_client_session_id() { - id + Ok(temporary_table_session_prefix( + &self.get_current_tenant(), + &user_name, + &id, + )) } else { - return Err(ErrorCode::BadArguments( + Err(ErrorCode::BadArguments( "can not use temp table in http handler if cookie is not enabled", - )); + )) } } - t => { - return Err(ErrorCode::BadArguments(format!( - "can not use temp table in session type {t}" - ))); - } - }; - Ok(format!("{}/{session_id}", self.get_current_user()?.name)) + t => Err(ErrorCode::BadArguments(format!( + "can not use temp table in session type {t}" + ))), + } } pub fn get_current_workload_group(&self) -> Option { @@ -482,6 +484,14 @@ impl Session { } } +pub(crate) fn temporary_table_session_prefix( + tenant: &Tenant, + user_name: &str, + session_id: &str, +) -> String { + format!("{}/{user_name}/{session_id}", tenant.tenant_name()) +} + impl Drop for Session { fn drop(&mut self) { drop_guard(move || { @@ -490,3 +500,27 @@ impl Drop for Session { }) } } + +#[cfg(test)] +mod tests { + use databend_common_meta_app::tenant::Tenant; + + use super::temporary_table_session_prefix; + + #[test] + fn test_temporary_table_session_prefix_is_tenant_scoped() { + let tenant_a = Tenant::new_literal("tenant_a"); + let tenant_b = Tenant::new_literal("tenant_b"); + let user_name = "analyst"; + let session_id = "018f2b74-0000-7000-8000-000000000001"; + + assert_ne!( + temporary_table_session_prefix(&tenant_a, user_name, session_id), + temporary_table_session_prefix(&tenant_b, user_name, session_id) + ); + assert_eq!( + "tenant_a/analyst/018f2b74-0000-7000-8000-000000000001", + temporary_table_session_prefix(&tenant_a, user_name, session_id) + ); + } +} diff --git a/src/query/service/src/table_functions/temporary_tables_table.rs b/src/query/service/src/table_functions/temporary_tables_table.rs index 9c84757b8e51f..ac8d115e50ebc 100644 --- a/src/query/service/src/table_functions/temporary_tables_table.rs +++ b/src/query/service/src/table_functions/temporary_tables_table.rs @@ -117,16 +117,15 @@ impl SyncSystemTable for TemporaryTablesTable { }) .ok_or_else(|| format!("Invalid table desc: {}", desc))?; - let user = session_key.split('/').next().unwrap().to_string(); - let session_id = session_key.split('/').nth(1).unwrap().to_string(); + let (user, session_id) = parse_temp_table_session_key(&session_key, &typ)?; let is_current_session = { if current_session_type == SessionType::HTTPQuery { current_client_session_id .as_ref() - .map(|id| id == &session_id) + .map(|id| id.as_str() == session_id) .unwrap_or(false) } else { - current_session_id == session_id + current_session_id.as_str() == session_id } }; @@ -135,8 +134,8 @@ impl SyncSystemTable for TemporaryTablesTable { names.push(table.name); table_ids.push(table.ident.table_id); engines.push(meta.engine); - users.push(user); - session_ids.push(session_id); + users.push(user.to_string()); + session_ids.push(session_id.to_string()); session_types.push(typ.to_string()); is_current_sessions.push(is_current_session); created_ons.push(meta.created_on.timestamp_micros()); @@ -171,6 +170,28 @@ impl SyncSystemTable for TemporaryTablesTable { } } +fn parse_temp_table_session_key<'a>( + session_key: &'a str, + typ: &SessionType, +) -> Result<(&'a str, &'a str)> { + let mut parts = session_key.split('/'); + match typ { + SessionType::MySQL => match (parts.next(), parts.next(), parts.next()) { + (Some(user), Some(session_id), None) => Ok((user, session_id)), + _ => invalid_temp_table_session_key(session_key, typ), + }, + SessionType::HTTPQuery => match (parts.next(), parts.next(), parts.next(), parts.next()) { + (Some(_tenant), Some(user), Some(session_id), None) => Ok((user, session_id)), + _ => invalid_temp_table_session_key(session_key, typ), + }, + _ => invalid_temp_table_session_key(session_key, typ), + } +} + +fn invalid_temp_table_session_key(session_key: &str, typ: &SessionType) -> Result { + Err(format!("Invalid temporary table session key: {session_key} for session type {typ}").into()) +} + impl TemporaryTablesTable { pub fn create(table_id: u64) -> Arc { let schema = TableSchemaRefExt::create(vec![ @@ -217,3 +238,45 @@ impl TemporaryTablesTable { SyncOneBlockSystemTable::create(Self { table_info }) } } + +#[cfg(test)] +mod tests { + use databend_common_catalog::session_type::SessionType; + + use super::parse_temp_table_session_key; + + #[test] + fn test_parse_mysql_temp_table_session_key() { + assert_eq!( + ("analyst", "mysql-session"), + parse_temp_table_session_key("analyst/mysql-session", &SessionType::MySQL).unwrap() + ); + } + + #[test] + fn test_parse_http_temp_table_session_key() { + assert_eq!( + ("analyst", "http-client-session"), + parse_temp_table_session_key( + "tenant_a/analyst/http-client-session", + &SessionType::HTTPQuery + ) + .unwrap() + ); + } + + #[test] + fn test_rejects_wrong_temp_table_session_key_shape() { + assert!( + parse_temp_table_session_key( + "tenant_a/analyst/http-client-session", + &SessionType::MySQL + ) + .is_err() + ); + assert!( + parse_temp_table_session_key("analyst/http-client-session", &SessionType::HTTPQuery) + .is_err() + ); + } +}