diff --git a/kafka/cluster.py b/kafka/cluster.py index 3f2e39aa7..561f22c17 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -12,7 +12,7 @@ from kafka import errors as Errors from kafka.future import Future from kafka.net.wakeup_notifier import WakeupNotifier -from kafka.protocol.metadata import MetadataRequest, MetadataResponse +from kafka.protocol.metadata import MetadataRequest, MetadataResponse, CoordinatorType from kafka.structs import TopicPartition from kafka.util import ensure_valid_topic_name @@ -386,17 +386,18 @@ def partitions_for_broker(self, broker_id): """ return self._broker_partitions.get(broker_id) - def coordinator_for_group(self, group): - """Return node_id of group coordinator. + def get_coordinator(self, key, key_type=CoordinatorType.GROUP): + """Return node_id of group coordinator from cache. Arguments: - group (str): name of consumer group + key (str): name of consumer group or transaction_id + key_type (CoordinatorType, optional): Default GROUP Returns: - node_id (int or str) for group coordinator, -1 if coordinator unknown + node_id (int or str) for coordinator, -1 if coordinator unknown None if the group does not exist. """ - return self._coordinators.get(('group', group)) + return self._coordinators.get((key_type, key)) def ttl(self): """Milliseconds until metadata should be refreshed""" @@ -652,34 +653,44 @@ def remove_listener(self, listener): except KeyError: pass - def add_coordinator(self, response, key_type, key): + def add_coordinator(self, response, key_type, key, synthesize_node_id=True): """Update with metadata for a group or txn coordinator Arguments: response (FindCoordinatorResponse): broker response - key_type (str): 'group' or 'transaction' + key_type (CoordinatorType): GROUP / TRANSACTION / SHARE key (str): consumer_group or transactional_id + synthesize_node_id (bool): If True synthesizes a unique + node_id to generate a dedicated network connection for + coordinator requests. Default: True. Returns: - string: coordinator node_id if metadata is updated, None on error + string: coordinator node_id. + + Raises: + BrokerResponseError: if ``response.error_code`` is non-zero. """ - log.debug("Updating coordinator for %s/%s: %s", key_type, key, response) + key_type = CoordinatorType.build_from(key_type) + log.debug("Updating coordinator for %s/%s: %s", key_type.name, key, response) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: - log.error("FindCoordinatorResponse error: %s", error_type) - self._coordinators[(key_type, key)] = -1 - return + raise error_type( + "FindCoordinatorResponse error for %s/%s: %s" + % (key_type.name, key, getattr(response, 'error_message', ''))) # Use a coordinator-specific node id so that requests # get a dedicated connection - node_id = 'coordinator-{}'.format(response.node_id) + if synthesize_node_id: + node_id = 'coordinator-{}'.format(response.node_id) + else: + node_id = response.node_id coordinator = MetadataResponse.MetadataResponseBroker( node_id, response.host, response.port, None) - log.info("Coordinator for %s/%s is %s", key_type, key, coordinator) + log.info("Coordinator for %s/%s is %s", key_type.name, key, coordinator) self._coordinator_brokers[node_id] = coordinator self._coordinators[(key_type, key)] = node_id return node_id diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 8d15fa11b..1e44f81cc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -11,7 +11,7 @@ from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.net.wakeup_notifier import WakeupNotifier -from kafka.protocol.metadata import FindCoordinatorRequest +from kafka.protocol.metadata import FindCoordinatorRequest, CoordinatorType from kafka.protocol.consumer import ( HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, @@ -852,13 +852,8 @@ def _handle_find_coordinator_response(self, response): error_type = Errors.for_code(result.error_code) if error_type is Errors.NoError: with self._lock: - coordinator_id = self._cluster.add_coordinator(result, 'group', self.group_id) - if not coordinator_id: - # This could happen if coordinator metadata is different - # than broker metadata - raise Errors.IllegalStateError() - - self.coordinator_id = coordinator_id + self.coordinator_id = self._cluster.add_coordinator( + result, CoordinatorType.GROUP, self.group_id) log.info("Discovered coordinator %s for group %s", self.coordinator_id, self.group_id) self._client.maybe_connect(self.coordinator_id) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 3236497e8..36e8a823f 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -6,7 +6,7 @@ import threading import kafka.errors as Errors -from kafka.protocol.metadata import FindCoordinatorRequest +from kafka.protocol.metadata import FindCoordinatorRequest, CoordinatorType from kafka.protocol.producer import ( AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, EndTxnRequest, InitProducerIdRequest, TxnOffsetCommitRequest, @@ -657,9 +657,9 @@ def authentication_failed(self, exc): request.fatal_error(exc) def coordinator(self, coord_type): - if coord_type == 'group': + if coord_type == CoordinatorType.GROUP: return self._consumer_group_coordinator - elif coord_type == 'transaction': + elif coord_type == CoordinatorType.TRANSACTION: return self._transaction_coordinator else: raise Errors.IllegalStateError("Received an invalid coordinator type: %s" % (coord_type,)) @@ -756,9 +756,9 @@ def _enqueue_request(self, request_handler): def _lookup_coordinator(self, coord_type, coord_key): with self._lock: - if coord_type == 'group': + if coord_type == CoordinatorType.GROUP: self._consumer_group_coordinator = None - elif coord_type == 'transaction': + elif coord_type == CoordinatorType.TRANSACTION: self._transaction_coordinator = None else: raise Errors.IllegalStateError("Invalid coordinator type: %s" % (coord_type,)) @@ -877,7 +877,7 @@ def result(self): @property def coordinator_type(self): - return 'transaction' + return CoordinatorType.TRANSACTION @property def coordinator_key(self): @@ -936,7 +936,7 @@ def coordinator_type(self): # coordinator -- InitProducerIdRequest can be sent to any broker. if self.transaction_manager.transactional_id is None: return None - return 'transaction' + return CoordinatorType.TRANSACTION def handle_response(self, response): error_type = Errors.for_code(response.error_code) @@ -950,7 +950,7 @@ def handle_response(self, response): self._result.done() elif issubclass(error_type, Errors.RetriableError): if error_type in (Errors.NotCoordinatorError, Errors.CoordinatorNotAvailableError): - self.transaction_manager._lookup_coordinator('transaction', self.transactional_id) + self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id) self.reenqueue() elif error_type is Errors.InvalidProducerEpochError and self._is_epoch_bump: # KIP-360: our (producer_id, epoch) are stale--the broker no @@ -1007,7 +1007,7 @@ def handle_response(self, response): continue elif issubclass(error_type, Errors.RetriableError): if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): - self.transaction_manager._lookup_coordinator('transaction', self.transactional_id) + self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id) elif error_type is Errors.ConcurrentTransactionsError: self.maybe_override_retry_backoff_ms() self.reenqueue() @@ -1067,21 +1067,15 @@ class FindCoordinatorHandler(TxnRequestHandler): def __init__(self, transaction_manager, coord_type, coord_key): super().__init__(transaction_manager) - self._coord_type = coord_type + self._coord_type = CoordinatorType.build_from(coord_type) self._coord_key = coord_key - if coord_type == 'group': - coord_type_int8 = 0 - elif coord_type == 'transaction': - coord_type_int8 = 1 - else: - raise ValueError("Unrecognized coordinator type: %s" % (coord_type,)) # Setting key, key_type, and coordinator_keys all at once lets the # connection layer negotiate any version: v0-v3 emit `key`/`key_type`, # v4+ (KIP-699) emit `key_type`/`coordinator_keys`. self.request = FindCoordinatorRequest( - key=coord_key, - key_type=coord_type_int8, - coordinator_keys=[coord_key], + key=self._coord_key, + key_type=self._coord_type.value, + coordinator_keys=[self._coord_key], ) @property @@ -1105,9 +1099,9 @@ def handle_response(self, response): if error_type is Errors.NoError: coordinator_id = self.transaction_manager._metadata.add_coordinator( result, self._coord_type, self._coord_key) - if self._coord_type == 'group': + if self._coord_type == CoordinatorType.GROUP: self.transaction_manager._consumer_group_coordinator = coordinator_id - elif self._coord_type == 'transaction': + elif self._coord_type == CoordinatorType.TRANSACTION: self.transaction_manager._transaction_coordinator = coordinator_id self._result.done() elif issubclass(error_type, Errors.RetriableError): @@ -1144,7 +1138,7 @@ def handle_response(self, response): self._result.done() elif issubclass(error_type, Errors.RetriableError): if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): - self.transaction_manager._lookup_coordinator('transaction', self.transactional_id) + self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id) self.reenqueue() elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError): # Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED @@ -1194,7 +1188,7 @@ def handle_response(self, response): self.transaction_manager._transaction_started = True elif issubclass(error_type, Errors.RetriableError): if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): - self.transaction_manager._lookup_coordinator('transaction', self.transactional_id) + self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id) self.reenqueue() elif error_type in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError): self.reenqueue() @@ -1260,7 +1254,7 @@ def priority(self): @property def coordinator_type(self): - return 'group' + return CoordinatorType.GROUP @property def coordinator_key(self): @@ -1318,7 +1312,7 @@ def handle_response(self, response): return if lookup_coordinator: - self.transaction_manager._lookup_coordinator('group', self.consumer_group_id) + self.transaction_manager._lookup_coordinator(CoordinatorType.GROUP, self.consumer_group_id) if not retriable_failure: # all attempted partitions were either successful, or there was a fatal failure. diff --git a/kafka/protocol/metadata/find_coordinator.py b/kafka/protocol/metadata/find_coordinator.py index 9df38724f..f73bddff9 100644 --- a/kafka/protocol/metadata/find_coordinator.py +++ b/kafka/protocol/metadata/find_coordinator.py @@ -1,4 +1,13 @@ +from enum import IntEnum + from ..api_message import ApiMessage +from kafka.util import EnumHelper + + +class CoordinatorType(EnumHelper, IntEnum): + GROUP = 0 + TRANSACTION = 1 + SHARE = 2 class FindCoordinatorRequest(ApiMessage): pass @@ -6,5 +15,5 @@ class FindCoordinatorResponse(ApiMessage): pass __all__ = [ - 'FindCoordinatorRequest', 'FindCoordinatorResponse', + 'CoordinatorType', 'FindCoordinatorRequest', 'FindCoordinatorResponse', ] diff --git a/kafka/protocol/metadata/find_coordinator.pyi b/kafka/protocol/metadata/find_coordinator.pyi index 7ccb99b95..39ad7ea4d 100644 --- a/kafka/protocol/metadata/find_coordinator.pyi +++ b/kafka/protocol/metadata/find_coordinator.pyi @@ -2,10 +2,16 @@ import uuid from typing import Any, Self +from enum import IntEnum from kafka.protocol.api_message import ApiMessage from kafka.protocol.data_container import DataContainer -__all__ = ['FindCoordinatorRequest', 'FindCoordinatorResponse'] +__all__ = ['CoordinatorType', 'FindCoordinatorRequest', 'FindCoordinatorResponse'] + +class CoordinatorType(EnumHelper, IntEnum): + GROUP: int + TRANSACTION: int + SHARE: int class FindCoordinatorRequest(ApiMessage): key: str diff --git a/test/producer/test_transaction_manager_mock_broker.py b/test/producer/test_transaction_manager_mock_broker.py index 44251d7d1..d040507d4 100644 --- a/test/producer/test_transaction_manager_mock_broker.py +++ b/test/producer/test_transaction_manager_mock_broker.py @@ -27,7 +27,7 @@ TransactionState, TxnOffsetCommitHandler, ) -from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse +from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse, CoordinatorType from kafka.protocol.producer import ( AddOffsetsToTxnResponse, AddPartitionsToTxnResponse,