From bbac08257a0dd9c5a06203030939255835e64341 Mon Sep 17 00:00:00 2001 From: yulong Date: Fri, 17 Apr 2026 16:08:53 +0800 Subject: [PATCH] perf(cmdb): optimize CI cache initialization with batch queries - Add _build_ci_dicts_batch() helper function to batch-load CI attributes by value table, reducing N*M queries to T queries - Optimize CI relation loading by pre-fetching ci_id -> type_id mapping to eliminate N+1 query pattern - Refactor cache building to group CIs by type and process in batches of 500 for better memory efficiency - Add progress tracking and error reporting with detailed logging at each stage - Skip already-cached CIs before batch processing to avoid redundant operations - Add aidlc-docs to .gitignore - Improves cache initialization performance for large CI datasets by reducing database round-trips --- .gitignore | 3 + cmdb-api/api/commands/click_cmdb.py | 164 +++++++++++++++++++++++++--- 2 files changed, 152 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index e43370de5..ed57546d3 100755 --- a/.gitignore +++ b/.gitignore @@ -78,3 +78,6 @@ cmdb-ui/npm-debug.log* cmdb-ui/yarn-debug.log* cmdb-ui/yarn-error.log* cmdb-ui/package-lock.json + + +aidlc-docs \ No newline at end of file diff --git a/cmdb-api/api/commands/click_cmdb.py b/cmdb-api/api/commands/click_cmdb.py index 7794a293e..3dbfe7149 100644 --- a/cmdb-api/api/commands/click_cmdb.py +++ b/cmdb-api/api/commands/click_cmdb.py @@ -47,19 +47,100 @@ from api.tasks.cmdb import batch_ci_cache +def _build_ci_dicts_batch(ci_list, ci_type, attrs, unique_key): + """ + 批量构建一组同类型 CI 的数据字典。 + + 通过按值表分组、批量查询,将原来 N(CI) * M(属性) 次查询降低为 T(值表类型数) 次查询。 + + :param ci_list: 同一 type_id 的 CI 对象列表 + :param ci_type: CIType 对象 + :param attrs: 该类型的属性对象列表 [(type_attr, attr), ...] + :param unique_key: 唯一属性对象 + :return: dict {ci_id: ci_dict} + """ + from collections import defaultdict + from api.lib.cmdb.utils import TableMap, ValueTypeMap + + ci_ids = [ci.id for ci in ci_list] + ci_map = {ci.id: ci for ci in ci_list} + + # 初始化每个 CI 的结果字典 + result = {} + for ci in ci_list: + result[ci.id] = { + 'ci_type': ci_type.name, + '_type': ci_type.id, + 'ci_type_alias': ci_type.alias, + '_id': ci.id, + '_updated_at': str(ci.updated_at or ''), + '_updated_by': ci.updated_by, + } + if unique_key: + result[ci.id]['unique'] = unique_key.name + result[ci.id]['unique_alias'] = unique_key.alias + + # 按值表分组属性 + table_to_attrs = defaultdict(list) + for _, attr in attrs: + table_map = TableMap(attr=attr) + value_table = table_map.table + if value_table is not None: + table_to_attrs[value_table].append(attr) + + # 对每种值表执行一次批量查询 + for value_table, attr_group in table_to_attrs.items(): + attr_ids = [a.id for a in attr_group] + attr_by_id = {a.id: a for a in attr_group} + + rows = db.session.query(value_table).filter( + value_table.ci_id.in_(ci_ids), + value_table.attr_id.in_(attr_ids) + ).all() + + # 按 (ci_id, attr_id) 分组 + grouped = defaultdict(list) + for row in rows: + grouped[(row.ci_id, row.attr_id)].append(row) + + for attr in attr_group: + for ci_id in ci_ids: + rs = grouped.get((ci_id, attr.id), []) + if attr.is_list: + result[ci_id][attr.name] = [ + ValueTypeMap.serialize[attr.value_type](r.value) for r in rs + ] + elif attr.is_password and rs: + result[ci_id][attr.name] = '******' if rs[0].value else '' + else: + result[ci_id][attr.name] = ( + ValueTypeMap.serialize[attr.value_type](rs[0].value) if rs else None + ) + + return result + + @click.command() @with_appcontext def cmdb_init_cache(): db.session.remove() + # ---- 阶段 1: 加载 CI 关系 (优化: 预加载 ci_id -> type_id 映射,消除 N+1) ---- + click.echo("[1/3] Loading CI relations...") + + ci_id_to_type_id = dict(db.session.query(CI.id, CI.type_id).all()) + ci_relations = CIRelation.get_by(to_dict=False) relations = dict() relations2 = dict() for cr in ci_relations: - relations.setdefault(cr.first_ci_id, {}).update({cr.second_ci_id: cr.second_ci.type_id}) + second_type_id = ci_id_to_type_id.get(cr.second_ci_id) + if second_type_id is None: + continue + relations.setdefault(cr.first_ci_id, {}).update({cr.second_ci_id: second_type_id}) if cr.ancestor_ids: relations2.setdefault('{},{}'.format(cr.ancestor_ids, cr.first_ci_id), {}).update( - {cr.second_ci_id: cr.second_ci.type_id}) + {cr.second_ci_id: second_type_id}) for i in relations: relations[i] = json.dumps(relations[i]) for i in relations2: @@ -69,6 +150,9 @@ def cmdb_init_cache(): if relations2: rd.create_or_update(relations2, REDIS_PREFIX_CI_RELATION2) + click.echo("[1/3] CI relations loaded. {} relations processed.".format(len(ci_relations))) + + # ---- 阶段 2: ES mapping 更新 (保持不变) ---- es = None if current_app.config.get("USE_ES"): from api.extensions import es @@ -93,24 +177,74 @@ def cmdb_init_cache(): except Exception as e: print(e) + # ---- 阶段 3: 构建 CI 缓存 (优化: 按类型分组 + 批量查询) ---- + click.echo("[2/3] Building CI cache...") + + from api.lib.cmdb.cache import CITypeCache + from api.lib.cmdb.ci_type import CITypeAttributeManager + cis = CI.get_by(to_dict=False) + total = len(cis) + click.echo("[2/3] Total CIs: {}".format(total)) + + # 按 type_id 分组 + type_id_to_cis = dict() for ci in cis: - if current_app.config.get("USE_ES"): - res = es.get_index_id(ci.id) - if res: - continue - else: - res = rd.get([ci.id], REDIS_PREFIX_CI) - if res and list(filter(lambda x: x, res)): + type_id_to_cis.setdefault(ci.type_id, []).append(ci) + + processed = 0 + failed_ci_ids = [] + batch_size = 500 + + for type_id, ci_group in type_id_to_cis.items(): + ci_type = CITypeCache.get(type_id) + if not ci_type: + click.echo(" Warning: CIType {} not found, skipping {} CIs".format(type_id, len(ci_group))) + continue + + # 每种类型只查一次属性列表 + all_attrs = CITypeAttributeManager.get_all_attributes(type_id) + unique_key = AttributeCache.get(ci_type.unique_id) + + # 分批处理 + for batch_start in range(0, len(ci_group), batch_size): + batch = ci_group[batch_start:batch_start + batch_size] + + # 过滤已缓存的 CI + cis_to_process = [] + for ci in batch: + if current_app.config.get("USE_ES"): + res = es.get_index_id(ci.id) + if res: + continue + else: + res = rd.get([ci.id], REDIS_PREFIX_CI) + if res and list(filter(lambda x: x, res)): + continue + cis_to_process.append(ci) + + if not cis_to_process: + processed += len(batch) continue - m = api.lib.cmdb.ci.CIManager() - ci_dict = m.get_ci_by_id_from_db(ci.id, need_children=False, use_master=False) + try: + ci_dicts = _build_ci_dicts_batch(cis_to_process, ci_type, all_attrs, unique_key) - if current_app.config.get("USE_ES"): - es.create(ci_dict) - else: - rd.create_or_update({ci.id: json.dumps(ci_dict)}, REDIS_PREFIX_CI) + for ci_id, ci_dict in ci_dicts.items(): + if current_app.config.get("USE_ES"): + es.create(ci_dict) + else: + rd.create_or_update({ci_id: json.dumps(ci_dict)}, REDIS_PREFIX_CI) + except Exception as e: + click.echo(" Error processing batch for type {}: {}".format(ci_type.name, e)) + failed_ci_ids.extend([ci.id for ci in cis_to_process]) + + processed += len(batch) + click.echo("[2/3] Progress: {}/{} CIs processed".format(processed, total)) + + click.echo("[3/3] Done.") + if failed_ci_ids: + click.echo(" Failed CI IDs ({}): {}".format(len(failed_ci_ids), failed_ci_ids[:50])) db.session.remove()