From d92f2d24ec7b68cc63cc5135f43b6304eaba101a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 29 Mar 2026 15:01:37 -0700 Subject: [PATCH] kafka.client_async: simplify check_version --- kafka/client_async.py | 54 ++++++++++--------------------------------- 1 file changed, 12 insertions(+), 42 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index fdb057ee1..216798786 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -833,7 +833,6 @@ def least_loaded_node(self): # otherwise if this is the best we have found so far, record that inflight = curr_inflight found = node_id - return found def _refresh_delay_ms(self, node_id): @@ -925,7 +924,7 @@ def get_api_versions(self): assert self.broker_version_data is not None return self.broker_version_data.api_versions - def check_version(self, node_id=None, timeout=None, **kwargs): + def check_version(self, node_id=None, timeout_ms=None): """Attempt to guess the version of a Kafka broker. Keyword Arguments: @@ -934,7 +933,7 @@ def check_version(self, node_id=None, timeout=None, **kwargs): Default: None timeout (num, optional): Maximum time in seconds to try to check broker version. If unable to identify version before timeout, raise error (see below). - Default: api_version_auto_timeout_ms / 1000 + Default: None Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc @@ -942,46 +941,17 @@ def check_version(self, node_id=None, timeout=None, **kwargs): NodeNotReadyError (if node_id is provided) NoBrokersAvailable (if node_id is None) """ - timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000) - with self._lock: - end = time.monotonic() + timeout - while time.monotonic() < end: - time_remaining = max(end - time.monotonic(), 0) - if node_id is not None and self.connection_delay(node_id) > 0: - sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0) - if sleep_time > 0: - time.sleep(sleep_time) - continue - try_node = node_id or self.least_loaded_node() - if try_node is None: - sleep_time = min(time_remaining, self.least_loaded_node_refresh_ms() / 1000.0) - if sleep_time > 0: - log.warning('No node available during check_version; sleeping %.2f secs', sleep_time) - time.sleep(sleep_time) - continue - log.debug('Attempting to check version with node %s', try_node) - if not self._init_connect(try_node): - if try_node == node_id: - raise Errors.NodeNotReadyError("Connection failed to %s" % node_id) - else: - continue - conn = self._conns[try_node] - - while conn.connecting() and time.monotonic() < end: - timeout_ms = min((end - time.monotonic()) * 1000, 200) - self.poll(timeout_ms=timeout_ms) - - if conn.broker_version_data is not None: - return conn.broker_version_data.broker_version - else: - log.debug('Failed to identify api_version after connection attempt to %s', conn) + if node_id is not None: + self.await_ready(node_id, timeout_ms=timeout_ms) + return self._conns[node_id].broker_version - # Timeout - else: - if node_id is not None: - raise Errors.NodeNotReadyError(node_id) - else: - raise Errors.NoBrokersAvailable() + # node_id is None, we can use the bootstrap broker_version + self.poll(future=self.broker_version_data_future, timeout_ms=timeout_ms) + if not self.broker_version_data_future.is_done: + raise Errors.NoBrokersAvailable() + elif self.broker_version_data_future.failed(): + raise self.broker_version_data_future.exception + return self.broker_version_data.broker_version def api_version(self, operation, max_version=None): assert self.broker_version_data is not None