博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka服务端源代码分析之Controller(1)
阅读量:2426 次
发布时间:2019-05-10

本文共 6298 字,大约阅读时间需要 20 分钟。

1.controller简介

Controller是kafka中非常重要的组件,主要包括集群成员管理(Broker上下线)和Topic管理(创建,修改,删除)等功能。所有有关集群的元数据基本都会跟Controller打交道,因此有必要仔细了解Controller的实现原理。

2.controller请求发送管理

Controller会给集群中的所有Broker(包括它自己所在的 Broker)机器发送网络请求,

在这里插入图片描述

  • LeaderAndIsrRequest:最主要的功能是,告诉 Broker 相关主题各个分区的 Leader 副本位于哪台 Broker 上、ISR 中的副本都在哪些 Broker 上。在我看来,它应该被赋予最高的优先级,毕竟,它有令数据类请求直接失效的本领。试想一下,如果这个请求中的 Leader 副本变更了,之前发往老的 Leader 的 PRODUCE 请求是不是全部失效了?因此,我认为它是非常重要的控制类请求。
  • StopReplicaRequest:告知指定 Broker 停止它上面的副本对象,该请求甚至还能删除副本底层的日志数据。这个请求主要的使用场景,是分区副本迁移和删除主题。在这两个场景下,都要涉及停掉 Broker 上的副本操作。
  • UpdateMetadataRequest:顾名思义,该请求会更新 Broker 上的元数据缓存。集群上的所有元数据变更,都首先发生在 Controller 端,然后再经由这个请求广播给集群上的所有 Broker。在我刚刚分享的案例中,正是因为这个请求被处理得不及时,才导致集群 Broker 无法获取到最新的元数据信息。

在这里插入图片描述

RequestSendThread

class RequestSendThread(val controllerId: Int, //  Controller所在Broker的Id                        val controllerContext: ControllerContext, // Controller元数据信息                        val queue: BlockingQueue[QueueItem], // 请求阻塞队列                        val networkClient: NetworkClient, // 用于执行发送的网络I/O类                        val brokerNode: Node, // 目标Broker节点                        val config: KafkaConfig, // Kafka配置信息                        val time: Time,                        val requestRateAndQueueTimeMetrics: Timer,                        val stateChangeLogger: StateChangeLogger,                        name: String)  extends ShutdownableThread(name = name) {
}
override def doWork(): Unit = {
def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS) val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take() requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS) var clientResponse: ClientResponse = null try {
var isSendSuccessful = false while (isRunning && !isSendSuccessful) {
try {
// 如果没有创建与目标Broker的TCP连接,或连接暂时不可用 if (!brokerReady()) {
isSendSuccessful = false // 等待重试 backoff() } else {
val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true) // 发送请求,等待接收Response clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time) isSendSuccessful = true } } catch {
case e: Throwable => // if the send was not successful, reconnect to broker and resend the message warn(s"Controller $controllerId epoch ${controllerContext.epoch} fails to send request $requestBuilder " + s"to broker $brokerNode. Reconnecting to broker.", e) // 如果出现异常,关闭与对应Broker的连接 networkClient.close(brokerNode.idString) isSendSuccessful = false backoff() } } // 如果接收到了Response if (clientResponse != null) {
val requestHeader = clientResponse.requestHeader val api = requestHeader.apiKey // 此Response的请求类型必须是LeaderAndIsrRequest、StopReplicaRequest或UpdateMetadataRequest中的一种 if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA) throw new KafkaException(s"Unexpected apiKey received: $apiKey") val response = clientResponse.responseBody stateChangeLogger.withControllerEpoch(controllerContext.epoch).trace(s"Received response " + s"${response.toString(requestHeader.apiVersion)} for request $api with correlation id " + s"${requestHeader.correlationId} sent to broker $brokerNode") if (callback != null) {
callback(response) // 执行回调 } } } catch {
case e: Throwable => error(s"Controller $controllerId fails to send a request to broker $brokerNode", e) networkClient.close(brokerNode.idString) } }

ControllerChannelManager

protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
case class ControllerBrokerStateInfo(networkClient: NetworkClient,                                     brokerNode: Node,  // 目标 Broker节点对象,封装了IP和端口等                                     messageQueue: BlockingQueue[QueueItem], // 请求消息阻塞队列                                     requestSendThread: RequestSendThread, // Controller向Broker发送请求的线程。                                     queueSizeGauge: Gauge[Int],                                     requestRateAndTimeMetrics: Timer)
3.controller请求处理

在这里插入图片描述

class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],                             eventProcessedListener: ControllerEvent => Unit,                             controllerMovedListener: () => Unit) extends KafkaMetricsGroup {
... // 事件队列 private val queue = new LinkedBlockingQueue[ControllerEvent] // 事件处理线程 private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName) def start(): Unit = thread.start() def put(event: ControllerEvent): Unit = inLock(putLock) {
queue.put(event) } ... }
class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
logIdent = s"[ControllerEventThread controllerId=$controllerId] " override def doWork(): Unit = {
queue.take() match {
// 如果是关闭线程事件,什么都不用做。关闭线程由外部来执行 case KafkaController.ShutdownEventThread => initiateShutdown() case controllerEvent => _state = controllerEvent.state // 更新对应事件在队列中保存的时间 eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs) try {
// 调用ControllerEvent process方法处理事件,同时计算处理速率 rateAndTimeMetrics(state).time {
controllerEvent.process() } } catch {
case e: ControllerMovedException => info(s"Controller moved to another broker when processing $controllerEvent.", e) controllerMovedListener() case e: Throwable => error(s"Error processing event $controllerEvent", e) } try eventProcessedListener(controllerEvent) catch {
case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e) } _state = ControllerState.Idle } } }

转载地址:http://qrcmb.baihongyu.com/

你可能感兴趣的文章
Storm精华问答 | task与executor有什么关系?
查看>>
K8S安全军规101:对CNCF最佳实践的扩充
查看>>
要闻君说:小米手机部组织架构突现大调整;河南联通重启VDC扩容工程招标;英特尔已收购Ineda Systems,剑指独显;...
查看>>
Docker精华问答 | task与executor有什么关系?
查看>>
2019年中国IT市场趋势热点
查看>>
2019年最值得关注的五大微服务发展趋势
查看>>
你与数据科学家只差这26条python技巧
查看>>
云漫圈 | 什么是DNS?什么是DNS污染?什么又是DNS劫持?
查看>>
阿里产品岗需是技术出身?分享技术转型产品的成功经验
查看>>
全面剖析企业私有云
查看>>
云评测 | OpenStack智能运维解决方案 @文末有福利!
查看>>
要闻君说: 百度云喜提信息安全首证;紫光展锐携5G芯片进击2019MWC;OPPO首发5G手机惊艳亮相……...
查看>>
基于Kubernetes的持续部署方案
查看>>
Spring精华问答 | Spring Boot有哪些优点?
查看>>
推动边缘计算的七项核心技术
查看>>
边缘计算精华问答 | 边缘计算需要IaaS、PaaS、SaaS等服务能力吗?
查看>>
Spark精华问答 | Spark 会替代Hadoop 吗?
查看>>
豆瓣已玩烂,来爬点有逼格的 ——IMDB 电影提升你的品位
查看>>
一部刷爆朋友圈的5G短片,看完才知道5G多暖多重要!
查看>>
要闻君说:华云数据“豪气”收购超融合厂商Maxta;VMware有意“携手”微软Azure云;亚马逊获3亿美元云计算合同...
查看>>