[KIP-932] : Add tests for share consumer - 1/N#2273
[KIP-932] : Add tests for share consumer - 1/N#2273Kaushik Raina (k-raina) wants to merge 4 commits into
Conversation
|
🎉 All Contributor License Agreements have been signed. Ready to merge. |
e5c181a to
cb7db99
Compare
Pratyush Ranjan (PratRanj07)
left a comment
There was a problem hiding this comment.
Thanks Kaushik Raina (@k-raina) for the PR!
In general some nits! Otherwise the tests looks good!
There is some build failure please check that
|
|
||
| assert conflicts, "second-thread access during poll() should have raised _CONFLICT" | ||
| assert all(err.code() == KafkaError._CONFLICT for err in conflicts) | ||
| assert 'multi-threaded' in conflicts[0].str().lower(), f"unexpected _CONFLICT message: {conflicts[0].str()!r}" |
There was a problem hiding this comment.
I think this shouldnt be needed, we are already asserting on the error code
| for i in range(5): | ||
| producer.produce(topic, value=f'first-{i}'.encode()) | ||
| producer.flush(timeout=10.0) | ||
| first = drain_share_consumers([sc], 5)[0] |
There was a problem hiding this comment.
Better naming of the returned records
| producer.produce(topic, value=f'first-{i}'.encode()) | ||
| producer.flush(timeout=10.0) | ||
| first = drain_share_consumers([sc], 5)[0] | ||
| assert sorted(m.value() for m in first) == [f'first-{i}'.encode() for i in range(5)], 'phase 1 records mismatch' |
There was a problem hiding this comment.
| assert sorted(m.value() for m in first) == [f'first-{i}'.encode() for i in range(5)], 'phase 1 records mismatch' | |
| assert sorted(m.value() for msg in first) == [f'first-{i}'.encode() for i in range(5)], 'phase 1 records mismatch' |
| producer.produce(topic, value=b'v', timestamp=ts) | ||
| producer.flush(timeout=10.0) | ||
|
|
||
| received = drain_share_consumers([sc], 1)[0] |
There was a problem hiding this comment.
| received = drain_share_consumers([sc], 1)[0] | |
| received_msgs = drain_share_consumers([sc], 1)[0] |
| producer.produce(topic, key=None, value=None) # null -> offset 1 | ||
| producer.flush(timeout=10.0) | ||
|
|
||
| received = drain_share_consumers([sc], 2)[0] |
There was a problem hiding this comment.
| received = drain_share_consumers([sc], 2)[0] | |
| received_msgs = drain_share_consumers([sc], 2)[0] |
| producer.flush(timeout=10.0) | ||
|
|
||
| received = drain_share_consumers([sc], 2)[0] | ||
| by_offset = {m.offset(): (m.key(), m.value()) for m in received} |
There was a problem hiding this comment.
| by_offset = {m.offset(): (m.key(), m.value()) for m in received} | |
| by_offset = {msg.offset(): (msg.key(), msg.value()) for msg in received} |
|


Summary
keys, headers, header order, timestamps, empty/null payloads, multi-partition, batch-size caps,
oversized records, compression, and rejecting cross-thread use
repeated/incremental subscribes, empty-list and bad topic names, subscribing before a topic exists,
and re-subscribing to the same topic