Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 26 additions & 15 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from kafka import errors as Errors
from kafka.future import Future
from kafka.net.wakeup_notifier import WakeupNotifier
from kafka.protocol.metadata import MetadataRequest, MetadataResponse
from kafka.protocol.metadata import MetadataRequest, MetadataResponse, CoordinatorType
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name

Expand Down Expand Up @@ -386,17 +386,18 @@ def partitions_for_broker(self, broker_id):
"""
return self._broker_partitions.get(broker_id)

def coordinator_for_group(self, group):
"""Return node_id of group coordinator.
def get_coordinator(self, key, key_type=CoordinatorType.GROUP):
"""Return node_id of group coordinator from cache.

Arguments:
group (str): name of consumer group
key (str): name of consumer group or transaction_id
key_type (CoordinatorType, optional): Default GROUP

Returns:
node_id (int or str) for group coordinator, -1 if coordinator unknown
node_id (int or str) for coordinator, -1 if coordinator unknown
None if the group does not exist.
"""
return self._coordinators.get(('group', group))
return self._coordinators.get((key_type, key))

def ttl(self):
"""Milliseconds until metadata should be refreshed"""
Expand Down Expand Up @@ -652,34 +653,44 @@ def remove_listener(self, listener):
except KeyError:
pass

def add_coordinator(self, response, key_type, key):
def add_coordinator(self, response, key_type, key, synthesize_node_id=True):
"""Update with metadata for a group or txn coordinator

Arguments:
response (FindCoordinatorResponse): broker response
key_type (str): 'group' or 'transaction'
key_type (CoordinatorType): GROUP / TRANSACTION / SHARE
key (str): consumer_group or transactional_id
synthesize_node_id (bool): If True synthesizes a unique
node_id to generate a dedicated network connection for
coordinator requests. Default: True.

Returns:
string: coordinator node_id if metadata is updated, None on error
string: coordinator node_id.

Raises:
BrokerResponseError: if ``response.error_code`` is non-zero.
"""
log.debug("Updating coordinator for %s/%s: %s", key_type, key, response)
key_type = CoordinatorType.build_from(key_type)
log.debug("Updating coordinator for %s/%s: %s", key_type.name, key, response)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
log.error("FindCoordinatorResponse error: %s", error_type)
self._coordinators[(key_type, key)] = -1
return
raise error_type(
"FindCoordinatorResponse error for %s/%s: %s"
% (key_type.name, key, getattr(response, 'error_message', '')))

# Use a coordinator-specific node id so that requests
# get a dedicated connection
node_id = 'coordinator-{}'.format(response.node_id)
if synthesize_node_id:
node_id = 'coordinator-{}'.format(response.node_id)
else:
node_id = response.node_id
coordinator = MetadataResponse.MetadataResponseBroker(
node_id,
response.host,
response.port,
None)

log.info("Coordinator for %s/%s is %s", key_type, key, coordinator)
log.info("Coordinator for %s/%s is %s", key_type.name, key, coordinator)
self._coordinator_brokers[node_id] = coordinator
self._coordinators[(key_type, key)] = node_id
return node_id
Expand Down
11 changes: 3 additions & 8 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.net.wakeup_notifier import WakeupNotifier
from kafka.protocol.metadata import FindCoordinatorRequest
from kafka.protocol.metadata import FindCoordinatorRequest, CoordinatorType
from kafka.protocol.consumer import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest,
DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID,
Expand Down Expand Up @@ -852,13 +852,8 @@ def _handle_find_coordinator_response(self, response):
error_type = Errors.for_code(result.error_code)
if error_type is Errors.NoError:
with self._lock:
coordinator_id = self._cluster.add_coordinator(result, 'group', self.group_id)
if not coordinator_id:
# This could happen if coordinator metadata is different
# than broker metadata
raise Errors.IllegalStateError()

