Administrator
发布于 2024-10-12 / 12 阅读
0
0

kafka 的删除策略

kafka 的删除策略

前段时间客户咨询了个Kafka过期策略删除的问题,趁着机会,深入理解下

kafka 日志数据若是设置了清除策略,会按照策略定期清除kafka log 数据,清除方式有两种

清除策略有有两种,分别是compact 和 delete ,两者都能达到释放空间的效果,这里介绍触发的机制和相关参数

从配置文件看参数:

log.retention.hours/ log.retention.minutes/l og.retention.ms

日志留存时间,默认是 72h,以上三个参数都作用相同,控制的时间粒度不同,l若果同时设置 log.retention.ms 优先级最高

log.retention.check.interval.ms

检查日志段以查看是否可以删除的周期时间,默认 5 分钟,即每隔 5 分钟扫描一次日志,看是否有匹配策略的文件

log.segment.bytes

每个日志段最大的大小,默认 1G(注意 : 每个日志段就是一个文件),kafka 数据是不断的追加到一个文件上,当一个日志段达到 log.segment.bytes 大小时,会写入到新的 日志文件上, 此外 kafka 策略删除数据粒度是按照 日志段来的

log.retention.bytes

这个参数独立于 log.retention.hours,如果 所有的 日志段大小超过 log.retention.bytes,比且超过的大小至少是一个日志段的大小,这样才会被清除; 默认是-1,不开启

举个例子: log.segment.bytes 设置为 100M,log.retention.bytes 为 400M,现有五个日志段,分别是 4 个100M,1 个 50M,总大小是 450M,此时 超过log.retention.bytes 50M,但超出的大小小于一个日志段大小,所以Kafka不会执行任何删除操作

接下来结合代码调试来参数作用

清理任务是用单独的线程任务来执行,这里有很多后台任务线程,这里单独介绍清除策略任务

周期调度 cleanupLogs 这个任务,间隔时间为 retentionCheckMs ,该参数就是 log.retention.check.interval.ms


def startup(): Unit = {
    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))

      scheduler.schedule("kafka-log-retention",
                         cleanupLogs _,
                         delay = InitialTaskDelayMs,
                         period = retentionCheckMs,
                         TimeUnit.MILLISECONDS)
 //    .....
      scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                         deleteLogs _,
                         delay = InitialTaskDelayMs,
                         unit = TimeUnit.MILLISECONDS)
    }
    if (cleanerConfig.enableCleaner)
      cleaner.startup()
  }

当 Kafka 启动后,可以在 server.log 日志中看到提示, 说明当前轮询周期是 5000ms


[2024-10-10 20:47:24,621] INFO Starting log cleanup with a period of 5000 ms. (kafka.log.LogManager)

现在看看 cleanupLogs() 函数逻辑


def cleanupLogs(): Unit = {
  info(s"Beginning log cleanup...")

  var total = 0
  val startMs = time.milliseconds

  // clean current logs.
  //这里防止多个 clean 线程竞争
  //TODO:
  val deletableLogs = {
    if (cleaner != null) {
      // prevent cleaner from working on same partitions when changing cleanup policy
      cleaner.pauseCleaningForNonCompactedPartitions()
    } else {
      currentLogs.filter {
        case (_, log) => !log.config.compact
      }
    }
  }
  //deletableLogs 是一个 分区目录集合 ,逐个遍历分区目录
  try {
    deletableLogs.foreach {
      case (topicPartition, log) =>
      //开始 检查日志段是否符合,也是关键逻辑,后面单独介绍
        total += log.deleteOldSegments()

        val futureLog = futureLogs.get(topicPartition)
        if (futureLog != null) {
          // clean future logs
          debug(s"Garbage collecting future log '${futureLog.name}'")
          total += futureLog.deleteOldSegments()
        }
    }
  } finally {
    if (cleaner != null) {
      cleaner.resumeCleaning(deletableLogs.map(_._1))
    }
  }
  info(s"Log cleanup completed. $total files deleted in " +
                (time.milliseconds - startMs) / 1000 + " seconds")
}

log.deleteOldSegments()

这里策略有三种, 分别是按照时间维度,空间大小维度,偏移量维度,下面分别介绍


/**
 * If topic deletion is enabled, delete any log segments that have either expired due to time based retention
 * or because the log size is > retentionSize.
 *
 * Whether or not deletion is enabled, delete any log segments that are before the log start offset
 */
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}

时间维度 deleteRetentionMsBreachedSegments 时间维度 deleteRetentionMsBreachedSegments


private def deleteRetentionMsBreachedSegments(): Int = {
  if (config.retentionMs < 0) return 0
  val startMs = time.milliseconds
// 闭包的形式传入函数
//比较时间   现在时间减去最近修改的时间?
  deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
    reason = s"retention time ${config.retentionMs}ms breach")
}

空间维度 deleteRetentionSizeBreachedSegments 空间维度 deleteRetentionSizeBreachedSegments


private def deleteRetentionSizeBreachedSegments(): Int = {
  if (config.retentionSize < 0 || size < config.retentionSize) return 0
  //diff 为改 topic 的差值
  var diff = size - config.retentionSize

  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
    //如果差值 还大于当前日志段的大小,则开始清理
    if (diff - segment.size >= 0) {
      diff -= segment.size
      true
    } else {
      false
    }
  }
  deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
}

