diff --git a/tests/integration/share_consumer/test_share_consumer.py b/tests/integration/share_consumer/test_share_consumer.py index da4b4fde1..76b1ad2b1 100644 --- a/tests/integration/share_consumer/test_share_consumer.py +++ b/tests/integration/share_consumer/test_share_consumer.py @@ -21,7 +21,8 @@ import pytest -from confluent_kafka import AcknowledgeType, KafkaError, KafkaException, Producer +from confluent_kafka import TIMESTAMP_CREATE_TIME, AcknowledgeType, KafkaError, KafkaException, Producer +from confluent_kafka.admin import NewTopic from tests.common import ( drain_share_consumers, set_group_config, @@ -62,9 +63,9 @@ def test_concurrent_consumers(kafka_cluster): producer.produce(topic, value=f'msg-{i}'.encode()) producer.flush(timeout=10.0) - received_1, received_2 = drain_share_consumers([sc1, sc2], n_messages) - offsets1 = {(m.topic(), m.partition(), m.offset()) for m in received_1} - offsets2 = {(m.topic(), m.partition(), m.offset()) for m in received_2} + received_msgs_1, received_msgs_2 = drain_share_consumers([sc1, sc2], n_messages) + offsets1 = {(msg.topic(), msg.partition(), msg.offset()) for msg in received_msgs_1} + offsets2 = {(msg.topic(), msg.partition(), msg.offset()) for msg in received_msgs_2} overlap = offsets1 & offsets2 all_offsets = offsets1 | offsets2 @@ -94,8 +95,8 @@ def test_basic_consume_records(kafka_cluster): producer.produce(topic, value=v) producer.flush(timeout=10.0) - received = drain_share_consumers([sc], n)[0] - values = sorted(m.value() for m in received) + received_msgs = drain_share_consumers([sc], n)[0] + values = sorted(msg.value() for msg in received_msgs) assert values == sorted(expected), f"Value mismatch: expected {sorted(expected)}, got {values}" finally: sc.close() @@ -119,16 +120,240 @@ def test_message_fields_preserved(kafka_cluster): produced.append((key, value, headers)) producer.flush(timeout=10.0) - received = drain_share_consumers([sc], 5)[0] - assert len(received) == 5 + received_msgs = drain_share_consumers([sc], 5)[0] + assert len(received_msgs) == 5 - got = sorted([(m.key(), m.value(), m.headers()) for m in received]) + got = sorted([(msg.key(), msg.value(), msg.headers()) for msg in received_msgs]) exp = sorted(produced) assert got == exp, f"Field mismatch: expected {exp}, got {got}" finally: sc.close() +def test_header_order_preserved(kafka_cluster): + """Header ORDER round-trips intact. + + test_message_fields_preserved sorts headers before comparing, so it can't + catch a reordering. Keys can repeat and Kafka headers are an ordered list, + so order is part of the contract. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-hdrorder') + + # Repeated key 'a' with different values: only order distinguishes them. + headers = [('a', b'1'), ('b', b'2'), ('a', b'3'), ('c', b'4')] + + sc = kafka_cluster.share_consumer() + try: + sc.subscribe([topic]) + + producer = kafka_cluster.cimpl_producer() + producer.produce(topic, value=b'v', headers=headers) + producer.flush(timeout=10.0) + + received_msgs = drain_share_consumers([sc], 1)[0] + assert len(received_msgs) == 1 + assert received_msgs[0].value() == b'v' + assert received_msgs[0].headers() == headers, f"header order/content changed: {received_msgs[0].headers()}" + finally: + sc.close() + + +def test_timestamp_and_type_preserved(kafka_cluster): + """A producer-set CreateTime timestamp round-trips with TIMESTAMP_CREATE_TIME. + + Complements test_message_fields_preserved, which doesn't check timestamps. + Assumes the broker default (CreateTime) so the producer's explicit timestamp + is the one stored and returned. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-ts') + + ts = 1_600_000_000_000 # fixed CreateTime in ms + + sc = kafka_cluster.share_consumer() + try: + sc.subscribe([topic]) + + producer = kafka_cluster.cimpl_producer() + producer.produce(topic, value=b'v', timestamp=ts) + producer.flush(timeout=10.0) + + received_msgs = drain_share_consumers([sc], 1)[0] + assert len(received_msgs) == 1 + ts_type, ts_val = received_msgs[0].timestamp() + assert ts_type == TIMESTAMP_CREATE_TIME, f"expected CREATE_TIME, got timestamp_type {ts_type}" + assert ts_val == ts, f"timestamp not preserved: expected {ts}, got {ts_val}" + finally: + sc.close() + + +def test_zero_byte_and_null_key_value(kafka_cluster): + """Empty (zero-length) vs absent (None) keys/values are preserved distinctly: + b'' stays b'' (not collapsed to None), and None stays None. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-empty') + + sc = kafka_cluster.share_consumer() + try: + sc.subscribe([topic]) + + producer = kafka_cluster.cimpl_producer() + producer.produce(topic, key=b'', value=b'') # empty, non-null -> offset 0 + producer.produce(topic, key=None, value=None) # null -> offset 1 + producer.flush(timeout=10.0) + + received_msgs = drain_share_consumers([sc], 2)[0] + by_offset = {msg.offset(): (msg.key(), msg.value()) for msg in received_msgs} + assert len(by_offset) == 2, f"expected 2 records, got {len(by_offset)}" + ordered = [by_offset[o] for o in sorted(by_offset)] + assert ordered[0] == (b'', b''), f"empty key/value not preserved: {ordered[0]}" + assert ordered[1] == (None, None), f"null key/value not preserved: {ordered[1]}" + finally: + sc.close() + + +def test_single_consumer_multi_partition_full_coverage(kafka_cluster): + """One consumer drains a multi-partition topic: every record arrives exactly + once and records show up from every partition. + + The existing basic/ordering tests use a single-partition topic; this + exercises the multi-partition fetch path. + """ + n_partitions = 3 + per_partition = 10 + topic = kafka_cluster.create_topic_and_wait_propogation( + 'test-share-consumer-multipart', conf={'num_partitions': n_partitions} + ) + + sc = kafka_cluster.share_consumer() + try: + sc.subscribe([topic]) + + producer = kafka_cluster.cimpl_producer() + for p in range(n_partitions): + for i in range(per_partition): + producer.produce(topic, value=f'p{p}-{i}'.encode(), partition=p) + producer.flush(timeout=10.0) + + total = n_partitions * per_partition + received_msgs = drain_share_consumers([sc], total)[0] + + # Exactly the produced records: unique offsets, exact value set, every + # partition represented. + unique_records = {(msg.partition(), msg.offset()) for msg in received_msgs} + assert len(unique_records) == total, f"expected {total} unique records, got {len(unique_records)}" + expected_values = sorted(f'p{p}-{i}'.encode() for p in range(n_partitions) for i in range(per_partition)) + assert ( + sorted(msg.value() for msg in received_msgs) == expected_values + ), 'received values do not match the produced set' + assert {msg.partition() for msg in received_msgs} == set(range(n_partitions)) + finally: + sc.close() + + +def test_max_poll_records_caps_batch(kafka_cluster): + """max.poll.records bounds the records returned by a single poll(). + + The cap applies at batch boundaries — a poll never splits a single broker + batch, it just stops accumulating once the cap is reached. So each record + has to land in its own batch for the cap to be observable: linger.ms=0 plus + a flush per produce gives one batch per record. With 10 single-record + batches and cap=5, no poll() returns more than 5 and draining all 10 takes + at least 2 polls. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-maxpoll') + cap = 5 + n = 10 + + sc = kafka_cluster.share_consumer({'max.poll.records': cap}) + try: + sc.subscribe([topic]) + + # One produce + flush per record => one broker batch per record, so the + # cap can take effect (a single fat batch can't be split below its size). + producer = kafka_cluster.cimpl_producer({'linger.ms': 0}) + for i in range(n): + producer.produce(topic, value=f'msg-{i}'.encode()) + producer.flush(timeout=10.0) + + batch_sizes = [] + received_values = [] + deadline = time.time() + 30.0 + while time.time() < deadline and len(received_values) < n: + batch = [msg.value() for msg in sc.poll(timeout=0.5) if msg.error() is None] + if batch: + batch_sizes.append(len(batch)) + received_values.extend(batch) + + expected_values = sorted(f'msg-{i}'.encode() for i in range(n)) + assert sorted(received_values) == expected_values, 'received values do not match the produced set' + assert all(size <= cap for size in batch_sizes), f"a poll() exceeded max.poll.records={cap}: {batch_sizes}" + assert len(batch_sizes) >= 2, f"expected >=2 capped batches for {n} records at cap {cap}, got {batch_sizes}" + finally: + sc.close() + + +def test_record_larger_than_fetch_max_bytes_delivered(kafka_cluster): + """A record larger than the consumer's fetch.max.bytes is still delivered. + + The broker hands back at least one record per partition even when it exceeds + the fetch budget, so a single large record can't wedge consumption. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-bigrec') + + # fetch.max.bytes is tiny so the large value far exceeds it. It has to be + # >= message.max.bytes or construction is rejected, so lower both together. + sc = kafka_cluster.share_consumer({'message.max.bytes': 1500, 'fetch.max.bytes': 1500}) + try: + sc.subscribe([topic]) + + small = b'small' + large = b'x' * 5000 # >> fetch.max.bytes + producer = kafka_cluster.cimpl_producer() + producer.produce(topic, value=small) + producer.produce(topic, value=large) + producer.flush(timeout=10.0) + + received_msgs = drain_share_consumers([sc], 2)[0] + values = sorted((msg.value() for msg in received_msgs), key=len) + assert values == [ + small, + large, + ], f"oversized record not delivered intact: got byte-lengths {[len(v) for v in values]}" + finally: + sc.close() + + +@pytest.mark.parametrize('codec', ['none', 'gzip', 'lz4', 'zstd', 'snappy']) +def test_compression_codec_roundtrip(kafka_cluster, codec): + """Records produced under each compression codec are consumed intact — + decompression is transparent to the share consumer. + + Codecs the client wasn't built with are rejected at producer construction + and skipped. + """ + topic = kafka_cluster.create_topic_and_wait_propogation(f'test-share-consumer-compress-{codec}') + n = 10 + + try: + producer = kafka_cluster.cimpl_producer({'compression.type': codec}) + except KafkaException as exc: + pytest.skip(f"compression codec '{codec}' not available in this build: {exc}") + + sc = kafka_cluster.share_consumer() + try: + sc.subscribe([topic]) + + expected = [f'{codec}-msg-{i}'.encode() for i in range(n)] + for v in expected: + producer.produce(topic, value=v) + producer.flush(timeout=10.0) + + received_msgs = drain_share_consumers([sc], n)[0] + assert sorted(msg.value() for msg in received_msgs) == sorted(expected), f"{codec}: value mismatch" + finally: + sc.close() + + def test_multi_topic_subscription(kafka_cluster): """Subscribe to multiple topics; records from all topics are delivered.""" topic_a = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-multi-a') @@ -145,12 +370,12 @@ def test_multi_topic_subscription(kafka_cluster): producer.produce(topic_b, value=f'b-{i}'.encode()) producer.flush(timeout=10.0) - received = drain_share_consumers([sc], 2 * n_per_topic)[0] - topics_seen = {m.topic() for m in received} + received_msgs = drain_share_consumers([sc], 2 * n_per_topic)[0] + topics_seen = {msg.topic() for msg in received_msgs} assert topics_seen == {topic_a, topic_b}, f"Expected both topics, got {topics_seen}" assert ( - len(received) == 2 * n_per_topic - ), f"Expected {2 * n_per_topic} records across both topics, got {len(received)}" + len(received_msgs) == 2 * n_per_topic + ), f"Expected {2 * n_per_topic} records across both topics, got {len(received_msgs)}" finally: sc.close() @@ -177,15 +402,15 @@ def test_records_before_join_not_delivered(kafka_cluster): sc.subscribe([topic]) # Observation window — pre-join records (if delivered at all) would # arrive here. - received = [] + received_msgs = [] deadline = time.time() + 8.0 while time.time() < deadline: - for m in sc.poll(timeout=0.5): - if m.error() is None: - received.append(m) + for msg in sc.poll(timeout=0.5): + if msg.error() is None: + received_msgs.append(msg) - assert received == [], ( - f"Pre-join records were delivered ({len(received)} messages); " + assert received_msgs == [], ( + f"Pre-join records were delivered ({len(received_msgs)} messages); " f"share consumers must only see records produced after join" ) finally: @@ -218,8 +443,8 @@ def test_three_consumers_no_overlap(kafka_cluster): producer.produce(topic, value=f'msg-{i}'.encode()) producer.flush(timeout=10.0) - received = drain_share_consumers(consumers, n, poll_timeout_s=0.2) - offset_sets = [{(m.topic(), m.partition(), m.offset()) for m in r} for r in received] + received_msgs = drain_share_consumers(consumers, n, poll_timeout_s=0.2) + offset_sets = [{(msg.topic(), msg.partition(), msg.offset()) for msg in r} for r in received_msgs] for i in range(len(offset_sets)): for j in range(i + 1, len(offset_sets)): @@ -252,9 +477,9 @@ def test_independent_share_groups(kafka_cluster): producer.produce(topic, value=f'msg-{i}'.encode()) producer.flush(timeout=10.0) - received_a, received_b = drain_share_consumers([sc_a, sc_b], 2 * n) - offsets_a = {(m.topic(), m.partition(), m.offset()) for m in received_a} - offsets_b = {(m.topic(), m.partition(), m.offset()) for m in received_b} + received_msgs_a, received_msgs_b = drain_share_consumers([sc_a, sc_b], 2 * n) + offsets_a = {(msg.topic(), msg.partition(), msg.offset()) for msg in received_msgs_a} + offsets_b = {(msg.topic(), msg.partition(), msg.offset()) for msg in received_msgs_b} assert len(offsets_a) == n, f"Group A got {len(offsets_a)} unique records, expected {n}" assert len(offsets_b) == n, f"Group B got {len(offsets_b)} unique records, expected {n}" @@ -281,9 +506,9 @@ def test_implicit_ack_no_redelivery(kafka_cluster): seen = set() deadline = time.time() + 20.0 while time.time() < deadline and len(seen) < n: - for m in sc.poll(timeout=0.5): - if m.error() is None: - seen.add((m.partition(), m.offset())) + for msg in sc.poll(timeout=0.5): + if msg.error() is None: + seen.add((msg.partition(), msg.offset())) assert len(seen) == n, f"Failed to consume all {n} records (got {len(seen)})" @@ -291,9 +516,9 @@ def test_implicit_ack_no_redelivery(kafka_cluster): # records, so no redelivery should occur. extras = [] for _ in range(8): - for m in sc.poll(timeout=0.5): - if m.error() is None: - extras.append((m.partition(), m.offset())) + for msg in sc.poll(timeout=0.5): + if msg.error() is None: + extras.append((msg.partition(), msg.offset())) assert extras == [], f"Records were redelivered after implicit ack: {extras}" finally: @@ -336,9 +561,9 @@ def test_records_redelivered_after_lock_timeout(kafka_cluster): sc1_received = set() deadline = time.time() + 5.0 while time.time() < deadline and not sc1_received: - for m in sc1.poll(timeout=0.5): - if m.error() is None: - sc1_received.add((m.partition(), m.offset())) + for msg in sc1.poll(timeout=0.5): + if msg.error() is None: + sc1_received.add((msg.partition(), msg.offset())) if sc1_received: break @@ -353,9 +578,9 @@ def test_records_redelivered_after_lock_timeout(kafka_cluster): sc2_received = set() deadline = time.time() + 10.0 while time.time() < deadline and len(sc2_received) < n: - for m in sc2.poll(timeout=0.5): - if m.error() is None: - sc2_received.add((m.partition(), m.offset())) + for msg in sc2.poll(timeout=0.5): + if msg.error() is None: + sc2_received.add((msg.partition(), msg.offset())) redelivered = sc1_received & sc2_received assert redelivered, ( @@ -394,9 +619,9 @@ def test_poll_with_zero_timeout(kafka_cluster): collected = [] deadline = time.time() + 20.0 while time.time() < deadline and len(collected) < n: - for m in sc.poll(timeout=0): - if m.error() is None: - collected.append((m.partition(), m.offset())) + for msg in sc.poll(timeout=0): + if msg.error() is None: + collected.append((msg.partition(), msg.offset())) assert len(collected) == n, ( f"poll(timeout=0) tight-loop should deliver all {n} records, " f"got {len(collected)}" @@ -456,11 +681,11 @@ def test_resubscribe_to_different_topic(kafka_cluster): producer.produce(topic_a, value=f'a-pre-{i}'.encode()) producer.flush(timeout=10.0) - first = drain_share_consumers([sc], 3)[0] - assert len(first) == 3, f"Failed to consume from topic_a (got {len(first)}/3)" + first_msgs = drain_share_consumers([sc], 3)[0] + assert len(first_msgs) == 3, f"Failed to consume from topic_a (got {len(first_msgs)}/3)" assert all( - m.topic() == topic_a for m in first - ), f"Expected only topic_a records, got {[m.topic() for m in first]}" + msg.topic() == topic_a for msg in first_msgs + ), f"Expected only topic_a records, got {[msg.topic() for msg in first_msgs]}" # Phase 2: switch subscription. Records to topic_a must no longer # be delivered; only topic_b records should arrive. @@ -477,10 +702,10 @@ def test_resubscribe_to_different_topic(kafka_cluster): producer.produce(topic_b, value=f'b-{i}'.encode()) producer.flush(timeout=10.0) - received = drain_share_consumers([sc], 5)[0] - topics = {m.topic() for m in received} + received_msgs = drain_share_consumers([sc], 5)[0] + topics = {msg.topic() for msg in received_msgs} assert topics == {topic_b}, f"Resubscribe should drop topic_a; got topics {topics}" - assert len(received) == 5, f"Expected 5 topic_b records, got {len(received)}" + assert len(received_msgs) == 5, f"Expected 5 topic_b records, got {len(received_msgs)}" finally: sc.close() @@ -503,9 +728,9 @@ def test_messages_in_offset_order_single_consumer(kafka_cluster): total = 0 deadline = time.time() + 20.0 while time.time() < deadline and total < n: - for m in sc.poll(timeout=0.5): - if m.error() is None: - per_partition.setdefault(m.partition(), []).append(m.offset()) + for msg in sc.poll(timeout=0.5): + if msg.error() is None: + per_partition.setdefault(msg.partition(), []).append(msg.offset()) total += 1 assert total == n, f"Expected {n} records, got {total}" @@ -544,12 +769,12 @@ def test_open_transaction_stalls_share_group(kafka_cluster): # Open txn must stall delivery. stalled = drain_share_consumers([sc], 1, timeout_s=5.0)[0] - assert stalled == [], f'open txn did not stall delivery: {[m.value() for m in stalled]}' + assert stalled == [], f'open txn did not stall delivery: {[msg.value() for msg in stalled]}' txn_producer.commit_transaction(10) - msgs = drain_share_consumers([sc], 3, ack_type=AcknowledgeType.ACCEPT)[0] - assert len(msgs) == 3, f'expected 3 msgs after commit, got {len(msgs)}' + received_msgs = drain_share_consumers([sc], 3, ack_type=AcknowledgeType.ACCEPT)[0] + assert len(received_msgs) == 3, f'expected 3 msgs after commit, got {len(received_msgs)}' finally: sc.close() @@ -559,3 +784,81 @@ def test_double_close_is_idempotent(kafka_cluster): sc = kafka_cluster.share_consumer() sc.close() sc.close() + + +def test_subscribe_before_topic_exists(kafka_cluster): + """A subscription made BEFORE the topic exists starts delivering once the + topic is created and produced to. + + The client keeps refreshing metadata for a subscribed-but-unknown topic, so + it picks the topic up when it appears and (earliest reset) drains it. + """ + topic = unique_id('test-share-consumer-prejoin-create') + n = 10 + + sc = kafka_cluster.share_consumer() + try: + # Subscribe before the topic exists; the join can't assign it yet, and + # no records should surface in the meantime. + sc.subscribe([topic]) + pre = [] + for _ in range(5): + pre.extend(msg for msg in sc.poll(timeout=0.2) if msg.error() is None) + assert pre == [], f'no records should arrive before the topic exists, got {len(pre)}' + + # Create the topic, then produce to it. + create_futures = kafka_cluster.admin().create_topics([NewTopic(topic, num_partitions=1, replication_factor=1)]) + create_futures[topic].result() + time.sleep(1) # propagation across brokers + + producer = kafka_cluster.cimpl_producer() + for i in range(n): + producer.produce(topic, value=f'msg-{i}'.encode()) + producer.flush(timeout=10.0) + + received_msgs = drain_share_consumers([sc], n, timeout_s=30.0)[0] + assert sorted(msg.value() for msg in received_msgs) == sorted( + f'msg-{i}'.encode() for i in range(n) + ), 'expected exactly the records produced after the topic was created' + finally: + sc.close() + + +def test_resubscribe_same_topic_keeps_delivering(kafka_cluster): + """Re-subscribing to the SAME topic doesn't disrupt consumption: records + produced after the redundant re-subscribe are still delivered. Distinct + from test_resubscribe_to_different_topic, which switches topics. + """ + topic = kafka_cluster.create_topic_and_wait_propogation('test-share-consumer-resub-same') + + sc = kafka_cluster.share_consumer() + try: + sc.subscribe([topic]) + + producer = kafka_cluster.cimpl_producer() + for i in range(5): + producer.produce(topic, value=f'first-{i}'.encode()) + producer.flush(timeout=10.0) + first_msgs = drain_share_consumers([sc], 5)[0] + assert sorted(msg.value() for msg in first_msgs) == [ + f'first-{i}'.encode() for i in range(5) + ], 'phase 1 records mismatch' + + # Redundant re-subscribe to the same topic; drive heartbeats so the + # (unchanged) subscription settles before producing more. + sc.subscribe([topic]) + for _ in range(10): + sc.poll(timeout=0.2) + + for i in range(5): + producer.produce(topic, value=f'second-{i}'.encode()) + producer.flush(timeout=10.0) + second_msgs = drain_share_consumers([sc], 5)[0] + # Exactly the new records — and only those, proving the redundant + # re-subscribe didn't redeliver phase 1's already-consumed records. + assert sorted(msg.value() for msg in second_msgs) == [ + f'second-{i}'.encode() for i in range(5) + ], 're-subscribe to the same topic should deliver the new records and only those' + assert all(msg.topic() == topic for msg in second_msgs) + finally: + sc.close() diff --git a/tests/test_ShareConsumer.py b/tests/test_ShareConsumer.py index 3ff48a7e7..99f4e6c19 100644 --- a/tests/test_ShareConsumer.py +++ b/tests/test_ShareConsumer.py @@ -9,7 +9,7 @@ import pytest -from confluent_kafka import AcknowledgeType, KafkaError, KafkaException, ShareConsumer +from confluent_kafka import AcknowledgeType, KafkaError, KafkaException, Message, ShareConsumer from tests.common import ( TestShareConsumer, TestUtils, @@ -129,6 +129,41 @@ def test_subscribe(share_consumer): assert 'test-topic' in subscription +def test_subscribe_multiple_topics(share_consumer): + """subscribe() with several topics: subscription() reports all of them. + + The result comes back sorted rather than in insertion order, so compare + order-agnostically — what matters is that every topic survives the round + trip, not the ordering.""" + share_consumer.subscribe(['topic-c', 'topic-a', 'topic-b']) + assert sorted(share_consumer.subscription()) == ['topic-a', 'topic-b', 'topic-c'] + + +def test_subscribe_idempotent_and_incremental(share_consumer): + """Re-subscribing to the same set is idempotent, growing the topic list + grows the subscription, and a repeated unsubscribe() is a harmless no-op. + """ + # Incremental: each subscribe() fully replaces the prior set, so a growing + # list yields a growing subscription. + share_consumer.subscribe(['a']) + assert share_consumer.subscription() == ['a'] + share_consumer.subscribe(['a', 'b']) + assert sorted(share_consumer.subscription()) == ['a', 'b'] + share_consumer.subscribe(['a', 'b', 'c']) + assert sorted(share_consumer.subscription()) == ['a', 'b', 'c'] + + # Idempotent: subscribing to the same set repeatedly doesn't duplicate it. + share_consumer.subscribe(['x', 'y']) + share_consumer.subscribe(['x', 'y']) + share_consumer.subscribe(['x', 'y']) + assert sorted(share_consumer.subscription()) == ['x', 'y'] + + # Repeated unsubscribe is a no-op, not an error. + share_consumer.unsubscribe() + share_consumer.unsubscribe() + assert share_consumer.subscription() == [] + + def test_unsubscribe(share_consumer): """Test unsubscribe() method.""" share_consumer.subscribe(['test-topic']) @@ -138,6 +173,14 @@ def test_unsubscribe(share_consumer): assert len(subscription) == 0 +def test_unsubscribe_without_subscription_is_noop(share_consumer): + """unsubscribe() before any subscribe() is a no-op: it returns None and + leaves the subscription empty rather than raising.""" + assert share_consumer.subscription() == [] + assert share_consumer.unsubscribe() is None + assert share_consumer.subscription() == [] + + def test_poll_no_broker(share_consumer): """Test poll() returns empty list when no broker available.""" share_consumer.subscribe(['test-topic']) @@ -146,6 +189,18 @@ def test_poll_no_broker(share_consumer): assert messages == [] +def test_poll_without_subscription_raises_state(share_consumer): + """poll() before any subscribe() raises KafkaException(_STATE). + + The "not subscribed" check fires before any broker I/O, so this returns + immediately without a broker. We assert only the error code, not the + message: depending on timing it can be either "not subscribed" or + "consumer group not initialized", both of which are _STATE.""" + with pytest.raises(KafkaException) as ex: + share_consumer.poll(timeout=0.1) + assert ex.value.args[0].code() == KafkaError._STATE + + def test_commit_does_not_hang_on_unreachable_broker(): """Commit on a fresh, unsubscribed consumer pointed at an unreachable broker returns immediately (no acks pending). The interesting case @@ -240,6 +295,17 @@ def test_any_method_after_close_throws_exception(): sc.poll(timeout=0.1) assert ex.match('Share consumer closed') + # The closed-state check happens before argument parsing, so acknowledge(None) + # raises the closed-consumer RuntimeError rather than a TypeError about the + # non-Message argument. + with pytest.raises(RuntimeError) as ex: + sc.acknowledge(None, AcknowledgeType.ACCEPT) + assert ex.match('Share consumer closed') + + with pytest.raises(RuntimeError) as ex: + sc.acknowledge_offset('test-topic', 0, 0, AcknowledgeType.ACCEPT) + assert ex.match('Share consumer closed') + with pytest.raises(RuntimeError) as ex: sc.commit_sync(timeout=0.1) assert ex.match('Share consumer closed') @@ -269,14 +335,43 @@ def test_subscribe_with_non_list_raises(share_consumer): def test_subscribe_with_empty_list_unsubscribes(share_consumer): - """subscribe([]) is equivalent to unsubscribe(): an empty topic list - clears the current subscription instead of raising.""" + """subscribe([]) is equivalent to unsubscribe(): an empty topic list clears + the current subscription instead of raising, after which poll() raises + _STATE (not subscribed). + + This is an empty *list* — an empty topic *name* is a different case and + still raises _INVALID_ARG (test_subscribe_rejects_empty_and_duplicate_topic_names). + """ share_consumer.subscribe(['test-topic']) assert share_consumer.subscription() == ['test-topic'] - share_consumer.subscribe([]) # no exception + assert share_consumer.subscribe([]) is None assert share_consumer.subscription() == [] + with pytest.raises(KafkaException) as ex: + share_consumer.poll(timeout=0.1) + assert ex.value.args[0].code() == KafkaError._STATE + + +def test_subscribe_rejects_empty_and_duplicate_topic_names(share_consumer): + """An empty topic name and duplicate topic names are rejected with + _INVALID_ARG. (An empty *list* is a different case — it unsubscribes.)""" + with pytest.raises(KafkaException) as ex: + share_consumer.subscribe(['']) + assert ex.value.args[0].code() == KafkaError._INVALID_ARG + + with pytest.raises(KafkaException) as ex: + share_consumer.subscribe(['dup-topic', 'dup-topic']) + assert ex.value.args[0].code() == KafkaError._INVALID_ARG + + +def test_subscribe_accepts_caret_topic_as_literal_name(share_consumer): + """A '^'-prefixed name is accepted and stored verbatim — it's treated as a + literal topic name, not a regex pattern. Whether it matches any topic is a + broker-side question; here we just confirm it's accepted and round-trips.""" + share_consumer.subscribe(['^literal-name']) + assert share_consumer.subscription() == ['^literal-name'] + def test_poll_with_non_numeric_timeout_raises(share_consumer): """poll(timeout=...) must reject non-numeric values.""" @@ -302,6 +397,21 @@ def test_acknowledge_rejects_non_message_argument(share_consumer): share_consumer.acknowledge(bad, AcknowledgeType.ACCEPT) +def test_acknowledge_none_topic_message_rejected(share_consumer): + """acknowledge() of a Message with no topic (topic() is None) is rejected + with _INVALID_ARG rather than crashing on the missing topic. + + partition/offset are valid, so the absent topic is the only thing wrong. + A missing topic is checked before the ack-mode check, which is why the + default implicit-mode fixture works here — the same call with a real topic + would instead return _STATE (an explicit ack in implicit mode).""" + msg = Message(partition=0, offset=0) + assert msg.topic() is None + with pytest.raises(KafkaException) as ex: + share_consumer.acknowledge(msg, AcknowledgeType.ACCEPT) + assert ex.value.args[0].code() == KafkaError._INVALID_ARG + + def test_acknowledge_offset_rejects_non_str_topic(share_consumer): """acknowledge_offset() must reject non-str topic.""" for bad in (None, 42, object(), []): @@ -337,6 +447,35 @@ def test_acknowledge_offset_rejects_negative_offset(share_consumer): assert ex.value.args[0].code() == KafkaError._INVALID_ARG +def test_acknowledge_offset_rejects_out_of_range_ack_type(): + """An out-of-range AcknowledgeType is rejected with _INVALID_ARG. + + Only ACCEPT(1), RELEASE(2) and REJECT(3) are valid. The ack_type is only + checked after the ack-mode check, and an explicit ack in implicit mode + returns _STATE first — so this needs an explicit-mode consumer (not the + shared implicit fixture) to actually reach the type check. topic/partition/ + offset are valid, so the bad ack_type is the only thing wrong. acknowledge() + takes the same path, so this covers both ack APIs.""" + sc = ShareConsumer( + { + 'group.id': unique_id('test-share-bad-ack-type'), + 'bootstrap.servers': 'localhost:9092', + 'socket.timeout.ms': 100, + 'share.acknowledgement.mode': 'explicit', + } + ) + try: + # 0 sits just below ACCEPT(1), 4 just above REJECT(3); 999 is far out. + for bad_ack_type in (0, 4, 999): + with pytest.raises(KafkaException) as ex: + sc.acknowledge_offset('test-topic', 0, 0, bad_ack_type) + assert ( + ex.value.args[0].code() == KafkaError._INVALID_ARG + ), f'ack_type={bad_ack_type} should be rejected with _INVALID_ARG' + finally: + sc.close() + + def test_commit_sync_rejects_non_numeric_timeout(share_consumer): """commit_sync(timeout=...) must reject non-numeric values.""" for bad in ('str', None, object(), []): @@ -408,3 +547,59 @@ def test_poll_interruptible_by_signal(): sc2.close() assert interrupted, "poll() (infinite) should have been interrupted by SIGINT" + + +def test_concurrent_thread_access_raises_conflict(): + """A ShareConsumer is not safe for concurrent use: touching it from a + second thread while another thread is inside poll() raises + KafkaException(_CONFLICT). + + Ownership is held by whichever thread is currently in a call, for the whole + duration of that call (including poll()'s blocking wait), so a second + thread's call is rejected. No broker needed — the guard is local and stays + held across the idle poll, so the hammer thread reliably hits it. + commit_async() makes a good probe: it's guarded but returns immediately. + """ + sc = TestShareConsumer( + { + 'group.id': unique_id('test-share-conflict'), + 'socket.timeout.ms': 100, + } + ) + sc.subscribe(['test-topic']) + + conflicts = [] + other_errors = [] + stop = threading.Event() + + def hammer(): + while not stop.is_set(): + try: + sc.commit_async() + except KafkaException as exc: + err = exc.args[0] + (conflicts if err.code() == KafkaError._CONFLICT else other_errors).append(err) + except Exception as exc: # noqa: BLE001 - record anything unexpected + other_errors.append(repr(exc)) + + hammer_thread = threading.Thread(target=hammer, daemon=True) + hammer_thread.start() + try: + # Keep the consumer busy inside poll() until the hammer thread sees a + # conflict, or give up after a few seconds. The main thread can lose the + # race too if the hammer briefly grabs ownership — that's fine, swallow + # it and keep polling. + deadline = time.monotonic() + 3.0 + while not conflicts and time.monotonic() < deadline: + try: + sc.poll(timeout=0.2) + except KafkaException: + pass + finally: + stop.set() + hammer_thread.join(timeout=2.0) + sc.close() + + assert conflicts, "second-thread access during poll() should have raised _CONFLICT" + assert all(err.code() == KafkaError._CONFLICT for err in conflicts) + assert not other_errors, f"unexpected errors from second thread: {[str(e) for e in other_errors]}"