diff --git a/graphrag/index/update/entities.py b/graphrag/index/update/entities.py
index 7a74d1b6e0..2057f8f5ae 100644
--- a/graphrag/index/update/entities.py
+++ b/graphrag/index/update/entities.py
@@ -3,8 +3,6 @@
 
 """Entity related operations and utils for Incremental Indexing."""
 
-import itertools
-
 import numpy as np
 import pandas as pd
 
@@ -45,32 +43,67 @@ def _group_and_resolve_entities(
         initial_id, initial_id + len(delta_entities_df)
     )
     # Concat A and B
+    # NOTE: ignore_index=True is kept on purpose. old_entities_df and
+    # delta_entities_df may carry different (even overlapping) indexes, so a
+    # fresh RangeIndex is requested for the combined frame.
     combined = pd.concat(
         [old_entities_df, delta_entities_df], ignore_index=True, copy=False
     )
 
-    # Group by title and resolve conflicts
-    aggregated = (
-        combined.groupby("title")
-        .agg({
-            "id": "first",
-            "type": "first",
-            "human_readable_id": "first",
-            "description": lambda x: list(x.astype(str)),  # Ensure str
-            # Concatenate nd.array into a single list
-            "text_unit_ids": lambda x: list(itertools.chain(*x.tolist())),
-            "degree": "first",  # todo: we could probably re-compute this with the entire new graph
-            "x": "first",
-            "y": "first",
-        })
-        .reset_index()
-    )
+    # Group rows by title without groupby.agg: Python lambdas inside agg are
+    # slow, so per-title row positions are collected once and every output
+    # column is built from them.
+
+    # Stringify every description once instead of once per group.
+    description_str = combined["description"].astype(str).to_numpy()
+    text_unit_ids_seq = combined["text_unit_ids"].to_numpy()
+
+    # factorize(sort=True) yields titles in sorted order, as groupby("title")
+    # does, and codes rows whose title is null as -1 so they can be dropped
+    # the way groupby drops them by default.
+    title_codes, unique_titles = pd.factorize(combined["title"], sort=True)
+
+    # Row positions belonging to each title, in original row order.
+    index_map = [[] for _ in range(len(unique_titles))]
+    for row, code in enumerate(title_codes):
+        if code >= 0:
+            index_map[code].append(row)
+
+    description_grouped = [[description_str[i] for i in idxs] for idxs in index_map]
+    # Flatten the per-row text unit id sequences of each title into one list.
+    text_unit_ids_grouped = [
+        [x for i in idxs for x in text_unit_ids_seq[i]] for idxs in index_map
+    ]
+
+    def first_grouped(col):
+        """Return the first non-null value of `col` for every title."""
+        values = combined[col].to_numpy()
+        present = combined[col].notna().to_numpy()
+        # groupby "first" skips nulls; fall back to the first row only when
+        # the whole group is null.
+        picks = [
+            next((i for i in idxs if present[i]), idxs[0]) for idxs in index_map
+        ]
+        return values[picks]
+
+    aggregated_dict = {
+        "title": unique_titles.to_numpy(),
+        "id": first_grouped("id"),
+        "type": first_grouped("type"),
+        "human_readable_id": first_grouped("human_readable_id"),
+        "description": description_grouped,
+        "text_unit_ids": text_unit_ids_grouped,
+        # todo: we could probably re-compute this with the entire new graph
+        "degree": first_grouped("degree"),
+        "x": first_grouped("x"),
+        "y": first_grouped("y"),
+    }
 
     # recompute frequency to include new text units
-    aggregated["frequency"] = aggregated["text_unit_ids"].apply(len)
+    aggregated_dict["frequency"] = [len(tu) for tu in text_unit_ids_grouped]
 
-    # Force the result into a DataFrame
-    resolved: pd.DataFrame = pd.DataFrame(aggregated)
+    # Build the result frame directly from the per-title columns
+    resolved: pd.DataFrame = pd.DataFrame(aggregated_dict)
 
     # Modify column order to keep consistency
     resolved = resolved.loc[:, ENTITIES_FINAL_COLUMNS]