本文共 6298 字,大约阅读时间需要 20 分钟。
Controller是kafka中非常重要的组件,主要包括集群成员管理(Broker上下线)和Topic管理(创建,修改,删除)等功能。所有有关集群的元数据基本都会跟Controller打交道,因此有必要仔细了解Controller的实现原理。
Controller会给集群中的所有Broker(包括它自己所在的 Broker)机器发送网络请求,
RequestSendThread
class RequestSendThread(val controllerId: Int, // Controller所在Broker的Id val controllerContext: ControllerContext, // Controller元数据信息 val queue: BlockingQueue[QueueItem], // 请求阻塞队列 val networkClient: NetworkClient, // 用于执行发送的网络I/O类 val brokerNode: Node, // 目标Broker节点 val config: KafkaConfig, // Kafka配置信息 val time: Time, val requestRateAndQueueTimeMetrics: Timer, val stateChangeLogger: StateChangeLogger, name: String) extends ShutdownableThread(name = name) { }
override def doWork(): Unit = { def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS) val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take() requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS) var clientResponse: ClientResponse = null try { var isSendSuccessful = false while (isRunning && !isSendSuccessful) { try { // 如果没有创建与目标Broker的TCP连接,或连接暂时不可用 if (!brokerReady()) { isSendSuccessful = false // 等待重试 backoff() } else { val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true) // 发送请求,等待接收Response clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time) isSendSuccessful = true } } catch { case e: Throwable => // if the send was not successful, reconnect to broker and resend the message warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " + s"to broker $brokerNode. Reconnecting to broker.", e) // 如果出现异常,关闭与对应Broker的连接 networkClient.close(brokerNode.idString) isSendSuccessful = false backoff() } } // 如果接收到了Response if (clientResponse != null) { val requestHeader = clientResponse.requestHeader val api = requestHeader.apiKey // 此Response的请求类型必须是LeaderAndIsrRequest、StopReplicaRequest或UpdateMetadataRequest中的一种 if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA) throw new KafkaException(s"Unexpected apiKey received: $apiKey") val response = clientResponse.responseBody stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace(s"Received response " + s"${response.toString(requestHeader.apiVersion)} for request $api with correlation id " + s"${requestHeader.correlationId} sent to broker $brokerNode") if (callback != null) { callback(response) // 执行回调 } } } catch { case e: Throwable => error(s"Controller $controllerId fails to send a request to broker $brokerNode", e) networkClient.close(brokerNode.idString) } }
ControllerChannelManager
protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
case class ControllerBrokerStateInfo(networkClient: NetworkClient, brokerNode: Node, // 目标 Broker节点对象,封装了IP和端口等 messageQueue: BlockingQueue[QueueItem], // 请求消息阻塞队列 requestSendThread: RequestSendThread, // Controller向Broker发送请求的线程。 queueSizeGauge: Gauge[Int], requestRateAndTimeMetrics: Timer)
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer], eventProcessedListener: ControllerEvent => Unit, controllerMovedListener: () => Unit) extends KafkaMetricsGroup { ... // 事件队列 private val queue = new LinkedBlockingQueue[ControllerEvent] // 事件处理线程 private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName) def start(): Unit = thread.start() def put(event: ControllerEvent): Unit = inLock(putLock) { queue.put(event) } ... }
class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) { logIdent = s"[ControllerEventThread controllerId=$controllerId] " override def doWork(): Unit = { queue.take() match { // 如果是关闭线程事件,什么都不用做。关闭线程由外部来执行 case KafkaController.ShutdownEventThread => initiateShutdown() case controllerEvent => _state = controllerEvent.state // 更新对应事件在队列中保存的时间 eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs) try { // 调用ControllerEvent process方法处理事件,同时计算处理速率 rateAndTimeMetrics(state).time { controllerEvent.process() } } catch { case e: ControllerMovedException => info(s"Controller moved to another broker when processing $controllerEvent.", e) controllerMovedListener() case e: Throwable => error(s"Error processing event $controllerEvent", e) } try eventProcessedListener(controllerEvent) catch { case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e) } _state = ControllerState.Idle } } }
转载地址:http://qrcmb.baihongyu.com/