Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 12 additions & 42 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,6 @@ def least_loaded_node(self):
# otherwise if this is the best we have found so far, record that
inflight = curr_inflight
found = node_id

return found

def _refresh_delay_ms(self, node_id):
Expand Down Expand Up @@ -925,7 +924,7 @@ def get_api_versions(self):
assert self.broker_version_data is not None
return self.broker_version_data.api_versions

def check_version(self, node_id=None, timeout=None, **kwargs):
def check_version(self, node_id=None, timeout_ms=None):
"""Attempt to guess the version of a Kafka broker.

Keyword Arguments:
Expand All @@ -934,54 +933,25 @@ def check_version(self, node_id=None, timeout=None, **kwargs):
Default: None
timeout (num, optional): Maximum time in seconds to try to check broker version.
If unable to identify version before timeout, raise error (see below).
Default: api_version_auto_timeout_ms / 1000
Default: None

Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc

Raises:
NodeNotReadyError (if node_id is provided)
NoBrokersAvailable (if node_id is None)
"""
timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000)
with self._lock:
end = time.monotonic() + timeout
while time.monotonic() < end:
time_remaining = max(end - time.monotonic(), 0)
if node_id is not None and self.connection_delay(node_id) > 0:
sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0)
if sleep_time > 0:
time.sleep(sleep_time)
continue
try_node = node_id or self.least_loaded_node()
if try_node is None:
sleep_time = min(time_remaining, self.least_loaded_node_refresh_ms() / 1000.0)
if sleep_time > 0:
log.warning('No node available during check_version; sleeping %.2f secs', sleep_time)
time.sleep(sleep_time)
continue
log.debug('Attempting to check version with node %s', try_node)
if not self._init_connect(try_node):
if try_node == node_id:
raise Errors.NodeNotReadyError("Connection failed to %s" % node_id)
else:
continue
conn = self._conns[try_node]

while conn.connecting() and time.monotonic() < end:
timeout_ms = min((end - time.monotonic()) * 1000, 200)
self.poll(timeout_ms=timeout_ms)

if conn.broker_version_data is not None:
return conn.broker_version_data.broker_version
else:
log.debug('Failed to identify api_version after connection attempt to %s', conn)
if node_id is not None:
self.await_ready(node_id, timeout_ms=timeout_ms)
return self._conns[node_id].broker_version

# Timeout
else:
if node_id is not None:
raise Errors.NodeNotReadyError(node_id)
else:
raise Errors.NoBrokersAvailable()
# node_id is None, we can use the bootstrap broker_version
self.poll(future=self.broker_version_data_future, timeout_ms=timeout_ms)
if not self.broker_version_data_future.is_done:
raise Errors.NoBrokersAvailable()
elif self.broker_version_data_future.failed():
raise self.broker_version_data_future.exception
return self.broker_version_data.broker_version

def api_version(self, operation, max_version=None):
assert self.broker_version_data is not None
Expand Down
Loading