self.coordinator_id = coordinator_id
self.coordinator_id = self._cluster.add_coordinator(
result, CoordinatorType.GROUP, self.group_id)
log.info("Discovered coordinator %s for group %s",
self.coordinator_id, self.group_id)
self._client.maybe_connect(self.coordinator_id)
Expand Down
44 changes: 19 additions & 25 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading

import kafka.errors as Errors
from kafka.protocol.metadata import FindCoordinatorRequest
from kafka.protocol.metadata import FindCoordinatorRequest, CoordinatorType
from kafka.protocol.producer import (
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest,
EndTxnRequest, InitProducerIdRequest, TxnOffsetCommitRequest,
Expand Down Expand Up @@ -657,9 +657,9 @@ def authentication_failed(self, exc):
request.fatal_error(exc)

def coordinator(self, coord_type):
if coord_type == 'group':
if coord_type == CoordinatorType.GROUP:
return self._consumer_group_coordinator
elif coord_type == 'transaction':
elif coord_type == CoordinatorType.TRANSACTION:
return self._transaction_coordinator
else:
raise Errors.IllegalStateError("Received an invalid coordinator type: %s" % (coord_type,))
Expand Down Expand Up @@ -756,9 +756,9 @@ def _enqueue_request(self, request_handler):

def _lookup_coordinator(self, coord_type, coord_key):
with self._lock:
if coord_type == 'group':
if coord_type == CoordinatorType.GROUP:
self._consumer_group_coordinator = None
elif coord_type == 'transaction':
elif coord_type == CoordinatorType.TRANSACTION:
self._transaction_coordinator = None
else:
raise Errors.IllegalStateError("Invalid coordinator type: %s" % (coord_type,))
Expand Down Expand Up @@ -877,7 +877,7 @@ def result(self):

@property
def coordinator_type(self):
return 'transaction'
return CoordinatorType.TRANSACTION

@property
def coordinator_key(self):
Expand Down Expand Up @@ -936,7 +936,7 @@ def coordinator_type(self):
# coordinator -- InitProducerIdRequest can be sent to any broker.
if self.transaction_manager.transactional_id is None:
return None
return 'transaction'
return CoordinatorType.TRANSACTION

def handle_response(self, response):
error_type = Errors.for_code(response.error_code)
Expand All @@ -950,7 +950,7 @@ def handle_response(self, response):
self._result.done()
elif issubclass(error_type, Errors.RetriableError):
if error_type in (Errors.NotCoordinatorError, Errors.CoordinatorNotAvailableError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id)
self.reenqueue()
elif error_type is Errors.InvalidProducerEpochError and self._is_epoch_bump:
# KIP-360: our (producer_id, epoch) are stale--the broker no
Expand Down Expand Up @@ -1007,7 +1007,7 @@ def handle_response(self, response):
continue
elif issubclass(error_type, Errors.RetriableError):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id)
elif error_type is Errors.ConcurrentTransactionsError:
self.maybe_override_retry_backoff_ms()
self.reenqueue()
Expand Down Expand Up @@ -1067,21 +1067,15 @@ class FindCoordinatorHandler(TxnRequestHandler):
def __init__(self, transaction_manager, coord_type, coord_key):
super().__init__(transaction_manager)

self._coord_type = coord_type
self._coord_type = CoordinatorType.build_from(coord_type)
self._coord_key = coord_key
if coord_type == 'group':
coord_type_int8 = 0
elif coord_type == 'transaction':
coord_type_int8 = 1
else:
raise ValueError("Unrecognized coordinator type: %s" % (coord_type,))
# Setting key, key_type, and coordinator_keys all at once lets the
# connection layer negotiate any version: v0-v3 emit `key`/`key_type`,
# v4+ (KIP-699) emit `key_type`/`coordinator_keys`.
self.request = FindCoordinatorRequest(
key=coord_key,
key_type=coord_type_int8,
coordinator_keys=[coord_key],
key=self._coord_key,
key_type=self._coord_type.value,
coordinator_keys=[self._coord_key],
)

