Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,10 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs):
self.afi = afi
self._sock_afi = afi
self._sock_addr = None
self.broker_version_data = broker_version_data
self._check_version_idx = None
self._api_versions_idx = ApiVersionsRequest.max_version # version to try on first connect
self._api_versions_idx = None
self._api_versions_future = None
self._broker_version_data = None
self._check_version_idx = None
self._throttle_time = None
self._socks5_proxy = None

Expand All @@ -228,6 +228,9 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs):

self.node_id = self.config.pop('node_id')

# Accept cached data if provided
self.broker_version_data = broker_version_data

if self.config['receive_buffer_bytes'] is not None:
self.config['socket_options'].append(
(socket.SOL_SOCKET, socket.SO_RCVBUF,
Expand Down Expand Up @@ -267,7 +270,6 @@ def __init__(self, host, port, afi, broker_version_data=None, **configs):
self._ssl_context = None
if self.config['ssl_context'] is not None:
self._ssl_context = self.config['ssl_context']
self._api_versions_future = None
self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms']
self._sasl_auth_future = None
self.last_attempt = 0
Expand All @@ -284,6 +286,28 @@ def broker_version(self):
return None
return self.broker_version_data.broker_version

@property
def broker_version_data(self):
return self._broker_version_data

@broker_version_data.setter
def broker_version_data(self, value):
if value is None:
self._api_versions_idx = ApiVersionsRequest.max_version
self._check_version_idx = 0
elif not isinstance(value, BrokerVersionData):
raise TypeError('expected BrokerVersionData')
else:
self._broker_version_data = value
# If we got cached broker data, we'll skip to the max supported ApiVersionsRequest
# or, if not supported at all, we'll just rely on the cached api_versions data
try:
self._api_versions_idx = self._broker_version_data.api_version(ApiVersionsRequest)
except Errors.IncompatibleBrokerVersion:
self._api_versions_idx = None
self._check_version_idx = None
self._api_versions_check_timeout = self.config['api_version_auto_timeout_ms']

def _new_protocol_parser(self):
return KafkaProtocol(
ident=f'node={self.node_id}[{self.host}:{self.port}]',
Expand Down Expand Up @@ -491,12 +515,6 @@ def _try_handshake(self):

def _try_api_versions_check(self):
if self._api_versions_future is None:
if self.broker_version_data is not None:
try:
self._api_versions_idx = self.broker_version_data.api_version(ApiVersionsRequest)
except Errors.IncompatibleBrokerVersion:
log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self.broker_version)
return True
if self._api_versions_idx is not None:
version = self._api_versions_idx
request = ApiVersionsRequest(version=version,
Expand All @@ -522,9 +540,12 @@ def _try_api_versions_check(self):
self._api_versions_future = future
self.state = ConnectionStates.API_VERSIONS_RECV
self.config['state_change_callback'](self.node_id, self._sock, self)
else:
elif self.broker_version_data is None:
self.close(Errors.KafkaConnectionError('Unable to determine broker version.'))
return False
else:
log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self.broker_version)
return True

# Handle any immediate responses
self.recv(resolve_futures=True)
Expand Down Expand Up @@ -572,12 +593,10 @@ def _handle_api_versions_failure(self, future, ex):
# but in case they do we always want to try v0 as a fallback
if self._api_versions_idx > 0:
self._api_versions_idx = 0
# Only attempt fallback checks if we dont know anything about the cluster.
# Once we have a cached broker_version_data (i.e., after bootstrap)
# this is skipped.
elif self.broker_version_data is None:
# If we have VERSION_CHECKS fallback enabled, disable api versions
elif self._check_version_idx is not None:
self._api_versions_idx = None
self._check_version_idx = 0
# Otherwise, we'll just keep repeating api versions request on reconnect
# after failure connection is closed, so state should already be DISCONNECTED

def _handle_check_version_response(self, future, version, _response):
Expand Down
4 changes: 4 additions & 0 deletions kafka/protocol/broker_version_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ def api_version(self, operation, max_version=None):
.format(operation[0].__name__))
return version

def __eq__(self, other):
return self.broker_version == other.broker_version and self.api_versions == other.api_versions


def infer_broker_version_from_api_versions(api_versions):
# The logic here is to check the list of supported request versions
# in reverse order. As soon as we find one that works, return it
Expand Down
5 changes: 5 additions & 0 deletions kafka/protocol/new/api_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,8 @@ def decode(cls, data, version=None, header=False, framed=False):
if hdr is not None:
ret._header = hdr
return ret

def __eq__(self, other):
if self.version != other.version:
return False
return super().__eq__(other)
2 changes: 1 addition & 1 deletion test/test_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_bootstrap(mocker, conn):
kwargs.pop('node_id')
kwargs.pop('broker_version_data')
assert kwargs == cli.config
conn.send.assert_called_once_with(MetadataRequest[7]([], True), blocking=False, request_timeout_ms=None)
conn.send.assert_called_once_with(MetadataRequest(topics=[]), blocking=False, request_timeout_ms=None)
assert cli._bootstrap_fails == 0
assert cli.cluster.brokers() == list([MetadataResponse.MetadataResponseBroker(0, 'foo', 12, None),
MetadataResponse.MetadataResponseBroker(1, 'bar', 34, None)])
Expand Down
169 changes: 167 additions & 2 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from kafka.metrics.metrics import Metrics
from kafka.metrics.stats.sensor import Sensor
from kafka.protocol.broker_version_data import BrokerVersionData, VERSION_CHECKS
from kafka.protocol.new.admin import ListGroupsResponse
from kafka.protocol.new.consumer import HeartbeatResponse
from kafka.protocol.new.metadata import MetadataRequest, ApiVersionsRequest, ApiVersionsResponse
from kafka.protocol.new.producer import ProduceRequest
Expand Down Expand Up @@ -95,6 +96,41 @@ def test_api_versions_check(_socket, mocker):
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV

api_versions_response = ApiVersionsResponse(
version=ApiVersionsRequest.max_version,
error_code=0,
api_keys=[(MetadataRequest.API_KEY, 0, 5)]
)
conn.recv(responses=[(1, api_versions_response)], resolve_futures=True)
assert conn.broker_version_data.broker_version == (1, 0)
assert conn.broker_version_data.api_versions == {MetadataRequest.API_KEY: (0, 5)}
assert conn.state is ConnectionStates.CONNECTED


def test_api_versions_unsupported_versions(_socket, mocker):
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
mocker.patch.object(conn, 'send_pending_requests')
mocker.patch.object(conn, 'connection_delay', return_value=0)
mocker.spy(conn, '_send')
assert conn._api_versions_future is None
conn.connect()
assert conn._api_versions_future is not None
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=1000.0
)
send_call_count = conn._send.call_count

