Administrator
Published 2024-10-13

Kafka topic creation flow (Part 1) [work in progress]


Kafka version: 2.6

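After a topic is created, what does Kafka do under the hood? What metadata is generated, and where is it stored? How are the topic's partitions distributed across the nodes?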

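We create a topic with a replication factor of 1 and 1 partition from the command line. First we look at what happens underneath, and then we analyze the flow.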

Kafka log directory

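Each partition maps to one directory. The directory contains the index files and the log file, plus a leader-epoch-checkpoint file: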


(base) user@userdeMacBook-Air-10 bin % ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.
(base) user@userdeMacBook-Air-10 bin % ls -l /tmp/kafka-logs/test-0
total 8
-rw-r--r--  1 user  wheel  10485760 Oct 14 10:33 00000000000000000000.index
-rw-r--r--  1 user  wheel         0 Oct 14 10:33 00000000000000000000.log
-rw-r--r--  1 user  wheel  10485756 Oct 14 10:33 00000000000000000000.timeindex
-rw-r--r--  1 user  wheel         8 Oct 14 10:33 leader-epoch-checkpoint
(base) user@userdeMacBook-Air-10 bin %
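The file names and sizes in this listing can be explained with a little arithmetic. The sketch below rests on three assumptions:

- segment.index.bytes keeps its default of 10485760 bytes (10 MiB).
- Offset-index entries are 8 bytes.
- Time-index entries are 12 bytes.

Each index file is preallocated to the largest multiple of its entry size that fits. That would explain why the .timeindex file is 4 bytes smaller than the .index file. SegmentFiles is a hypothetical helper, not Kafka code.

```scala
object SegmentFiles {
  // Assumed default value of segment.index.bytes (10 MiB)
  val DefaultSegmentIndexBytes: Int = 10 * 1024 * 1024
  // Offset index entry: 4-byte relative offset + 4-byte file position
  val OffsetIndexEntrySize: Int = 8
  // Time index entry: 8-byte timestamp + 4-byte relative offset
  val TimeIndexEntrySize: Int = 12

  // Segment files are named after their base offset, zero-padded to 20 digits
  def filenamePrefix(baseOffset: Long): String = f"$baseOffset%020d"

  // Preallocated size: the largest multiple of entrySize not exceeding maxIndexSize
  def preallocatedSize(maxIndexSize: Int, entrySize: Int): Int =
    maxIndexSize / entrySize * entrySize
}
```

For base offset 0, filenamePrefix returns twenty zeros. preallocatedSize gives 10485760 for the offset index and 10485756 for the time index, which matches the sizes in the listing.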

What is created in ZooKeeper?

ZooKeeper records the topic configuration. It is empty here because no config overrides were passed:


localhost:2181	$	get /config/topics/test
{"version":1,"config":{}}
cZxid = 0x88d
ctime = Mon Oct 14 10:33:38 CST 2024
mZxid = 0x88d
mtime = Mon Oct 14 10:33:38 CST 2024
pZxid = 0x88d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0
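The znode holds a small JSON document, and its 25 bytes are exactly the dataLength reported above. The hypothetical encoder below reproduces that shape for illustration only. It sorts keys and does not escape JSON strings, and it is not the Kafka code that writes the znode.

```scala
object TopicConfigZNode {
  // Builds a payload shaped like the /config/topics/<topic> data above.
  // Simplified: keys and values are not JSON-escaped.
  def encode(config: Map[String, String]): String = {
    val q = "\""
    val entries = config.toSeq.sortBy(_._1).map { case (k, v) => q + k + q + ":" + q + v + q }
    """{"version":1,"config":{""" + entries.mkString(",") + "}}"
  }
}
```

With no overrides, encode returns {"version":1,"config":{}}, which is 25 characters of ASCII and therefore 25 bytes.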

Kafka AdminClient -> broker

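Kafka commands also follow a client/server (C/S) architecture. An AdminClient first validates the request locally. Once these preliminary checks pass, it sends the request to a broker. This article focuses on how the broker handles the request and covers the AdminClient only briefly.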

AdminClient

The AdminClient-side source is in ~/kafka-prj/core/src/main/scala/kafka/admin/TopicCommand.scala. All the logic behind the kafka-topics.sh command lives in this file, which defines the TopicCommand object.

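In 2.6, ZooKeeper can still be used directly. The command lets you choose either --zookeeper or --bootstrap-server (exactly one is required), and the two paths use different logic. This article covers AdminClientTopicService, the --bootstrap-server path.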


def main(args: Array[String]): Unit = {
  val opts = new TopicCommandOptions(args)
  opts.checkArgs()

  // Choose the ZooKeeper-based or AdminClient-based service from the command options;
  // this article follows the AdminClient (--bootstrap-server) path
  val topicService = if (opts.zkConnect.isDefined)
    ZookeeperTopicService(opts.zkConnect)
  else
    AdminClientTopicService(opts.commandConfig, opts.bootstrapServer)

  var exitCode = 0
  try {
    if (opts.hasCreateOption)
      // Calls AdminClientTopicService.createTopic
      topicService.createTopic(opts)
    // ...
  }
  // ...
}


override def createTopic(topic: CommandTopicPartition): Unit = {
  // The replication factor, if given, must be in [1, Short.MaxValue]
  if (topic.replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
    throw new IllegalArgumentException(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
  // The partition count has only a lower bound (at least 1), no upper bound
  if (topic.partitions.exists(partitions => partitions < 1))
    throw new IllegalArgumentException(s"The partitions must be greater than 0")

  try {
    // hasReplicaAssignment: the user specified explicitly which brokers host each partition (--replica-assignment)
    val newTopic = if (topic.hasReplicaAssignment)
      new NewTopic(topic.name, asJavaReplicaReassignment(topic.replicaAssignment.get))
    else {
      // Default path: pass only the partition count and the replication factor
      new NewTopic(
        topic.name,
        topic.partitions.asJava,
        topic.replicationFactor.map(_.toShort).map(Short.box).asJava)
    }

    val configsMap = topic.configsToAdd.stringPropertyNames()
      .asScala
      .map(name => name -> topic.configsToAdd.getProperty(name))
      .toMap.asJava

    newTopic.configs(configsMap)
    // Send the request to the broker and block until it completes
    val createResult = adminClient.createTopics(Collections.singleton(newTopic))
    createResult.all().get()
    // The confirmation the command prints, as seen in the terminal output above
    println(s"Created topic ${topic.name}.")
  } catch {
    // ...
  }
}
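The two client-side checks at the top of createTopic can be pulled out into a pure function, which makes the bounds easy to see and test. This is a sketch. CreateTopicArgs is a hypothetical name, and the function returns an Either instead of throwing IllegalArgumentException as the original does.

```scala
object CreateTopicArgs {
  // Both values are optional (the broker can fall back to its defaults);
  // when present they must satisfy the same bounds as createTopic.
  def validate(partitions: Option[Int], replicationFactor: Option[Int]): Either[String, Unit] =
    if (replicationFactor.exists(rf => rf > Short.MaxValue || rf < 1))
      Left(s"The replication factor must be between 1 and ${Short.MaxValue} inclusive")
    else if (partitions.exists(_ < 1))
      Left("The partitions must be greater than 0")
    else
      Right(())
}
```

For the command in this post (one partition, replication factor 1), validate returns Right(()). A replication factor of 40000 is rejected because it exceeds Short.MaxValue.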

The sending process is skipped here because it is a big topic of its own. Eventually the request reaches the controller node.

Broker

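The request is finally handled by handleCreateTopicsRequest in KafkaApis, which performs further checks:

- whether this broker is the active controller;
- authorization, meaning whether the caller may create the topic;
- whether the topic already exists;
- and so on.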


def handleCreateTopicsRequest(request: RequestChannel.Request): Unit = {
  // ...
  val createTopicsRequest = request.body[CreateTopicsRequest]
  val results = new CreatableTopicResultCollection(createTopicsRequest.data.topics.size)

  if (!controller.isActive) {
    // Not the active controller: the topics are rejected with NOT_CONTROLLER
    // ...
  } else {
    createTopicsRequest.data.topics.forEach { topic =>
      results.add(new CreatableTopicResult().setName(topic.name))
    }
    // ...

    // Callback: fill in the per-topic results, then send the response
    def handleCreateTopicsResults(errors: Map[String, ApiError]): Unit = {
      errors.foreach { case (topicName, error) =>
        val result = results.find(topicName)
        result.setErrorCode(error.error.code)
          .setErrorMessage(error.message)
        // Reset any configs in the response if Create failed
        if (error != ApiError.NONE) {
          result.setConfigs(List.empty.asJava)
            .setNumPartitions(-1)
            .setReplicationFactor(-1)
            .setTopicConfigErrorCode(0.toShort)
        }
      }
      sendResponseCallback(results)
    }

    // Pass the callback to AdminManager.createTopics, which holds the core creation logic
    adminManager.createTopics(createTopicsRequest.data.timeoutMs,
      createTopicsRequest.data.validateOnly,
      toCreate,
      authorizedForDescribeConfigs,
      handleCreateTopicsResults)
  }
}
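The broker-side checks described above can be summarized as an ordered series of checks. The sketch below is a simplified, hypothetical model, and several parts of it are assumptions:

- The returned strings are names of real Kafka error codes.
- Folding all authorization into a single flag is an assumption.
- The exact ordering is an assumption. In the real code, the existence check happens later, in AdminManager.createTopics, through metadataCache.contains.

```scala
object CreateTopicsChecks {
  // Hypothetical, simplified gatekeeping for one topic in a CreateTopics request.
  // Returns the name of the Kafka error code the topic would end up with.
  def check(isActiveController: Boolean,
            authorizedToCreate: Boolean,
            existingTopics: Set[String],
            topic: String): String =
    if (!isActiveController) "NOT_CONTROLLER"
    else if (!authorizedToCreate) "TOPIC_AUTHORIZATION_FAILED"
    else if (existingTopics.contains(topic)) "TOPIC_ALREADY_EXISTS"
    else "NONE"
}
```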

createTopics


def createTopics(timeout: Int,
                   validateOnly: Boolean,
                   toCreate: Map[String, CreatableTopic],
                   includeConfigsAndMetatadata: Map[String, CreatableTopicResult],
                   responseCallback: Map[String, ApiError] => Unit): Unit = {

    // 1. map over topics creating assignment and calling zookeeper
    // Get the live brokers from the in-memory metadata cache
    val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
    val metadata = toCreate.values.map(topic =>
      try {
        // Reject the request if the topic is already in the metadata cache
        if (metadataCache.contains(topic.name))
          throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
        // Collect the topic-level configs
        val configs = new Properties()
        topic.configs.forEach { entry =>
          configs.setProperty(entry.name, entry.value)
        }
        LogConfig.validate(configs)
        // numPartitions/replicationFactor and explicit replica assignments are mutually exclusive
        if ((topic.numPartitions != NO_NUM_PARTITIONS || topic.replicationFactor != NO_REPLICATION_FACTOR)
            && !topic.assignments().isEmpty) {
          throw new InvalidRequestException("Both numPartitions or replicationFactor and replicasAssignments were set. " +
            "Both cannot be used at the same time.")
        }
       //....
        // Decide which brokers host the replicas of each partition
        val assignments = if (topic.assignments().isEmpty) {
          AdminUtils.assignReplicasToBrokers(
            brokers, resolvedNumPartitions, resolvedReplicationFactor)
        } else {
          val assignments = new mutable.HashMap[Int, Seq[Int]]
          // Note: we don't check that replicaAssignment contains unknown brokers - unlike in add-partitions case,
          // this follows the existing logic in TopicCommand
          topic.assignments.forEach { assignment =>
            assignments(assignment.partitionIndex) = assignment.brokerIds.asScala.map(a => a: Int)
          }
          assignments
        }
        // Optional policy hook: implement org.apache.kafka.server.policy.CreateTopicPolicy
        // and set create.topic.policy.class.name=<your class> to enforce custom creation rules
        createTopicPolicy match {
          case Some(policy) =>
            adminZkClient.validateTopicCreate(topic.name, assignments, configs)

            // ... omitted
            // Write the topic data to ZooKeeper; createTopicWithAssignment is examined below
            if (!validateOnly)
              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
          case None =>
            if (validateOnly)
              adminZkClient.validateTopicCreate(topic.name, assignments, configs)
            else
              adminZkClient.createTopicWithAssignment(topic.name, configs, assignments)
        }

        // For responses with DescribeConfigs permission, populate metadata and configs
        includeConfigsAndMetatadata.get(topic.name).foreach { result =>
          val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), configs)
          val createEntry = createTopicConfigEntry(logConfig, configs, includeSynonyms = false, includeDocumentation = false)(_, _)
          val topicConfigs = logConfig.values.asScala.map { case (k, v) =>
            val entry = createEntry(k, v)
            val source = ConfigSource.values.indices.map(_.toByte)
              .find(i => ConfigSource.forId(i.toByte) == entry.source)
              .getOrElse(0.toByte)
            new CreatableTopicConfigs()
                .setName(k)
                .setValue(entry.value)
                .setIsSensitive(entry.isSensitive)
                .setReadOnly(entry.isReadOnly)
                .setConfigSource(source)
          }.toList.asJava
          result.setConfigs(topicConfigs)
          result.setNumPartitions(assignments.size)
          result.setReplicationFactor(assignments(0).size.toShort)
        }
        CreatePartitionsMetadata(topic.name, assignments.keySet, ApiError.NONE)
      } catch {
        // ... more specific exception cases omitted
        case e: Throwable =>
          error(s"Error processing create topic request $topic", e)
          CreatePartitionsMetadata(topic.name, Set.empty, ApiError.fromThrowable(e))
      }).toBuffer

    // ... omitted
  }
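This step answers the question from the beginning: how are partitions spread over the brokers? When no explicit assignment is given, AdminUtils.assignReplicasToBrokers computes one. The sketch below models the rack-unaware strategy as I read it in the 2.6 source:

- The first replica of partition p goes to broker (p + startIndex) mod n.
- The followers are placed at an offset from the first replica.
- That offset grows by one after every full pass over the brokers.

Kafka picks the start index and the initial shift at random unless they are fixed. Here, startIndex is a parameter so that the output is deterministic. The rack-aware variant is not covered, and ReplicaAssignment is a hypothetical name.

```scala
object ReplicaAssignment {
  // Returns partitionId -> ordered broker ids; the first broker is the preferred leader.
  def assign(brokers: Seq[Int], nPartitions: Int, replicationFactor: Int, startIndex: Int): Map[Int, Seq[Int]] = {
    require(nPartitions > 0, "number of partitions must be greater than 0")
    require(replicationFactor > 0, "replication factor must be greater than 0")
    require(replicationFactor <= brokers.size,
      s"replication factor $replicationFactor larger than available brokers ${brokers.size}")
    require(startIndex >= 0, "startIndex must be non-negative")
    val n = brokers.size
    (0 until nPartitions).map { p =>
      // The shift grows by one after every full pass over the broker list,
      // so later partitions spread their followers differently
      val shift = startIndex + p / n
      val first = (p + startIndex) % n
      val followers = (0 until replicationFactor - 1).map { j =>
        // Offset of 1..n-1 from the first replica, so a follower never shares the leader's broker
        (first + 1 + (shift + j) % (n - 1)) % n
      }
      p -> (brokers(first) +: followers.map(i => brokers(i)))
    }.toMap
  }
}
```

With 5 brokers, 10 partitions and a replication factor of 3, partition 0 lands on brokers 0, 1, 2 and partition 5 lands on brokers 0, 2, 3. Leaders therefore rotate across the brokers, while followers do not always pair with the same neighbors. For the single-broker topic in this post, the result is simply partition 0 on broker 0.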

createTopicWithAssignment

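This method does two writes to ZooKeeper:

- the topic configuration, which is the content we saw under /config/topics/;
- the partition assignment, written to /brokers/topics/[topic-name].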


def createTopicWithAssignment(topic: String,
                              config: Properties,
                              partitionReplicaAssignment: Map[Int, Seq[Int]]): Unit = {
  // Validate the topic name, assignment and config
  validateTopicCreate(topic, partitionReplicaAssignment, config)
  // write out the config if there is any, this isn't transactional with the partition assignments
  // Write the topic config to /config/topics/<topic>
  zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)

  // create the partition assignment
  // Write the partition -> replica assignment to /brokers/topics/<topic>
  writeTopicPartitionAssignment(topic, partitionReplicaAssignment.map { case (k, v) => k -> ReplicaAssignment(v) },
    isUpdate = false)
}
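The two ZooKeeper writes can be sketched against an in-memory map standing in for ZooKeeper. Everything here is hypothetical. InMemoryZk is not a Kafka class, and the assignment payload models only the "partitions" field; the real /brokers/topics/<topic> znode carries more fields, such as a version. The point is the order of the writes. The config is written first. The assignment write comes second; it is the one the controller reacts to, and it is not transactional with the first write.

```scala
object ZkTopicWriterSketch {
  // Hypothetical in-memory stand-in for ZooKeeper: znode path -> data
  final class InMemoryZk {
    private val nodes = scala.collection.mutable.Map.empty[String, String]
    def exists(path: String): Boolean = nodes.contains(path)
    def set(path: String, data: String): Unit = nodes(path) = data
    def get(path: String): Option[String] = nodes.get(path)
  }

  // Simplified assignment payload: only the "partitions" field is modeled
  def encodeAssignment(assignment: Map[Int, Seq[Int]]): String =
    assignment.toSeq.sortBy(_._1)
      .map { case (p, replicas) => "\"" + p + "\":[" + replicas.mkString(",") + "]" }
      .mkString("""{"partitions":{""", ",", "}}")

  def createTopicWithAssignment(zk: InMemoryZk, topic: String, configJson: String,
                                assignment: Map[Int, Seq[Int]]): Unit = {
    val topicPath = s"/brokers/topics/$topic"
    require(!zk.exists(topicPath), s"Topic '$topic' already exists.")
    // Step 1: topic config (not transactional with step 2)
    zk.set(s"/config/topics/$topic", configJson)
    // Step 2: partition assignment; this is the path the controller watches
    zk.set(topicPath, encodeAssignment(assignment))
  }
}
```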

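At this point the ZooKeeper data has been updated. The controller watches the children of /brokers/topics (through TopicChangeHandler in the 2.6 source). When they change, the controller handles the corresponding event, and for topic changes the handler is processTopicChange.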

processTopicChange


private def processTopicChange(): Unit = {
  if (!isActive) return
  // Read all topics from ZooKeeper (true = also register the watch again)
  val topics = zkClient.getAllTopicsInCluster(true)
  // Set difference against the controller's cached topics yields new and deleted topics
  val newTopics = topics -- controllerContext.allTopics
  val deletedTopics = controllerContext.allTopics.diff(topics)
  controllerContext.setAllTopics(topics)

  registerPartitionModificationsHandlers(newTopics.toSeq)
  // Read the partition assignment of the new topics
  val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)

  deletedTopics.foreach(controllerContext.removeTopic)
  addedPartitionReplicaAssignment.foreach {
    case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  }
  info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
    s"[$addedPartitionReplicaAssignment]")
  // Only continue if new partitions were found
  if (addedPartitionReplicaAssignment.nonEmpty)
    // Enter the state machines; onNewPartitionCreation is the core of topic creation
    onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}
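The topic diff in processTopicChange is plain set arithmetic. The following is a minimal sketch, in which TopicChangeDiff is a hypothetical name. Topics present in ZooKeeper but missing from the controller cache are new. Topics in the cache but no longer in ZooKeeper are deleted.

```scala
object TopicChangeDiff {
  // Returns (newTopics, deletedTopics), mirroring `--` and `diff` in processTopicChange
  def diff(zkTopics: Set[String], cachedTopics: Set[String]): (Set[String], Set[String]) =
    (zkTopics -- cachedTopics, cachedTopics.diff(zkTopics))
}
```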

onNewPartitionCreation

This is where the state machines take over: the replica state machine and the partition state machine. Their details are left for a later post.


private def onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit = {
  info(s"New partition creation callback for ${newPartitions.mkString(",")}")
  // NewPartition: replicas are assigned but there is no leader/ISR yet (valid previous state: NonExistentPartition)
  partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
  // NewReplica: the replica can only receive become-follower requests (valid previous state: NonExistentReplica)
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)

  // Elect a leader (unclean election disabled) and move the partitions to OnlinePartition
  partitionStateMachine.handleStateChanges(
    newPartitions.toSeq,
    OnlinePartition,
    Some(OfflinePartitionLeaderElectionStrategy(false))
  )
  // Move the replicas to OnlineReplica
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
}
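The state machines only apply a transition when the partition's current state is a valid predecessor of the target state. The sketch below models the partition state machine's transition table as I understand it from the 2.6 controller; check PartitionStateMachine.scala for the exact sets. The replica state machine works the same way with its own states, which are not modeled here.

```scala
object PartitionStates {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Valid previous states for each target state (modeled on the 2.6 controller)
  def validPreviousStates(target: PartitionState): Set[PartitionState] = target match {
    case NonExistentPartition => Set[PartitionState](OfflinePartition)
    case NewPartition         => Set[PartitionState](NonExistentPartition)
    case OnlinePartition      => Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition)
    case OfflinePartition     => Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition)
  }

  def isValidTransition(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Topic creation walks the path NonExistentPartition -> NewPartition -> OnlinePartition, which matches the two partitionStateMachine calls in onNewPartitionCreation. Jumping straight from NonExistentPartition to OnlinePartition is rejected.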

… The steps in between will be covered in a later post.

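Finally, the controller node first sends a LeaderAndIsrRequest to the leader. The leader then creates the replica locally, which produces the partition directory we saw at the beginning.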
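On the broker, the visible result is a directory named <topic>-<partition> under log.dirs, as in the test-0 listing at the top. The sketch below only mimics that on-disk outcome with java.nio: it creates the directory and an empty first .log segment. It is hypothetical and skips everything Kafka actually does in between, including ReplicaManager, LogManager, index preallocation and checkpoints.

```scala
import java.nio.file.{Files, Path}

object LocalReplicaSketch {
  // Hypothetical: reproduce the on-disk result of creating a local replica
  // (directory plus an empty first log segment only; no index files)
  def createReplicaDir(logDir: Path, topic: String, partition: Int): Path = {
    val dir = logDir.resolve(s"$topic-$partition")
    Files.createDirectories(dir)
    val baseOffset = 0L
    val log = dir.resolve(f"$baseOffset%020d.log")
    if (!Files.exists(log)) Files.createFile(log)
    dir
  }
}
```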