@property
Expand All @@ -1105,9 +1099,9 @@ def handle_response(self, response):
if error_type is Errors.NoError:
coordinator_id = self.transaction_manager._metadata.add_coordinator(
result, self._coord_type, self._coord_key)
if self._coord_type == 'group':
if self._coord_type == CoordinatorType.GROUP:
self.transaction_manager._consumer_group_coordinator = coordinator_id
elif self._coord_type == 'transaction':
elif self._coord_type == CoordinatorType.TRANSACTION:
self.transaction_manager._transaction_coordinator = coordinator_id
self._result.done()
elif issubclass(error_type, Errors.RetriableError):
Expand Down Expand Up @@ -1144,7 +1138,7 @@ def handle_response(self, response):
self._result.done()
elif issubclass(error_type, Errors.RetriableError):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id)
self.reenqueue()
elif error_type in (Errors.InvalidProducerEpochError, Errors.ProducerFencedError):
# Java client normalizes INVALID_PRODUCER_EPOCH to PRODUCER_FENCED
Expand Down Expand Up @@ -1194,7 +1188,7 @@ def handle_response(self, response):
self.transaction_manager._transaction_started = True
elif issubclass(error_type, Errors.RetriableError):
if error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError):
self.transaction_manager._lookup_coordinator('transaction', self.transactional_id)
self.transaction_manager._lookup_coordinator(CoordinatorType.TRANSACTION, self.transactional_id)
self.reenqueue()
elif error_type in (Errors.CoordinatorLoadInProgressError, Errors.ConcurrentTransactionsError):
self.reenqueue()
Expand Down Expand Up @@ -1260,7 +1254,7 @@ def priority(self):

@property
def coordinator_type(self):
return 'group'
return CoordinatorType.GROUP

@property
def coordinator_key(self):
Expand Down Expand Up @@ -1318,7 +1312,7 @@ def handle_response(self, response):
return

if lookup_coordinator:
self.transaction_manager._lookup_coordinator('group', self.consumer_group_id)
self.transaction_manager._lookup_coordinator(CoordinatorType.GROUP, self.consumer_group_id)

if not retriable_failure:
# all attempted partitions were either successful, or there was a fatal failure.
Expand Down
11 changes: 10 additions & 1 deletion kafka/protocol/metadata/find_coordinator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from enum import IntEnum

from ..api_message import ApiMessage
from kafka.util import EnumHelper


class CoordinatorType(EnumHelper, IntEnum):
GROUP = 0
TRANSACTION = 1
SHARE = 2


class FindCoordinatorRequest(ApiMessage): pass
class FindCoordinatorResponse(ApiMessage): pass


__all__ = [
'FindCoordinatorRequest', 'FindCoordinatorResponse',
'CoordinatorType', 'FindCoordinatorRequest', 'FindCoordinatorResponse',
]
8 changes: 7 additions & 1 deletion kafka/protocol/metadata/find_coordinator.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
import uuid
from typing import Any, Self

from enum import IntEnum
from kafka.protocol.api_message import ApiMessage
from kafka.protocol.data_container import DataContainer

__all__ = ['FindCoordinatorRequest', 'FindCoordinatorResponse']
__all__ = ['CoordinatorType', 'FindCoordinatorRequest', 'FindCoordinatorResponse']

class CoordinatorType(EnumHelper, IntEnum):
GROUP: int
TRANSACTION: int
SHARE: int

class FindCoordinatorRequest(ApiMessage):
key: str
Expand Down
2 changes: 1 addition & 1 deletion test/producer/test_transaction_manager_mock_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
TransactionState,
TxnOffsetCommitHandler,
)
from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse
from kafka.protocol.metadata import FindCoordinatorResponse, MetadataResponse, CoordinatorType
from kafka.protocol.producer import (
AddOffsetsToTxnResponse,
AddPartitionsToTxnResponse,
Expand Down
Loading