diff --git a/kafka/conn.py b/kafka/conn.py index a146fd984..a65474a0a 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -220,6 +220,7 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs): self._api_versions_future = None self._throttle_time = None self._socks5_proxy = None + self.connect_future = Future() self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -408,6 +409,7 @@ def connect(self): log.info('%s: Connection complete.', self) self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) + self.connect_future.success(self) if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') @@ -418,6 +420,7 @@ def connect(self): log.info('%s: Connection complete.', self) self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) + self.connect_future.success(self) if self.state not in (ConnectionStates.CONNECTED, ConnectionStates.DISCONNECTED): @@ -906,6 +909,9 @@ def close(self, error=None): sock = self._sock self._sock = None self._socks5_proxy = None + if not self.connect_future.is_done: + self.connect_future.failure(error) + self.connect_future = Future() # drop lock before state change callback and processing futures self.config['state_change_callback'](self.node_id, sock, self)