Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 57 additions & 22 deletions graphrag/index/update/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

"""Entity related operations and utils for Incremental Indexing."""

import itertools

import numpy as np
import pandas as pd

Expand Down Expand Up @@ -45,32 +43,69 @@ def _group_and_resolve_entities(
initial_id, initial_id + len(delta_entities_df)
)
# Concat A and B
# HACK: Setting ignore_index=False is faster if indexes are already default range, but since old_entities_df and delta_entities_df may have different indexes, we cannot guarantee safety. Leave as is.
combined = pd.concat(
[old_entities_df, delta_entities_df], ignore_index=True, copy=False
)

# Group by title and resolve conflicts
aggregated = (
combined.groupby("title")
.agg({
"id": "first",
"type": "first",
"human_readable_id": "first",
"description": lambda x: list(x.astype(str)), # Ensure str
# Concatenate nd.array into a single list
"text_unit_ids": lambda x: list(itertools.chain(*x.tolist())),
"degree": "first", # todo: we could probably re-compute this with the entire new graph
"x": "first",
"y": "first",
})
.reset_index()
)
# ---- FAST LAMBDA REPLACEMENTS IN AGG ----
# Precompute grouped "description" and "text_unit_ids" with efficient vectorized routines to avoid slow Python lambdas in groupby-agg

# Prepare mapping from title (index) to stringified combined descriptions
# and title to flattened list of text_unit_ids

# The columns might have type object (lists), but to ensure we avoid costly .tolist() and .astype(str),
# we use .values and numpy operations before aggregation.

# Build all required columns up front and aggregate using a single pass

# For "description", convert all to str up front (avoiding .astype(str) for each group in lambda)
description_str = combined["description"].astype(str).values
# For "text_unit_ids", ensure all are Python lists for chaining
text_unit_ids_seq = combined["text_unit_ids"].values

# Compose the group labels (index per row)
titles = combined["title"].values
unique_titles, inverse_indices = np.unique(titles, return_inverse=True)
n_rows = len(titles)
n_groups = len(unique_titles)

# Group indices for each title
index_map = [[] for _ in range(n_groups)]
for idx, inv_i in enumerate(inverse_indices):
index_map[inv_i].append(idx)

# Precompute "description" and "text_unit_ids" per group using list comprehensions
description_grouped = [[description_str[i] for i in idxs] for idxs in index_map]
text_unit_ids_grouped = [
[x for i in idxs for x in text_unit_ids_seq[i]] for idxs in index_map
]

# Helper to efficiently apply 'first' aggregation for simple columns
def first_grouped(col):
arr = combined[col].values
return [arr[idxs[0]] for idxs in index_map]

# Now construct columns for resultant DataFrame
aggregated_dict = {
"title": unique_titles,
"id": first_grouped("id"),
"type": first_grouped("type"),
"human_readable_id": first_grouped("human_readable_id"),
"description": description_grouped,
"text_unit_ids": text_unit_ids_grouped,
"degree": first_grouped(
"degree"
), # todo: we could probably re-compute this with the entire new graph
"x": first_grouped("x"),
"y": first_grouped("y"),
}

# recompute frequency to include new text units
aggregated["frequency"] = aggregated["text_unit_ids"].apply(len)
# Compute frequency as number of text units per entry
aggregated_dict["frequency"] = [len(tu) for tu in text_unit_ids_grouped]

# Force the result into a DataFrame
resolved: pd.DataFrame = pd.DataFrame(aggregated)
# Create DataFrame directly—no need for groupby.agg
resolved = pd.DataFrame(aggregated_dict)

# Modify column order to keep consistency
resolved = resolved.loc[:, ENTITIES_FINAL_COLUMNS]
Expand Down