assert conn._try_api_versions_check() is False
assert conn._send.call_count == send_call_count
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV

# If we sent a higher-than-supported version, the broker sends back the min/max
# We should skip to the max supported for our next try.
api_versions_error = ApiVersionsResponse(
version=0,
error_code=Errors.UnsupportedVersionError.errno,
Expand All @@ -113,8 +149,51 @@ def test_api_versions_check(_socket, mocker):
blocking=True,
request_timeout_ms=500.0
)

api_versions_response = ApiVersionsResponse(
version=1, error_code=0,
api_keys=[(ApiVersionsRequest.API_KEY, 0, 1)]
)
conn.recv(responses=[(2, api_versions_response)], resolve_futures=True)
assert conn.broker_version_data.broker_version == (0, 10, 0)
assert conn.broker_version_data.api_versions == {ApiVersionsRequest.API_KEY: (0, 1)}
assert conn.state is ConnectionStates.CONNECTED

conn.close()
assert conn.state is ConnectionStates.DISCONNECTED

# Reconnect uses the last api versions check
conn.connect()
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=1000.0
)


def test_api_versions_fallback(_socket, mocker):
conn = BrokerConnection('localhost', 9092, socket.AF_INET)
mocker.patch.object(conn, 'send_pending_requests')
mocker.patch.object(conn, 'connection_delay', return_value=0)
mocker.spy(conn, '_send')
assert conn._api_versions_future is None
conn.connect()
assert conn._api_versions_future is not None
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=ApiVersionsRequest.max_version, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=1000.0
)
send_call_count = conn._send.call_count

assert conn._try_api_versions_check() is False
assert conn._send.call_count == send_call_count
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV

future = conn._api_versions_future
assert not future.is_done
conn.close(Errors.RequestTimedOutError())
Expand All @@ -130,9 +209,8 @@ def test_api_versions_check(_socket, mocker):
conn._send.assert_called_with(
ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=250.0
request_timeout_ms=500.0
)
send_call_count = conn._send.call_count

conn.close(Errors.RequestTimedOutError())
assert conn._api_versions_idx == None
Expand All @@ -146,6 +224,93 @@ def test_api_versions_check(_socket, mocker):
conn._send.assert_called_with(
VERSION_CHECKS[0][1],
blocking=True,
request_timeout_ms=250.0
)

conn.recv(responses=[(1, ListGroupsResponse())], resolve_futures=True)

assert conn.broker_version_data == BrokerVersionData((0, 9))
assert conn.state is ConnectionStates.CONNECTED

conn.close()
assert conn.state is ConnectionStates.DISCONNECTED

# Reconnect skips check versions
conn.connect()
assert conn.state is ConnectionStates.CONNECTED


def test_api_versions_check_with_broker_version_data(_socket, mocker):
conn = BrokerConnection('localhost', 9092, socket.AF_INET, broker_version_data=BrokerVersionData((1, 0)))
mocker.patch.object(conn, 'send_pending_requests')
mocker.patch.object(conn, 'connection_delay', return_value=0)
mocker.spy(conn, '_send')
assert conn._api_versions_future is None
conn.connect()
assert conn._api_versions_future is not None
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=1, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=1000.0
)
send_call_count = conn._send.call_count

assert conn._try_api_versions_check() is False
assert conn._send.call_count == send_call_count
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV

api_versions_error = ApiVersionsResponse(
version=0,
error_code=Errors.UnsupportedVersionError.errno,
api_keys=[(ApiVersionsRequest.API_KEY, 0, 1)]
)
conn.recv(responses=[(1, api_versions_error)], resolve_futures=True)
assert conn._api_versions_idx == 0
assert conn._api_versions_future is None
assert conn.state is ConnectionStates.API_VERSIONS_SEND

assert conn._try_api_versions_check() is False
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=500.0
)

future = conn._api_versions_future
assert not future.is_done
conn.close(Errors.RequestTimedOutError())
assert future.failed()
assert future.exception == Errors.RequestTimedOutError()
assert conn._api_versions_idx == 0
assert conn._api_versions_future is None
assert conn.state is ConnectionStates.DISCONNECTED

conn.connect()
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=250.0
)

conn.close(Errors.RequestTimedOutError())
assert conn._api_versions_idx == 0
assert conn._api_versions_future is None
assert conn._check_version_idx is None
assert conn.state is ConnectionStates.DISCONNECTED

conn.connect()
assert conn.connecting() is True
assert conn.state is ConnectionStates.API_VERSIONS_RECV
conn._send.assert_called_with(
ApiVersionsRequest(version=0, client_software_name='kafka-python', client_software_version=__version__),
blocking=True,
request_timeout_ms=125.0
)

Expand Down
Loading