Skip to content

[KIP-932] : Add tests for share consumer - 1/N#2273

Open
Kaushik Raina (k-raina) wants to merge 4 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_queues-for-kafka_add_tests_1_of_N
Open

[KIP-932] : Add tests for share consumer - 1/N#2273
Kaushik Raina (k-raina) wants to merge 4 commits into
dev_kip-932_queues-for-kafkafrom
dev_kip-932_queues-for-kafka_add_tests_1_of_N

Conversation

@k-raina

Copy link
Copy Markdown
Member

Summary

  • Basic consuming and message - reading records correctly: values,
    keys, headers, header order, timestamps, empty/null payloads, multi-partition, batch-size caps,
    oversized records, compression, and rejecting cross-thread use
  • Subscription handling - subscribing/unsubscribing behavior: multiple topics,
    repeated/incremental subscribes, empty-list and bad topic names, subscribing before a topic exists,
    and re-subscribing to the same topic

@confluent-cla-assistant

Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_queues-for-kafka_add_tests_1_of_N branch from e5c181a to cb7db99 Compare June 11, 2026 19:59

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kaushik Raina (@k-raina) for the PR!
In general some nits! Otherwise the tests looks good!
There is some build failure please check that

Comment thread tests/test_ShareConsumer.py Outdated

assert conflicts, "second-thread access during poll() should have raised _CONFLICT"
assert all(err.code() == KafkaError._CONFLICT for err in conflicts)
assert 'multi-threaded' in conflicts[0].str().lower(), f"unexpected _CONFLICT message: {conflicts[0].str()!r}"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this shouldnt be needed, we are already asserting on the error code

for i in range(5):
producer.produce(topic, value=f'first-{i}'.encode())
producer.flush(timeout=10.0)
first = drain_share_consumers([sc], 5)[0]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better naming of the returned records

producer.produce(topic, value=f'first-{i}'.encode())
producer.flush(timeout=10.0)
first = drain_share_consumers([sc], 5)[0]
assert sorted(m.value() for m in first) == [f'first-{i}'.encode() for i in range(5)], 'phase 1 records mismatch'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert sorted(m.value() for m in first) == [f'first-{i}'.encode() for i in range(5)], 'phase 1 records mismatch'
assert sorted(m.value() for msg in first) == [f'first-{i}'.encode() for i in range(5)], 'phase 1 records mismatch'

producer.produce(topic, value=b'v', timestamp=ts)
producer.flush(timeout=10.0)

received = drain_share_consumers([sc], 1)[0]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
received = drain_share_consumers([sc], 1)[0]
received_msgs = drain_share_consumers([sc], 1)[0]

producer.produce(topic, key=None, value=None) # null -> offset 1
producer.flush(timeout=10.0)

received = drain_share_consumers([sc], 2)[0]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
received = drain_share_consumers([sc], 2)[0]
received_msgs = drain_share_consumers([sc], 2)[0]

producer.flush(timeout=10.0)

received = drain_share_consumers([sc], 2)[0]
by_offset = {m.offset(): (m.key(), m.value()) for m in received}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
by_offset = {m.offset(): (m.key(), m.value()) for m in received}
by_offset = {msg.offset(): (msg.key(), msg.value()) for msg in received}

@sonarqube-confluent

Copy link
Copy Markdown

Quality Gate failed Quality Gate failed

Failed conditions
1.1% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants