diff --git a/kafka/conn.py b/kafka/conn.py index 5dfdd625c..0d12dacf5 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -214,10 +214,10 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs): self.afi = afi self._sock_afi = afi self._sock_addr = None - self.broker_version_data = broker_version_data - self._check_version_idx = None - self._api_versions_idx = ApiVersionsRequest.max_version # version to try on first connect + self._api_versions_idx = None self._api_versions_future = None + self._broker_version_data = None + self._check_version_idx = None self._throttle_time = None self._socks5_proxy = None @@ -228,6 +228,9 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs): self.node_id = self.config.pop('node_id') + # Accept cached data if provided + self.broker_version_data = broker_version_data + if self.config['receive_buffer_bytes'] is not None: self.config['socket_options'].append( (socket.SOL_SOCKET, socket.SO_RCVBUF, @@ -267,7 +270,6 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs): self._ssl_context = None if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] - self._api_versions_future = None self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms'] self._sasl_auth_future = None self.last_attempt = 0 @@ -284,6 +286,28 @@ def broker_version(self): return None return self.broker_version_data.broker_version + @property + def broker_version_data(self): + return self._broker_version_data + + @broker_version_data.setter + def broker_version_data(self, value): + if value is None: + self._api_versions_idx = ApiVersionsRequest.max_version + self._check_version_idx = 0 + elif not isinstance(value, BrokerVersionData): + raise TypeError('expected BrokerVersionData') + else: + self._broker_version_data = value + # If we got cached broker data, we'll skip to the max supported ApiVersionsRequest + # or, if not supported at all, we'll just rely on the cached api_versions data + try: + self._api_versions_idx = self._broker_version_data.api_version(ApiVersionsRequest) + except Errors.IncompatibleBrokerVersion: + self._api_versions_idx = None + self._check_version_idx = None + self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms'] + def _new_protocol_parser(self): return KafkaProtocol( ident=f'node={self.node_id}[{self.host}:{self.port}]', @@ -491,12 +515,6 @@ def _try_handshake(self): def _try_api_versions_check(self): if self._api_versions_future is None: - if self.broker_version_data is not None: - try: - self._api_versions_idx = self.broker_version_data.api_version(ApiVersionsRequest) - except Errors.IncompatibleBrokerVersion: - log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self.broker_version) - return True if self._api_versions_idx is not None: version = self._api_versions_idx request = ApiVersionsRequest(version=version, @@ -522,9 +540,12 @@ def _try_api_versions_check(self): self._api_versions_future = future self.state = ConnectionStates.API_VERSIONS_RECV self.config['state_change_callback'](self.node_id, self._sock, self) - else: + elif self.broker_version_data is None: self.close(Errors.KafkaConnectionError('Unable to determine broker version.')) return False + else: + log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self.broker_version) + return True # Handle any immediate responses self.recv(resolve_futures=True) @@ -572,12 +593,10 @@ def _handle_api_versions_failure(self, future, ex): # but in case they do we always want to try v0 as a fallback if self._api_versions_idx > 0: self._api_versions_idx = 0 - # Only attempt fallback checks if we dont know anything about the cluster. - # Once we have a cached broker_version_data (i.e., after bootstrap) - # this is skipped. - elif self.broker_version_data is None: + # If we have VERSION_CHECKS fallback enabled, disable api versions + elif self._check_version_idx is not None: self._api_versions_idx = None - self._check_version_idx = 0 + # Otherwise, we'll just keep repeating api versions request on reconnect # after failure connection is closed, so state should already be DISCONNECTED def _handle_check_version_response(self, future, version, _response): diff --git a/kafka/protocol/broker_version_data.py b/kafka/protocol/broker_version_data.py index bb22cf68a..3d41e5a25 100644 --- a/kafka/protocol/broker_version_data.py +++ b/kafka/protocol/broker_version_data.py @@ -89,6 +89,10 @@ def api_version(self, operation, max_version=None): .format(operation[0].__name__)) return version + def __eq__(self, other): + return self.broker_version == other.broker_version and self.api_versions == other.api_versions + + def infer_broker_version_from_api_versions(api_versions): # The logic here is to check the list of supported request versions # in reverse order. As soon as we find one that works, return it diff --git a/kafka/protocol/new/api_message.py b/kafka/protocol/new/api_message.py index 09c730658..bea8be812 100644 --- a/kafka/protocol/new/api_message.py +++ b/kafka/protocol/new/api_message.py @@ -268,3 +268,8 @@ def decode(cls, data, version=None, header=False, framed=False): if hdr is not None: ret._header = hdr return ret + + def __eq__(self, other): + if self.version != other.version: + return False + return super().__eq__(other) diff --git a/test/test_client_async.py b/test/test_client_async.py index d9bea4c53..29b2e725b 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -49,7 +49,7 @@ def test_bootstrap(mocker, conn): kwargs.pop('node_id') kwargs.pop('broker_version_data') assert kwargs == cli.config - conn.send.assert_called_once_with(MetadataRequest[7]([], True), blocking=False, request_timeout_ms=None) + conn.send.assert_called_once_with(MetadataRequest(topics=[]), blocking=False, request_timeout_ms=None) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == list([MetadataResponse.MetadataResponseBroker(0, 'foo', 12, None), MetadataResponse.MetadataResponseBroker(1, 'bar', 34, None)]) diff --git a/test/test_conn.py b/test/test_conn.py index 884db427c..31a26f8b2 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -12,6 +12,7 @@ from kafka.metrics.metrics import Metrics from kafka.metrics.stats.sensor import Sensor from kafka.protocol.broker_version_data import BrokerVersionData, VERSION_CHECKS +from kafka.protocol.new.admin import ListGroupsResponse from kafka.protocol.new.consumer import HeartbeatResponse from kafka.protocol.new.metadata import MetadataRequest, ApiVersionsRequest, ApiVersionsResponse from kafka.protocol.new.producer import ProduceRequest @@ -95,6 +96,41 @@ def test_api_versions_check(_socket, mocker): assert conn.connecting() is True assert conn.state is ConnectionStates.API_VERSIONS_RECV + api_versions_response = ApiVersionsResponse( + version=ApiVersionsRequest.max_version, + error_code=0, + api_keys=[(MetadataRequest.API_KEY, 0, 5)] + ) + conn.recv(responses=[(1, api_versions_response)], resolve_futures=True) + assert conn.broker_version_data.broker_version == (1, 0) + assert conn.broker_version_data.api_versions == {MetadataRequest.API_KEY: (0, 5)} + assert conn.state is ConnectionStates.CONNECTED + + +def test_api_versions_unsupported_versions(_socket, mocker): + conn = BrokerConnection('localhost', 9092, socket.AF_INET) + mocker.patch.object(conn, 'send_pending_requests') + mocker.patch.object(conn, 'connection_delay', return_value=0) + mocker.spy(conn, '_send') + assert conn._api_versions_future is None + conn.connect() + assert conn._api_versions_future is not None + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, + request_timeout_ms=1000.0 + ) + send_call_count = conn._send.call_count + + assert conn._try_api_versions_check() is False + assert conn._send.call_count == send_call_count + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + + # If we sent a higher-than-supported version, the broker sends back the min/max + # We should skip to the max supported for our next try. api_versions_error = ApiVersionsResponse( version=0, error_code=Errors.UnsupportedVersionError.errno, @@ -113,8 +149,51 @@ def test_api_versions_check(_socket, mocker): blocking=True, request_timeout_ms=500.0 ) + + api_versions_response = ApiVersionsResponse( + version=1, error_code=0, + api_keys=[(ApiVersionsRequest.API_KEY, 0, 1)] + ) + conn.recv(responses=[(2, api_versions_response)], resolve_futures=True) + assert conn.broker_version_data.broker_version == (0, 10, 0) + assert conn.broker_version_data.api_versions == {ApiVersionsRequest.API_KEY: (0, 1)} + assert conn.state is ConnectionStates.CONNECTED + + conn.close() + assert conn.state is ConnectionStates.DISCONNECTED + + # Reconnect uses the last api versions check + conn.connect() + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, + request_timeout_ms=1000.0 + ) + + +def test_api_versions_fallback(_socket, mocker): + conn = BrokerConnection('localhost', 9092, socket.AF_INET) + mocker.patch.object(conn, 'send_pending_requests') + mocker.patch.object(conn, 'connection_delay', return_value=0) + mocker.spy(conn, '_send') + assert conn._api_versions_future is None + conn.connect() + assert conn._api_versions_future is not None + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, + request_timeout_ms=1000.0 + ) send_call_count = conn._send.call_count + assert conn._try_api_versions_check() is False + assert conn._send.call_count == send_call_count + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + future = conn._api_versions_future assert not future.is_done conn.close(Errors.RequestTimedOutError()) @@ -130,9 +209,8 @@ def test_api_versions_check(_socket, mocker): conn._send.assert_called_with( ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__), blocking=True, - request_timeout_ms=250.0 + request_timeout_ms=500.0 ) - send_call_count = conn._send.call_count conn.close(Errors.RequestTimedOutError()) assert conn._api_versions_idx == None @@ -146,6 +224,93 @@ def test_api_versions_check(_socket, mocker): conn._send.assert_called_with( VERSION_CHECKS[0][1], blocking=True, + request_timeout_ms=250.0 + ) + + conn.recv(responses=[(1, ListGroupsResponse())], resolve_futures=True) + + assert conn.broker_version_data == BrokerVersionData((0, 9)) + assert conn.state is ConnectionStates.CONNECTED + + conn.close() + assert conn.state is ConnectionStates.DISCONNECTED + + # Reconnect skips check versions + conn.connect() + assert conn.state is ConnectionStates.CONNECTED + + +def test_api_versions_check_with_broker_version_data(_socket, mocker): + conn = BrokerConnection('localhost', 9092, socket.AF_INET, broker_version_data=BrokerVersionData((1, 0))) + mocker.patch.object(conn, 'send_pending_requests') + mocker.patch.object(conn, 'connection_delay', return_value=0) + mocker.spy(conn, '_send') + assert conn._api_versions_future is None + conn.connect() + assert conn._api_versions_future is not None + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, + request_timeout_ms=1000.0 + ) + send_call_count = conn._send.call_count + + assert conn._try_api_versions_check() is False + assert conn._send.call_count == send_call_count + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + + api_versions_error = ApiVersionsResponse( + version=0, + error_code=Errors.UnsupportedVersionError.errno, + api_keys=[(ApiVersionsRequest.API_KEY, 0, 1)] + ) + conn.recv(responses=[(1, api_versions_error)], resolve_futures=True) + assert conn._api_versions_idx == 0 + assert conn._api_versions_future is None + assert conn.state is ConnectionStates.API_VERSIONS_SEND + + assert conn._try_api_versions_check() is False + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, + request_timeout_ms=500.0 + ) + + future = conn._api_versions_future + assert not future.is_done + conn.close(Errors.RequestTimedOutError()) + assert future.failed() + assert future.exception == Errors.RequestTimedOutError() + assert conn._api_versions_idx == 0 + assert conn._api_versions_future is None + assert conn.state is ConnectionStates.DISCONNECTED + + conn.connect() + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, + request_timeout_ms=250.0 + ) + + conn.close(Errors.RequestTimedOutError()) + assert conn._api_versions_idx == 0 + assert conn._api_versions_future is None + assert conn._check_version_idx is None + assert conn.state is ConnectionStates.DISCONNECTED + + conn.connect() + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + conn._send.assert_called_with( + ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__), + blocking=True, request_timeout_ms=125.0 )