偏移量 deleteLogStartOffsetBreachedSegments 偏移量 deleteLogStartOffsetBreachedSegments

通过偏移量删除,是一种外部触发方式,即改变最初始的日志偏移量, 小于最初始的偏移量则会被删除 ,可用命令 kafka-delete-records.sh 来变更


private def deleteLogStartOffsetBreachedSegments(): Int = {
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
    //baseOffset :     是这个分区的偏移量的下限
    //logStartOffset : 任何小于logStartOffset的偏移量都不会被客户端访问
    // 如果说日志段的最小  baseOffset 小于日志段的起始 则被删除
    nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
  }

  deleteOldSegments(shouldDelete, StartOffsetBreach(this))
}

以上三种方式都是先构造一个判断条件函数(像是闭包?lambada?), 最终都是调用了一个函数 deleteOldSegments

predicate 则是上面构造的验证函数

highWatermark: 高水位即 offset 是被 commit 的


private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                              reason: SegmentDeletionReason): Int = {
  def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
    //确实当前 日志段是已备份的
    highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
      predicate(segment, nextSegmentOpt)
  }
  //上锁
  lock synchronized {
   //deletableSegments 关键流程
    val deletable = localLog.deletableSegments(shouldDelete)
    if (deletable.nonEmpty)
      deleteSegments(deletable, reason)
    else
      0
  }
}

如果都满足条件 则执行 deletableSegments


private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
  //segments 是一个描述日志段的LogSegment 类,里面很多操作
  if (segments.isEmpty) {
    Seq.empty
  } else {
    //创建一个ArrayBuffer类型的deletable,用于存储可删除的日志片段
    val deletable = ArrayBuffer.empty[LogSegment]
   //  用于遍历segments集合中的所有日志片段
    val segmentsIterator = segments.values.iterator
    //存放下一个片段
    var segmentOpt = nextOption(segmentsIterator)
  //这里就开始循环遍历日志段 (感觉像是 leetcode 题目)
    while (segmentOpt.isDefined) {
      //当前的 segment
      val segment = segmentOpt.get
      //下一个的 nextSegmentOpt
      val nextSegmentOpt = nextOption(segmentsIterator)
      val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
		//用刚才传入的条件判断是否满足 以及 当前和下一个segment的容量
      if (predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
        //累加
        deletable += segment
        segmentOpt = nextSegmentOpt
      } else {
        segmentOpt = Option.empty
      }
    }
    //最大得到 带删除的segment 集合 deletable
    deletable
  }
}

得到需要删除的 segments 后,调用 deleteSegments


private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
   maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
     val numToDelete = deletable.size

     if (numToDelete > 0) {
       // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
       var segmentsToDelete = deletable
       if (localLog.segments.numberOfSegments == numToDelete) {
         val newSegment = roll()
         if (deletable.last.baseOffset == newSegment.baseOffset) {
           warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
           segmentsToDelete = deletable.dropRight(1)
         }
       }
       //检测当前的 segments是否还映射在内存中,还有则抛出异常
       localLog.checkIfMemoryMappedBufferClosed()

       // 删除日志和索引 removeAndDeleteSegments 比较关键,下面抽出来讲
       localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
       deleteProducerSnapshots(deletable, asyncDelete = true)
       maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, SegmentDeletion)
     }
     numToDelete
   }
 }

一般删除的时候,我们都是先看到 文件出现后面多一个 .delete 后缀,其实就是异步删除,逻辑在 removeAndDeleteSegments


private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
                                         asyncDelete: Boolean,
                                         reason: SegmentDeletionReason): Unit = {
  if (segmentsToDelete.nonEmpty) {
    // Most callers hold an iterator into the `segments` collection and `removeAndDeleteSegment` mutates it by
    // removing the deleted segment, we should force materialization of the iterator here, so that results of the
    // iteration remain valid and deterministic. We should also pass only the materialized view of the
    // iterator to the logic that actually deletes the segments.
    val toDelete = segmentsToDelete.toList
    reason.logReason(toDelete)
    toDelete.foreach { segment =>
      //segments 是个 map ,先删除 内存 中的元数据
      segments.remove(segment.baseOffset)
    }
    //正式删除 关键函数
    LocalLog.deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
  }
}

deleteSegmentFiles


private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
                                      asyncDelete: Boolean,
                                      dir: File,
                                      topicPartition: TopicPartition,
                                      config: LogConfig,
                                      scheduler: Scheduler,
                                      logDirFailureChannel: LogDirFailureChannel,
                                      logPrefix: String): Unit = {
    segmentsToDelete.foreach {
      segment =>
      if (!segment.hasSuffix(DeletedFileSuffix))
      //变更日志段文件夹的后缀
        segment.changeFileSuffixes("", DeletedFileSuffix)
    }

    def deleteSegments(): Unit = {
      info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
      val parentDir = dir.getParent
      maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
        segmentsToDelete.foreach { segment =>
          segment.deleteIfExists()
        }
      }
    }
// 默认是异步删除
    if (asyncDelete)
  //开始调度删除,间隔时间是 fileDeleteDelayMs ,参数 log.segment.delete.delay.ms 可以修改,默认 1min
  //deleteSegments 删除流程以后再研究
      scheduler.schedule("delete-file", () => deleteSegments(), delay = config.fileDeleteDelayMs)
    else
      deleteSegments()
  }


评论