
Optimize noSQLdask: connection pooling, batched queries, dask.delayed #80

Merged
yehudarav merged 1 commit into master from optimize-nosqldask
Apr 2, 2026

Conversation

@yehudarav
Collaborator

Summary

Rewrote both CassandraBag and MongoBag for significantly better performance with large datasets.

CassandraBag fixes

| Before | After | Impact |
| --- | --- | --- |
| New Cluster+Session per partition | Single shared session | Saves ~100-500ms × npartitions |
| Loop over keys one at a time | `key IN (...)` in one query | N keys × M partitions → M queries |
| `dask.bag.to_dataframe().compute().pivot()` | `dask.delayed` → `pd.DataFrame` | Avoids double materialization |
| `items = items + list(rows)` | `items.extend(rows)` | O(n) vs O(n²) |
| Default `fetch_size` 5000 | Configurable (default 50000) | Fewer Cassandra round-trips |
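The key-batching change can be sketched as follows. The helper only builds the CQL string; `build_in_query` and the table/column names are illustrative, not the PR's actual API.

```python
def build_in_query(table, keys):
    """One SELECT with `key IN (...)` instead of len(keys) separate queries."""
    placeholders = ", ".join(["%s"] * len(keys))
    return f"SELECT * FROM {table} WHERE key IN ({placeholders})"

# With cassandra-driver, a single shared session then executes this once
# per partition rather than once per key, with a tunable page size:
#
#   from cassandra.query import SimpleStatement
#   stmt = SimpleStatement(build_in_query("readings", keys), fetch_size=50000)
#   rows = session.execute(stmt, keys)
```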

MongoBag fixes

| Before | After | Impact |
| --- | --- | --- |
| New MongoClient per partition | Single shared client | Connection pooling |
| String comparison on timestamps | `datetime` objects in query | Index-friendly |
| dask.bag | `dask.delayed` → `pd.DataFrame` | Less overhead |
| No projection support | Optional `projection` param | Reduced data transfer |
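A minimal sketch of the index-friendly time filter; the field name and helper are illustrative. Passing `datetime` objects rather than strings lets MongoDB compare against a B-tree index on the timestamp field instead of doing lexicographic string comparison.

```python
from datetime import datetime

def time_interval_query(start, end, field="timestamp"):
    """Build a MongoDB filter using real datetime objects, not strings."""
    return {field: {"$gte": start, "$lt": end}}

query = time_interval_query(datetime(2026, 1, 1), datetime(2026, 2, 1))
# With pymongo, a projection then limits the returned fields:
#   docs = collection.find(query, projection={"value": 1, "timestamp": 1})
```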

Both

  • Context manager support (with CassandraBag(...) as bag:)
  • close() method for explicit cleanup
  • getDataFrame() convenience method on MongoBag
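The context-manager support listed above can be sketched like this; the class body is hypothetical and only shows the `__enter__`/`__exit__` shape, not the PR's real CassandraBag internals.

```python
class ResourceBag:
    """Sketch: a bag that owns a connection and cleans up deterministically."""
    def __init__(self):
        self.closed = False

    def close(self):
        # explicit cleanup, also callable directly
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()          # runs even if the block raised
        return False          # do not swallow exceptions

with ResourceBag() as bag:
    pass                      # use the bag; close() runs on exit
```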

Test plan

  • pytest — 99 passed, 10 xfailed
  • mkdocs build — clean

🤖 Generated with Claude Code

CassandraBag:
- Single shared Cassandra session instead of new connection per partition
  (saves ~100-500ms × npartitions in connection overhead)
- Query all keys in one CQL query with IN (...) instead of looping per key
  (reduces queries from keys × partitions to just partitions)
- dask.delayed → pd.DataFrame instead of dask.bag (avoids double
  materialization: bag → DataFrame → pivot)
- extend() instead of list concatenation (O(n) vs O(n^2))
- Configurable fetch_size (default 50000) for large result sets
- Context manager support (with ... as bag:)
- close() method for explicit resource cleanup
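The extend() point above in miniature: `items = items + list(rows)` copies the whole accumulated list on every batch, O(n²) total, while `extend()` appends in place in amortized O(n).

```python
items = []
for batch in ([1, 2], [3], [4, 5, 6]):
    items.extend(batch)   # in-place append, no full-list copy per batch
# items is now [1, 2, 3, 4, 5, 6]
```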

MongoBag:
- Single shared MongoClient instead of new connection per partition
- datetime objects in queries instead of string comparison (allows
  MongoDB to use time-based indexes efficiently)
- dask.delayed → pd.DataFrame instead of dask.bag
- Projection support to limit returned fields
- getDataFrame() convenience method
- Context manager support
- Backwards-compatible read_datetime_interval_from_collection()
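The shared-client idea can be sketched generically: create the connection once and reuse it across partitions. `make_client` stands in for e.g. `pymongo.MongoClient`, whose instance maintains its own internal connection pool; the class and names are illustrative, not the PR's API.

```python
class SharedClientBag:
    """Sketch: lazily create one client and share it across all partitions."""
    def __init__(self, make_client):
        self._make_client = make_client
        self._client = None

    @property
    def client(self):
        if self._client is None:          # created once, then reused
            self._client = self._make_client()
        return self._client

# Demonstration: five partition reads, one connection.
calls = []
bag = SharedClientBag(lambda: calls.append(1) or "client")
for _ in range(5):
    bag.client
```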

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@yehudarav yehudarav merged commit 420de7b into master Apr 2, 2026
4 checks passed