Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ impl HttpQueryContext {
.map_err(|err| ErrorCode::Internal(format!("Failed to upgrade session: {err}")))?;

if let Some(cid) = session.get_client_session_id() {
ClientSessionManager::instance().on_query_start(&cid, &self.user_name, &session);
ClientSessionManager::instance().on_query_start(
&session.get_current_tenant(),
&cid,
&self.user_name,
&session,
);
};
if let Some(session_conf) = http_session_conf {
session_conf.restore(&session, self).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_META;
use crate::servers::http::v1::session::consts::TTL_GRACE_PERIOD_QUERY;
use crate::sessions::Session;
use crate::sessions::SessionPrivilegeManager;
use crate::sessions::temporary_table_session_prefix;

pub struct TokenPair {
pub refresh: String,
Expand Down Expand Up @@ -96,8 +97,8 @@ impl ClientSessionManager {
GlobalInstance::get()
}

pub fn state_key(client_session_id: &str, user_name: &str) -> String {
format!("{user_name}/{client_session_id}")
pub fn state_key(tenant: &Tenant, client_session_id: &str, user_name: &str) -> String {
temporary_table_session_prefix(tenant, user_name, client_session_id)
}

fn refresh_token_ttl(&self) -> Duration {
Expand Down Expand Up @@ -175,7 +176,7 @@ impl ClientSessionManager {
sid: &str,
user_name: &str,
) -> Result<()> {
if self.refresh_in_memory_states(sid, user_name) {
if self.refresh_in_memory_states(&tenant, sid, user_name) {
self.refresh_session_handle(tenant, user_name.to_string(), sid)
.await?;
info!("refreshing session {}", sid);
Expand Down Expand Up @@ -349,7 +350,7 @@ impl ClientSessionManager {
.drop_client_session_id(session_id, user_name)
.await
.ok();
let state_key = Self::state_key(session_id, user_name);
let state_key = Self::state_key(tenant, session_id, user_name);
let state = self.session_state.lock().remove(&state_key);
if let Some(state) = state {
drop_all_temp_tables_with_logging(&state_key, state.temp_tbl_mgr, "closed").await;
Expand Down Expand Up @@ -386,8 +387,13 @@ impl ClientSessionManager {
Ok(())
}

pub fn refresh_in_memory_states(&self, client_session_id: &str, user_name: &str) -> bool {
let key = Self::state_key(client_session_id, user_name);
pub fn refresh_in_memory_states(
&self,
tenant: &Tenant,
client_session_id: &str,
user_name: &str,
) -> bool {
let key = Self::state_key(tenant, client_session_id, user_name);
let mut guard = self.session_state.lock();
if let Entry::Occupied(mut entry) = guard.entry(key) {
let now = Instant::now();
Expand All @@ -401,8 +407,14 @@ impl ClientSessionManager {
false
}

pub fn on_query_start(&self, client_session_id: &str, user_name: &str, session: &Arc<Session>) {
let key = Self::state_key(client_session_id, user_name);
pub fn on_query_start(
&self,
tenant: &Tenant,
client_session_id: &str,
user_name: &str,
session: &Arc<Session>,
) {
let key = Self::state_key(tenant, client_session_id, user_name);
let mut guard = self.session_state.lock();
guard.entry(key).and_modify(|e| {
if e.temp_tbl_mgr.lock().is_empty() {
Expand Down Expand Up @@ -479,3 +491,23 @@ pub async fn drop_all_temp_tables_with_logging(
}
}
}

#[cfg(test)]
mod tests {
use databend_common_meta_app::tenant::Tenant;

use super::ClientSessionManager;

#[test]
fn test_state_key_is_tenant_scoped() {
let tenant_a = Tenant::new_literal("tenant_a");
let tenant_b = Tenant::new_literal("tenant_b");
let user_name = "analyst";
let session_id = "018f2b74-0000-7000-8000-000000000001";

assert_ne!(
ClientSessionManager::state_key(&tenant_a, session_id, user_name),
ClientSessionManager::state_key(&tenant_b, session_id, user_name)
);
}
}
1 change: 1 addition & 0 deletions src/query/service/src/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use queue_mgr::QueryEntry;
pub use queue_mgr::QueueData;
pub use queue_mgr::QueueManager;
pub use session::Session;
pub(crate) use session::temporary_table_session_prefix;
pub use session_ctx::SessionContext;
pub use session_info::ProcessInfo;
pub use session_mgr::SessionManager;
Expand Down
58 changes: 46 additions & 12 deletions src/query/service/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,24 +453,26 @@ impl Session {
}
pub fn get_temp_table_prefix(&self) -> Result<String> {
let typ = self.typ.read().clone();
let session_id = match typ {
SessionType::MySQL => self.id.clone(),
let user_name = self.get_current_user()?.name;
match typ {
SessionType::MySQL => Ok(format!("{}/{}", user_name, self.id.as_str())),
SessionType::HTTPQuery => {
if let Some(id) = self.get_client_session_id() {
id
Ok(temporary_table_session_prefix(
&self.get_current_tenant(),
&user_name,
&id,
))
} else {
return Err(ErrorCode::BadArguments(
Err(ErrorCode::BadArguments(
"can not use temp table in http handler if cookie is not enabled",
));
))
}
}
t => {
return Err(ErrorCode::BadArguments(format!(
"can not use temp table in session type {t}"
)));
}
};
Ok(format!("{}/{session_id}", self.get_current_user()?.name))
t => Err(ErrorCode::BadArguments(format!(
"can not use temp table in session type {t}"
))),
}
}

pub fn get_current_workload_group(&self) -> Option<String> {
Expand All @@ -482,6 +484,14 @@ impl Session {
}
}

pub(crate) fn temporary_table_session_prefix(
tenant: &Tenant,
user_name: &str,
session_id: &str,
) -> String {
format!("{}/{user_name}/{session_id}", tenant.tenant_name())

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Update temporary table key consumers for tenant prefix

When an HTTP temp table exists, ClientSessionManager::get_all_temp_tables() now returns keys built by this helper, but system.temporary_tables still decodes every key as user/session_id (temporary_tables_table.rs lines 120-121). With the new tenant/user/session_id format, that table reports the tenant as the user and the user name as the session id, so is_current_session also never matches the real client session id for HTTP sessions. Please update the HTTP parsing/display path or keep the tenant-scoped map key separate from the two-part session key exposed to this table.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@3em0 thanks for your contribution. this does need to be fixed

}

impl Drop for Session {
fn drop(&mut self) {
drop_guard(move || {
Expand All @@ -490,3 +500,27 @@ impl Drop for Session {
})
}
}

#[cfg(test)]
mod tests {
use databend_common_meta_app::tenant::Tenant;

use super::temporary_table_session_prefix;

#[test]
fn test_temporary_table_session_prefix_is_tenant_scoped() {
let tenant_a = Tenant::new_literal("tenant_a");
let tenant_b = Tenant::new_literal("tenant_b");
let user_name = "analyst";
let session_id = "018f2b74-0000-7000-8000-000000000001";

assert_ne!(
temporary_table_session_prefix(&tenant_a, user_name, session_id),
temporary_table_session_prefix(&tenant_b, user_name, session_id)
);
assert_eq!(
"tenant_a/analyst/018f2b74-0000-7000-8000-000000000001",
temporary_table_session_prefix(&tenant_a, user_name, session_id)
);
}
}
78 changes: 72 additions & 6 deletions src/query/service/src/table_functions/temporary_tables_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,15 @@ impl SyncSystemTable for TemporaryTablesTable {
})
.ok_or_else(|| format!("Invalid table desc: {}", desc))?;

let user = session_key.split('/').next().unwrap().to_string();
let session_id = session_key.split('/').nth(1).unwrap().to_string();
let (user, session_id) = parse_temp_table_session_key(&session_key, &typ)?;
let is_current_session = {
if current_session_type == SessionType::HTTPQuery {
current_client_session_id
.as_ref()
.map(|id| id == &session_id)
.map(|id| id.as_str() == session_id)
.unwrap_or(false)
} else {
current_session_id == session_id
current_session_id.as_str() == session_id
}
};

Expand All @@ -135,8 +134,8 @@ impl SyncSystemTable for TemporaryTablesTable {
names.push(table.name);
table_ids.push(table.ident.table_id);
engines.push(meta.engine);
users.push(user);
session_ids.push(session_id);
users.push(user.to_string());
session_ids.push(session_id.to_string());
session_types.push(typ.to_string());
is_current_sessions.push(is_current_session);
created_ons.push(meta.created_on.timestamp_micros());
Expand Down Expand Up @@ -171,6 +170,31 @@ impl SyncSystemTable for TemporaryTablesTable {
}
}

fn parse_temp_table_session_key<'a>(
session_key: &'a str,
typ: &SessionType,
) -> Result<(&'a str, &'a str)> {
let mut parts = session_key.split('/');
match typ {
SessionType::MySQL => match (parts.next(), parts.next(), parts.next()) {
(Some(user), Some(session_id), None) => Ok((user, session_id)),
_ => invalid_temp_table_session_key(session_key, typ),
},
SessionType::HTTPQuery => match (parts.next(), parts.next(), parts.next(), parts.next()) {
(Some(_tenant), Some(user), Some(session_id), None) => Ok((user, session_id)),
_ => invalid_temp_table_session_key(session_key, typ),
},
_ => invalid_temp_table_session_key(session_key, typ),
}
}

fn invalid_temp_table_session_key<T>(session_key: &str, typ: &SessionType) -> Result<T> {
Err(format!(
"Invalid temporary table session key: {session_key} for session type {typ}"
)
.into())
}

impl TemporaryTablesTable {
pub fn create(table_id: u64) -> Arc<dyn Table> {
let schema = TableSchemaRefExt::create(vec![
Expand Down Expand Up @@ -217,3 +241,45 @@ impl TemporaryTablesTable {
SyncOneBlockSystemTable::create(Self { table_info })
}
}

#[cfg(test)]
mod tests {
use databend_common_catalog::session_type::SessionType;

use super::parse_temp_table_session_key;

#[test]
fn test_parse_mysql_temp_table_session_key() {
assert_eq!(
("analyst", "mysql-session"),
parse_temp_table_session_key("analyst/mysql-session", &SessionType::MySQL).unwrap()
);
}

#[test]
fn test_parse_http_temp_table_session_key() {
assert_eq!(
("analyst", "http-client-session"),
parse_temp_table_session_key(
"tenant_a/analyst/http-client-session",
&SessionType::HTTPQuery
)
.unwrap()
);
}

#[test]
fn test_rejects_wrong_temp_table_session_key_shape() {
assert!(
parse_temp_table_session_key(
"tenant_a/analyst/http-client-session",
&SessionType::MySQL
)
.is_err()
);
assert!(
parse_temp_table_session_key("analyst/http-client-session", &SessionType::HTTPQuery)
.is_err()
);
}
}
Loading