本文共 5390 字,大约阅读时间需要 17 分钟。
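MetadataCache 指 Broker 上的元数据缓存,这些数据是 Controller 通过 UpdateMetadataRequest 请求发送给 Broker 的。为什么每台 Broker 上都要保存这份相同的数据呢?主要有以下两个原因:一是有了这份缓存,Broker 可以直接在本地及时响应客户端发送的元数据(Metadata)请求,因此客户端可以向集群中任意一台 Broker 获取集群元数据;二是 Broker 上的一些重要组件(例如副本管理器)也需要使用这部分数据。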
/**
 * Broker-side metadata cache. Its contents are sent to the broker by the
 * Controller through UpdateMetadataRequest.
 *
 * @param brokerId id of the broker that owns this cache
 */
class MetadataCache(brokerId: Int) extends Logging {
  ...
  // metadataSnapshot holds the actual cached metadata. Updates replace the whole
  // snapshot object rather than editing it in place.
  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
    controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
  ...
}
MetadataCache 在 Kafka Broker 启动时实例化,具体的调用位于 KafkaServer 类的 startup 方法中:
// KafkaServer.scala
def startup(): Unit = {
  ......
  // Create this broker's metadata cache, identified by the broker id from the server config.
  metadataCache = new MetadataCache(config.brokerId)
  ......
}
重要方法
1. 判断类方法:
/** Returns true if the metadata cache currently holds partition state for `topic`. */
def contains(topic: String): Boolean =
  metadataSnapshot.partitionStates.get(topic).isDefined

/** Returns true if the metadata cache currently holds state for the partition `tp`. */
def contains(tp: TopicPartition): Boolean =
  getPartitionInfo(tp.topic, tp.partition).nonEmpty
2.获取类方法
/** Names of all topics that have partition state in `snapshot`. */
private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = {
  val statesByTopic = snapshot.partitionStates
  statesByTopic.keySet
}

/**
 * Flattens the per-topic partition states of `snapshot` into one map keyed by
 * TopicPartition. Partition ids are held as Long keys and narrowed to Int here.
 */
private def getAllPartitions(snapshot: MetadataSnapshot): Map[TopicPartition, UpdateMetadataRequest.PartitionState] =
  snapshot.partitionStates.iterator.flatMap { case (topic, statesByPartition) =>
    statesByPartition.iterator.map { case (partitionId, partitionState) =>
      new TopicPartition(topic, partitionId.toInt) -> partitionState
    }
  }.toMap

/** Returns those entries of `topics` that have no partition state in the current cache. */
def getNonExistingTopics(topics: Set[String]): Set[String] = {
  val cachedTopics = metadataSnapshot.partitionStates
  topics.filterNot(cachedTopics.contains)
}
3.更新类方法
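元数据更新的方法是 updateMetadata,它的方法体较复杂,主要流程如下:首先根据请求重建存活 Broker 和存活节点集合;如果请求没有携带分区信息,就沿用原有分区数据构造新的 MetadataSnapshot;否则先复制现有分区数据,再逐个处理请求中的分区(正在被删除的分区从副本中移除,其余分区加入或更新),最后用这份副本构造新的 MetadataSnapshot,并返回被删除的分区列表。代码如下: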
/**
 * Applies an UpdateMetadataRequest from the controller to this broker's metadata cache.
 *
 * The live-broker and live-node maps are always rebuilt from the request.
 *
 * If the request carries no partition states, the previous partition states are reused
 * unchanged. Otherwise they are copied. Each partition in the request is then either
 * removed from the copy (its leader equals LeaderAndIsr.LeaderDuringDelete) or added or
 * updated in it. Finally the copy is published.
 *
 * In both cases a new MetadataSnapshot replaces the old one. The whole update runs while
 * holding the write lock of partitionMetadataLock.
 *
 * @param correlationId         correlation id of the request; only used in log messages
 * @param updateMetadataRequest the request received from the controller
 * @return the partitions removed from the cache while handling this request
 */
def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
  inWriteLock(partitionMetadataLock) {
    val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
    val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)

    // Raw controller id and epoch from the request. They are read once here rather than once
    // per partition inside the loop below. A distinct name also avoids shadowing the
    // Option-typed `controllerId` that is published in the snapshot.
    val requestControllerId = updateMetadataRequest.controllerId
    val requestControllerEpoch = updateMetadataRequest.controllerEpoch
    // Broker id of the controller; a negative id in the request is mapped to None.
    val controllerId = requestControllerId match {
      case id if id < 0 => None
      case id => Some(id)
    }

    // Rebuild the live-broker and live-node collections from the brokers listed in the request.
    updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
      val nodes = new java.util.HashMap[ListenerName, Node]
      val endPoints = new mutable.ArrayBuffer[EndPoint]
      broker.endPoints.asScala.foreach { ep =>
        endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
        nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))
      }
      aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
      aliveNodes(broker.id) = nodes.asScala
    }

    // If this broker is among the live nodes, log an error when the listener names
    // differ between live brokers.
    aliveNodes.get(brokerId).foreach { listenerMap =>
      val listeners = listenerMap.keySet
      if (!aliveNodes.values.forall(_.keySet == listeners))
        error(s"Listeners are not identical across brokers: $aliveNodes")
    }

    // Partitions removed by this request; returned to the caller.
    val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
    if (updateMetadataRequest.partitionStates.isEmpty) {
      // No partition information in the request: keep the previous partition states and
      // only swap in the new controller id and broker/node maps.
      metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, controllerId, aliveBrokers, aliveNodes)
    } else {
      // Copy the current partition states, including each per-topic map, so the published
      // snapshot is never mutated in place. The copy is edited below and then published.
      val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)
      metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>
        val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)
        copy ++= oldPartitionStates
        partitionStates += (topic -> copy)
      }

      updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
        if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
          // The partition is being deleted: drop it from the copy and report it to the caller.
          removePartitionInfo(partitionStates, tp.topic, tp.partition)
          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
            s"request sent by controller $requestControllerId epoch $requestControllerEpoch with correlation id $correlationId")
          deletedPartitions += tp
        } else {
          // Add the partition to the copy, or overwrite its existing state.
          addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)
          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
            s"UpdateMetadata request sent by controller $requestControllerId epoch $requestControllerEpoch with correlation id $correlationId")
        }
      }

      // Publish the updated copy together with the rebuilt broker/node maps.
      metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)
    }
    deletedPartitions
  }
}
转载地址:http://grcmb.baihongyu.com/