博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka服务端源代码分析之Broker元数据(MetadataCache)
阅读量:2426 次
发布时间:2019-05-10

本文共 5390 字,大约阅读时间需要 17 分钟。

1. 什么是元数据缓存(MetadataCache)

MetadataCache指Broker上的元数据缓存,这些数据是Controller通过 UpdateMetadataRequest请求发送给Broker的。为什么每台 Broker 上都要保存这份相同的数据呢?主要有以下原因:

  1. Broker 就能够及时响应客户端发送的元数据请求。不需要所有的请求都发送Controller所在的节点,可以有效均衡Controller节点的负载。
  2. Kafka的一些重要组件会用到这部分数据。比如副本管理器会使用它来获取 Broker 的节点信息,事务管理器会使用它来获取分区 Leader 副本的信息。
2. MetadataCache源代码
class MetadataCache(brokerId: Int // broker id					) extends Logging {
...// metadataSnapshot实际保存了元数据@volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty, controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)...}

MetadataCache的实例化是在Kafka Broker启动时完成的,具体的调用发生在 KafkaServer类的startup 方法中。

// KafkaServer.scaladef startup(): Unit = {
...... metadataCache = new MetadataCache(config.brokerId) ...... }

重要方法

1.判断类:

// 判断给定主题是否包含在元数据缓存中  def contains(topic: String): Boolean = {
metadataSnapshot.partitionStates.contains(topic) } // 判断给定主题分区是否包含在元数据缓存中 def contains(tp: TopicPartition): Boolean = getPartitionInfo(tp.topic, tp.partition).isDefined

2.获取类方法

// 获取所有topic  private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
snapshot.partitionStates.keySet } // 获取所有分区 private def getAllPartitions(snapshot: MetadataSnapshot): Map[TopicPartition, UpdateMetadataRequest.PartitionState] = {
snapshot.partitionStates.flatMap {
case (topic, partitionStates) => partitionStates.map {
case (partition, state ) => (new TopicPartition(topic, partition.toInt), state) } }.toMap } //获取不存在的topics def getNonExistingTopics(topics: Set[String]): Set[String] = {
topics -- metadataSnapshot.partitionStates.keySet }

3.更新类方法

元数据更新的方法是updateMetadata,它的方法体较复杂,流程图如下:

在这里插入图片描述

def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
inWriteLock(partitionMetadataLock) {
val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) // 从UpdateMetadataRequest中获取Controller所在的Broker ID val controllerId = updateMetadataRequest.controllerId match {
case id if id < 0 => None case id => Some(id) } // 遍历UpdateMetadataRequest请求中的所有存活Broker对象 updateMetadataRequest.liveBrokers.asScala.foreach {
broker => val nodes = new java.util.HashMap[ListenerName, Node] val endPoints = new mutable.ArrayBuffer[EndPoint] broker.endPoints.asScala.foreach {
ep => endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol) nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port)) } // 将Broker加入到存活Broker对象集合 aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) // 将Broker节点加入到存活节点对象集合 aliveNodes(broker.id) = nodes.asScala } aliveNodes.get(brokerId).foreach {
listenerMap => val listeners = listenerMap.keySet if (!aliveNodes.values.forall(_.keySet == listeners)) error(s"Listeners are not identical across brokers: $aliveNodes") } // 构造已删除分区数组,将其作为方法返回结果 val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] // UpdateMetadataRequest请求没有携带任何分区信息 if (updateMetadataRequest.partitionStates().isEmpty) {
// 使用之前的分区信息和新的Broker列表信息,构造新的MetadataSnapshot对象 metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes) } else {
// 备份现有元数据缓存中的分区数据 val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size) metadataSnapshot.partitionStates.foreach {
case (topic, oldPartitionStates) => val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size) copy ++= oldPartitionStates partitionStates += (topic -> copy) } // 遍历UpdateMetadataRequest请求中的所有分区 updateMetadataRequest.partitionStates.asScala.foreach {
case (tp, info) => val controllerId = updateMetadataRequest.controllerId val controllerEpoch = updateMetadataRequest.controllerEpoch // 如果分区处于被删除过程中 if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
// 将分区从备份分区数据中移除 removePartitionInfo(partitionStates, tp.topic, tp.partition) stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") deletedPartitions += tp } else {
// 将分区加入到备份分区数据 addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info) stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " + s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") } } // 使用备份分区数据和aliveBrokers, aliveNodes构建最新的元数据缓存 metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes) } deletedPartitions } }

转载地址:http://grcmb.baihongyu.com/

你可能感兴趣的文章
回首互联网十年,我们能从八次烧钱大战中学到什么
查看>>
漫画:如何辨别二逼互联网公司!?
查看>>
麒麟信安面向场景化创新,赋能openEuler商业验证
查看>>
王者又连跪了?快让 AI 帮你上分!
查看>>
1 分钟带你认识从 "�" 到 "锟斤拷"
查看>>
3 年培养 10 万“码农”,郑州推出“码农计划”
查看>>
一个三本程序猿的大厂逆袭之路
查看>>
程序员弃码投中医?还做成了不错的生意! | 极客视频
查看>>
百度一 29 岁程序员因“篡改数据”被抓
查看>>
去年我年薪 30W,今年我一天做 3 顿饭
查看>>
入职大厂,我容易吗?
查看>>
《互联网人退化简史》
查看>>
CTO 写的低级 Bug 再致网站被黑,CEO 的号都被盗了!
查看>>
955 加班少的公司名单来了!
查看>>
狂赚 1227 亿!腾讯员工 2020 年人均年薪 81 万;小米员工人均年薪 45 万
查看>>
漫画:什么是加密算法?
查看>>
程序员有话说 |当那个不靠谱的程序员跟我做同一个项目时
查看>>
怎样以程序员的方式来用百度呢?
查看>>
程序员是如何运用增长思维找到女朋友?
查看>>
@程序员,离职让企业损失近900亿,还遭疯抢!他凭什么?
查看>>