Description
Ran into this while trying to upgrade to the v3.8.0 Python client. Tested on a Pulsar v4.0.4 cluster, which is newer than v4.0.1, where the possibly related broker issue apache/pulsar #23917 was fixed.
It seems that if you:
- Create a reader
- Seek by timestamp to a point that has messages after it
- Call `read_next()` until there are no more messages

then the next `has_message_available()` call returns `True`, even though there are no more messages. Calling `has_message_available()` again correctly returns `False`, so the bug only affects the first call after the reader has caught up.
This is a problem because a loop like:

```python
while reader.has_message_available():
    msg = reader.read_next()
```

will hang when it reaches the end of the topic: the incorrect `True` from `has_message_available()` sends it into `read_next()`, which then blocks until a new message is published.
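A possible client-side workaround, sketched under assumptions and not verified against this bug: bound each read with `read_next(timeout_millis=...)` and treat a timeout as "caught up". The helper name `drain_with_deadline` is hypothetical, and the timeout exception type is passed in so the sketch can be exercised without a broker; with the real Python client it is presumably `pulsar.Timeout`.

```python
def drain_with_deadline(reader, timeout_exc, read_timeout_ms=1000):
    """Read messages while has_message_available() reports that some exist.

    Each read is bounded by read_timeout_ms, so a spurious True from
    has_message_available() costs at most one timeout instead of blocking
    forever. timeout_exc is the exception type raised by read_next() on
    timeout (assumed to be pulsar.Timeout for the real Python client).
    """
    messages = []
    while reader.has_message_available():
        try:
            msg = reader.read_next(timeout_millis=read_timeout_ms)
        except timeout_exc:
            # Nothing arrived in time: treat the earlier True as spurious.
            break
        messages.append(msg)
    return messages


# Hypothetical usage with a reader created as in the repro below:
# import pulsar
# msgs = drain_with_deadline(reader, timeout_exc=pulsar.Timeout)
```

The trade-off is that each spurious `True` still costs up to `read_timeout_ms` of waiting, so the timeout should be short relative to how often the loop runs.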
Tested with a producer that publishes one message per second:

```python
import json
import time

import pulsar

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer(
    "persistent://foo/bar/baz",
)

# Publish a small JSON payload with the current timestamp once per second.
while True:
    data = json.dumps({"ts": time.time()}).encode("utf-8")
    print("Sending data:", data, flush=True)
    producer.send(data)
    time.sleep(1)
```

Then start a reader like so:
```python
import time

import pulsar

client = pulsar.Client("pulsar://localhost:6650")
reader = client.create_reader(
    "persistent://foo/bar/baz",
    start_message_id=pulsar.MessageId.latest,
)
# Seek to 10s ago; seek() accepts an integer timestamp in milliseconds since the epoch.
reader.seek(int(time.time() - 10) * 1000)

while True:
    # Probe three times per round to show that only the first call is wrong.
    print(reader.has_message_available(), flush=True)
    print(reader.has_message_available(), flush=True)
    print(reader.has_message_available(), flush=True)
    print("=" * 20)
    msg = reader.read_next()
```

Observed output:
- Initially there are many rounds of `True True True`, for messages in the past while the reader is catching up.
- Once the reader catches up, the outputs become `True False False`, indicating that the first `has_message_available()` call in each round incorrectly returns `True`.
Expected output:
- Once the reader has caught up, the outputs should be `False False False`, since no message is available and `read_next()` needs to block until a new message comes in.
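To compare the observed and expected patterns without watching the console, the reader loop above could be made bounded. This is a sketch assuming the same reader object; `record_availability_rounds` and `spurious_rounds` are hypothetical helper names, the timeout exception type is passed in (presumably `pulsar.Timeout` with the real client), and it has not been run against a live cluster. Each round reads with `read_next(timeout_millis=...)` so the script always terminates.

```python
def record_availability_rounds(reader, rounds, timeout_exc, probes=3, read_timeout_ms=2000):
    """Bounded variant of the repro loop.

    Each round calls has_message_available() `probes` times, records the
    results as a tuple, then reads one message with a timeout so the script
    stops after `rounds` rounds instead of blocking forever.
    """
    history = []
    for _ in range(rounds):
        history.append(tuple(reader.has_message_available() for _ in range(probes)))
        try:
            reader.read_next(timeout_millis=read_timeout_ms)
        except timeout_exc:
            pass  # No message within the timeout; probe again next round.
    return history


def spurious_rounds(history):
    """Indices of rounds showing the `True False False` pattern: the first
    probe says a message is available, every later probe says there is none."""
    return [
        i for i, probes in enumerate(history)
        if len(probes) > 1 and probes[0] and not any(probes[1:])
    ]
```

On an affected client, `spurious_rounds(record_availability_rounds(reader, rounds=30, timeout_exc=pulsar.Timeout))` would be expected to be non-empty once the reader has caught up, and empty after a fix.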
Notes:
- The bug doesn't seem to happen if no seek by timestamp is done at the start.
- The bug doesn't seem to happen if there's a small delay after `read_next()` (e.g. `time.sleep(0.1)`); this is easier to observe if you make the producer publish more slowly.
- The bug was probably introduced in v3.5.0: it does not occur with the v3.4.0 client but does occur with the v3.5.0 client.