
OSD Heartbeats

What is the OSD heartbeat, and what does it do?

What is a heartbeat?

A heartbeat is a failure-detection mechanism: put simply, one node periodically sends a small packet to another and judges the peer's state by whether a reply comes back. A distributed system has many nodes, and at scale all kinds of faults become routine: host crashes, disk failures, network outages, and so on. Heartbeats let the cluster quickly and reliably pinpoint faulty nodes and handle them in time, so the cluster keeps serving normally.
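To make the idea concrete, here is a minimal sketch of the core of any heartbeat-based failure detector (illustrative only, not Ceph code; the FailureDetector type and its names are invented): remember when each peer last answered, and treat a peer as suspect once its silence exceeds a grace period.

#include <chrono>
#include <map>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical failure detector, illustrating the principle only.
struct FailureDetector {
  std::chrono::seconds grace{20};                        // tolerated silence
  std::map<std::string, Clock::time_point> last_reply;   // peer -> last pong

  // call this whenever a heartbeat reply arrives from a peer
  void on_reply(const std::string& peer) {
    last_reply[peer] = Clock::now();
  }

  // peers whose silence has exceeded the grace period
  std::vector<std::string> suspects() const {
    std::vector<std::string> down;
    const auto now = Clock::now();
    for (const auto& [peer, last] : last_reply)
      if (now - last > grace)
        down.push_back(peer);
    return down;
  }
};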

OSD heartbeats

  • What is an OSD?

Loosely speaking, an OSD is the daemon that manages one disk in a Ceph cluster.

  • What is the OSD heartbeat, and what does it do?

The OSD heartbeat is the mechanism for detecting OSD failures, and it is the foundation on which a healthy cluster rests. If one OSD suddenly fails (say it becomes network-isolated), it cannot report anything to the mon itself; the information has to come from other OSDs, which tell the mon "one of my neighbors has died". So how does an OSD decide that a neighbor has failed?

Detection between OSDs

  1. Every Ceph OSD daemon checks the heartbeats of other Ceph OSD daemons at random intervals of less than 6 seconds (think of it as periodically knocking on a neighbor's door).


  2. If a neighboring Ceph OSD daemon shows no heartbeat within the 20-second grace period, the Ceph OSD daemon may consider it down and report it to a Ceph Monitor (think of it as calling for help after knocking that long with no answer).


  3. If an OSD cannot peer with any of the OSDs in its configuration, it pings a mon every 30 seconds to fetch the cluster's osdmap.


  4. If the cluster has only one OSD, that OSD fetches a new osdmap from the mon every osd\_mon\_heartbeat\_interval.

Between OSD and mon

Besides detecting failures of other OSDs, an OSD must also report its own state to the mon.

A report is triggered in the following situations (a condensed sketch follows the list):

  • an event occurs on the OSD (e.g. a failure or a PG change);
  • within 5 seconds of OSD startup;
  • the OSD reports to the mon periodically, whether or not anything changed (default period 120 s);
  • the OSD finds failure records about its peer OSDs in failure\_queue.
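A minimal sketch of how these triggers combine (illustrative only; the real checks live in OSD::tick\_without\_osd\_lock(), shown later, and the ReportState struct here is invented):

#include <chrono>

using Clock = std::chrono::steady_clock;

// Illustrative condensation of the reporting triggers listed above;
// the real logic lives in OSD::tick_without_osd_lock().
struct ReportState {
  Clock::time_point last_mon_report = Clock::now();
  std::chrono::seconds report_interval{120};  // periodic report interval
  bool pending_events = false;  // failures queued, PG changes, fullness...

  bool should_report_now() const {
    return pending_events ||
           Clock::now() - last_mon_report > report_interval;
  }
};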

Where does this come up?

Checking the mon log for lost heartbeat packets: the OSD heartbeat runs for as long as the cluster does, and it is one of the signals by which cluster health is judged.

Tuning the OSD heartbeat parameters:

The interval at which an OSD checks other OSDs' heartbeats and the heartbeat response grace period both affect the workload, so failure-detection latency has to be balanced against the load the heartbeats themselves impose. In a large cluster, too high a heartbeat frequency floods the system with heartbeat traffic and hurts performance, while too low a frequency delays the discovery of failed nodes and hurts availability.
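To make the trade-off concrete, here is a back-of-the-envelope model (the numbers are just the defaults quoted in this post; this is not Ceph code): each OSD pings every heartbeat peer over up to two connections per interval, each ping draws a reply, so per-OSD traffic scales as peers × connections × 2 / interval, while worst-case detection time is roughly interval + grace.

#include <cstdio>

// Back-of-the-envelope heartbeat cost model (illustrative assumptions only).
int main() {
  const double interval = 6.0;   // osd_heartbeat_interval (s)
  const double grace    = 20.0;  // osd_heartbeat_grace (s)
  const int    peers    = 10;    // osd_heartbeat_min_peers
  const int    conns    = 2;     // front + back connection per peer

  // each ping triggers a reply, hence the factor of 2
  const double msgs_per_sec = peers * conns * 2 / interval;
  // a peer that dies right after replying is noticed after ~interval + grace
  const double worst_case_detection = interval + grace;

  std::printf("per-OSD heartbeat traffic: ~%.1f msgs/s\n", msgs_per_sec);
  std::printf("worst-case detection time: ~%.0f s\n", worst_case_detection);
  return 0;
}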

Basic usage (parameter configuration)

The OSD heartbeat is the foundation of a working cluster, and using it mostly comes down to tuning parameters to strike a performance balance. The parameters of the heartbeat mechanism are listed below; they can be set with the ceph daemon command.

The more important ones are covered here. They can be changed online through the admin socket (e.g. ceph daemon osd.<id> config set <option> <value>), or added to ceph.conf and applied with a restart.

osd\_mon\_heartbeat\_interval

How often the Ceph OSD daemon pings a Ceph Monitor when it has no Ceph OSD daemon peers.

osd\_heartbeat\_interval

How often a Ceph OSD daemon pings its peers (default 6 s).

osd\_heartbeat\_grace

How long Ceph waits, with no heartbeat shown by a Ceph OSD daemon, before considering it down (default 20 s).

mon\_osd\_min\_down\_reporters (usually 2)

The minimum number of OSDs that must report an OSD as failed before the mon will mark it down.

mon\_osd\_reporter\_subtree\_level

Sets which failure domain the reporting OSDs must come from; default host. By default, two reports from different failure domains suffice to mark another OSD down.

osd\_heartbeat\_min\_peers

The minimum number of peers each OSD sends heartbeat packets to (default 10).

osd\_mon\_report\_interval (default 5 s)

The interval for reporting OSD startup and other reportable events to the mon.

osd\_beacon\_report\_interval

The OSD periodically sends a beacon message to the monitor to keep itself alive (default 100 s).

mon\_osd\_report\_timeout

The longest the mon waits before marking an OSD down (default 900 s).

osd\_mon\_heartbeat\_stat\_stale

How long heartbeat ping statistics are kept before being treated as stale and dropped from reporting (default 3600 s).

Module composition and implementation architecture

Key data structures


// information about a heartbeat peer
// This struct records a peer OSD's info; the per-peer heartbeat bookkeeping
// (connections, timestamps, in-flight pings) lives here.
struct HeartbeatInfo {
  int peer;                  ///< peer
  ConnectionRef con_front;   ///< peer connection (front)
  ConnectionRef con_back;    ///< peer connection (back)
  utime_t first_tx;          ///< time we sent our first ping request
  utime_t last_tx;           ///< last time we sent a ping request
  utime_t last_rx_front;     ///< last time we got a ping reply on the front side
  utime_t last_rx_back;      ///< last time we got a ping reply on the back side
  epoch_t epoch;             ///< most recent epoch we wanted this peer
  /// number of connections we send and receive heartbeat pings/replies
  static constexpr int HEARTBEAT_MAX_CONN = 2;
  /// history of inflight pings, arranged by the timestamp we sent
  /// send time -> deadline -> remaining replies
  map<utime_t, pair<utime_t, int>> ping_history;
  // ....
};

// This map is a member of class OSD (not of HeartbeatInfo); it holds the
// current heartbeat peers, keyed by OSD id.
map<int,HeartbeatInfo> heartbeat_peers;  ///< map of osd id to HeartbeatInfo
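Based on the fields above, here is a simplified model of how ping\_history supports the health check used later in heartbeat\_check() (a reading of the structure, not the exact Ceph implementation): an in-flight entry whose deadline has passed while replies are still outstanding makes the peer suspect.

#include <map>
#include <utility>

using utime = double;  // stand-in for utime_t in this sketch

// Simplified model of the ping_history bookkeeping in HeartbeatInfo.
struct PingHistory {
  // send time -> (reply deadline, replies still expected)
  std::map<utime, std::pair<utime, int>> ping_history;

  void on_send(utime now, utime deadline, int expected_replies) {
    ping_history[now] = {deadline, expected_replies};
  }

  // a reply echoes the send time; retire the entry once both sides answered
  void on_reply(utime send_stamp) {
    auto it = ping_history.find(send_stamp);
    if (it != ping_history.end() && --it->second.second == 0)
      ping_history.erase(it);
  }

  // the map is ordered by send time, so the first entry holds the earliest
  // outstanding deadline -- if it has passed, the peer is suspect
  bool is_unhealthy(utime now) const {
    return !ping_history.empty() &&
           now > ping_history.begin()->second.first;
  }
};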


// dispatcher for heartbeat messages received by the OSD
// (defined inside class OSD)
struct HeartbeatDispatcher : public Dispatcher {
    OSD *osd;
    explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
    void ms_fast_dispatch(Message *m) override {
      osd->heartbeat_dispatch(m);
    }
    bool ms_dispatch(Message *m) override {
      return osd->heartbeat_dispatch(m);
    }
//........................
    KeyStore *ms_get_auth1_authorizer_keystore() override {
      return osd->ms_get_auth1_authorizer_keystore();
    }
  } heartbeat_dispatcher;

Overall design and flow

How an OSD sends and receives heartbeats

public network and cluster network

Heartbeat traffic between OSDs travels through Ceph's networking layer. A Ceph cluster usually separates internal and external traffic: nodes inside the cluster communicate over a dedicated cluster network, while clients talk to the cluster over a separate public network (think of it as two NICs).

Ceph puts replica and migration traffic between OSDs on the cluster network, and client-to-backend traffic on the public network.

Heartbeats between OSDs use four Messenger objects (Ceph's messaging classes, created when the OSD initializes; think of each as a courier dedicated to one kind of delivery):

  • ms\_hb\_front\_client: sends heartbeats to other OSDs (public network)
  • ms\_hb\_back\_client: sends heartbeats to other OSDs (cluster network)
  • ms\_hb\_back\_server: receives heartbeats from other OSDs (cluster network)
  • ms\_hb\_front\_server: receives heartbeats from other OSDs (public network)
  • the front side uses the public network and exists to detect connectivity problems on the client-facing network; the back side uses the cluster network.

// ceph_osd.cc
// create the four heartbeat Messengers
int main(int argc, const char **argv) {
    Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
  					     entity_name_t::OSD(whoami), "hb_back_client",
  					     getpid(), Messenger::HEARTBEAT);
    Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
  					     entity_name_t::OSD(whoami), "hb_front_client",
  					     getpid(), Messenger::HEARTBEAT);
    Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
  						   entity_name_t::OSD(whoami), "hb_back_server",
  						   getpid(), Messenger::HEARTBEAT);
    Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
  						    entity_name_t::OSD(whoami), "hb_front_server",
  						    getpid(), Messenger::HEARTBEAT);
//the OSD constructor takes the heartbeat Messenger instances as arguments
osd = new OSD(g_ceph_context,
                  store,
                  whoami,
                  ms_cluster,
                  ms_public,
                  ms_hb_front_client,
                  ms_hb_back_client,
                  ms_hb_front_server,
                  ms_hb_back_server,
                  ms_objecter,
                  &mc,
                  data_path,
                  journal_path);

// start the messengers
    ms_hb_front_client->start();
    ms_hb_back_client->start();
    ms_hb_front_server->start();
    ms_hb_back_server->start();
}

The client messengers on one OSD pair with the server messengers on its peers: ms_hbclient <-> ms_hb_back_server, ms_hbclient <-> ms_hb_front_server.

Sending data


The OSD runs a dedicated thread for sending heartbeats: it walks heartbeat\_peers and sends a ping to each peer in turn.



// the dedicated heartbeat sender thread
struct T_Heartbeat : public Thread {
    OSD *osd;
    explicit T_Heartbeat(OSD *o) : osd(o) {}
    void *entry() override {
        osd->heartbeat_entry();
        return 0;
    }
} heartbeat_thread;
int OSD::init()
{
// start the heartbeat thread
//the argument is the thread's name
  heartbeat_thread.create("osd_srv_heartbt");
//....
}
void OSD::heartbeat_entry()
{
  std::lock_guard l(heartbeat_lock);
  if (is_stopping())
    return;
  while (!heartbeat_stop) {
    heartbeat();

    double wait;
    //the wait is either the fixed osd_heartbeat_interval, or a random time derived from it
    if (cct->_conf.get_val<bool>("debug_disable_randomized_ping")) {
      wait = (float)cct->_conf->osd_heartbeat_interval;
    } else {
      wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
    }
    utime_t w;
    //convert to utime_t
    w.set_from_double(wait);
    dout(30) << "heartbeat_entry sleeping for " << wait << dendl;

    // wait on the condition variable; the wait releases heartbeat_lock while sleeping and reacquires it before returning
    heartbeat_cond.WaitInterval(heartbeat_lock, w);
    if (is_stopping())
      return;
    dout(30) << "heartbeat_entry woke up" << dendl;

  }
}

The sending itself all happens inside heartbeat(); let's analyze it.

The heartbeat function mainly does the following:


void OSD::heartbeat()
{
  ceph_assert(heartbeat_lock.is_locked_by_me());
  dout(30) << "heartbeat" << dendl;

  // get CPU load avg
  double loadavgs[1];
  int hb_interval = cct->_conf->osd_heartbeat_interval;
  int n_samples = 86400;
  if (hb_interval > 1) {
    n_samples /= hb_interval;
    if (n_samples < 1)
      n_samples = 1;
  }
  cout << "send times :  " << n_samples << std::endl;
  // fold the current load average into a daily moving average (one sample per heartbeat)
  if (getloadavg(loadavgs, 1) == 1) {
    cout <<loadavgs[0] << std::endl;
    logger->set(l_osd_loadavg, 100 * loadavgs[0]);
    daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;

    dout(1) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
  }

  dout(1) << "heartbeat checking stats" << dendl;

  // refresh peer list and osd stats
  // hb_peers collects the ids of our heartbeat peers (osd ids)
  vector<int> hb_peers;

  //walk the peer OSDs; how the peer set is maintained is covered below
  for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
       p != heartbeat_peers.end();
       ++p)
    hb_peers.push_back(p->first);

  // refresh our own OSD stats
  auto new_stat = service.set_osd_stat(hb_peers, get_num_pgs());
  dout(5) << __func__ << " " << new_stat << dendl;
  ceph_assert(new_stat.statfs.total);

// ratio of used OSD space, used for the full/nearfull checks
  float pratio;
  float ratio = service.compute_adjusted_ratio(new_stat, &pratio);
  std::cout << "ratio :  " << ratio << std::endl;
  service.check_full_status(ratio, pratio);

//get the current time (nanosecond precision)
  utime_t now = ceph_clock_now();
  utime_t deadline = now;

//deadline: the latest time by which a reply is expected
  deadline += cct->_conf->osd_heartbeat_grace;
  std::cout << "now " << now << std::endl;
  std::cout << "deadline " << deadline << std::endl;

  // send heartbeats
  // iterate over the peers, pinging each in turn
  for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
       i != heartbeat_peers.end();
       ++i) {
    int peer = i->first;
    i->second.last_tx = now;
    if (i->second.first_tx == utime_t())
      i->second.first_tx = now;
    // record the in-flight ping; HEARTBEAT_MAX_CONN is the number of replies expected
    // key: send time; value: (reply deadline, replies still expected)
    i->second.ping_history[now] = make_pair(deadline,
      HeartbeatInfo::HEARTBEAT_MAX_CONN);

    if (i->second.hb_interval_start == utime_t())
      i->second.hb_interval_start = now;

    dout(1) << "heartbeat sending ping to osd." << peer << dendl;
    //  send on the cluster network (back side)
    i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
					  service.get_osdmap_epoch(),
					  MOSDPing::PING, now,
					  cct->_conf->osd_heartbeat_min_size));

   // if a public-network (front) connection exists, ping it too
    if (i->second.con_front)
      i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
					     service.get_osdmap_epoch(),
					     MOSDPing::PING, now,
					  cct->_conf->osd_heartbeat_min_size));
  }
//  MOSDPing::PING is one of the heartbeat message types (others include PING_REPLY and YOU_DIED)
  logger->set(l_osd_hb_to, heartbeat_peers.size());
// if this OSD has no heartbeat peers (e.g. a single-OSD cluster), fetch a new osdmap from the mon every osd_mon_heartbeat_interval
  // hmm.. am i all alone?
  dout(30) << "heartbeat lonely?" << dendl;
  if (heartbeat_peers.empty()) {
    if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval && is_active()) {
      last_mon_heartbeat = now;
      dout(1) << "i have no heartbeat peers; checking mon for new map" << dendl;
      osdmap_subscribe(get_osdmap_epoch() + 1, false);
    }
  }

  dout(30) << "heartbeat done" << dendl;
}

Updating heartbeat\_peers

When an OSD sends heartbeats to its peer OSDs, the peers are not all the OSDs in the cluster: pinging every node would add load and hurt performance. Instead, the peer set is maintained by the maybe\_update\_heartbeat\_peers function, which refreshes heartbeat\_peers in three situations:

  • when a PG is created, see handle\_pg\_create;
  • when the osdmap changes, see handle\_osd\_map;
  • periodically, from the tick timer.

Each time it sends heartbeats the OSD walks heartbeat\_peers, a map whose key is the OSD id and whose value is a HeartbeatInfo struct:

map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo

The tick timer checks periodically

Ceph uses a timer to keep heartbeat\_peers up to date:


// tick
// get_tick_interval() returns a randomized interval around OSD_TICK_INTERVAL (~1 s), jittered so ticks don't fire in lockstep
double OSD::get_tick_interval() const
{
  // vary +/- 5% to avoid scrub scheduling livelocks
  constexpr auto delta = 0.05;
  return (OSD_TICK_INTERVAL *
	  ceph::util::generate_random_number(1.0 - delta, 1.0 + delta));
}
// OSD::init() calls tick_timer.add_event_after:
// as the name suggests, it schedules an event to run after get_tick_interval()
  tick_timer.add_event_after(get_tick_interval(),
			     new C_Tick(this));
// OSD::C_Tick shows that the event runs osd->tick()
class OSD::C_Tick : public Context {
  OSD *osd;
  public:
  explicit C_Tick(OSD *o) : osd(o) {}
  void finish(int r) override {
    osd->tick();
  }
};

osd->tick()


//tick() handles a few cases
void OSD::tick()
{
  ceph_assert(osd_lock.is_locked());
  dout(10) << "tick" << dendl;
//if active, or waiting to become healthy:
  if (is_active() || is_waiting_for_healthy()) {
 //update heartbeat_peers
    maybe_update_heartbeat_peers();
  }

  if (is_waiting_for_healthy()) {
    start_boot();
  }
//refresh the osdmap epoch
  if (is_waiting_for_healthy() || is_booting()) {
    std::lock_guard l(heartbeat_lock);
    utime_t now = ceph_clock_now();
    if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval) {
      last_mon_heartbeat = now;
      dout(1) << __func__ << " checking mon for new map" << dendl;
      osdmap_subscribe(get_osdmap_epoch() + 1, false);
    }
  }

  do_waiters();
//re-arm the timer, so OSD::tick() keeps being called in a loop
  tick_timer.add_event_after(get_tick_interval(), new C_Tick(this));
}

The key function that OSD::tick uses to update heartbeat\_peers is maybe\_update\_heartbeat\_peers(); a brief walk-through:


void OSD::maybe_update_heartbeat_peers()
{
  ceph_assert(osd_lock.is_locked());
  if (is_waiting_for_healthy() || is_active()) {
    utime_t now = ceph_clock_now();
    //first run after startup
    if (last_heartbeat_resample == utime_t()) {
      last_heartbeat_resample = now;
      heartbeat_set_peers_need_update();
    } else if (!heartbeat_peers_need_update()) {
      utime_t dur = now - last_heartbeat_resample;
      //only force an update once the grace period has elapsed
      if (dur > cct->_conf->osd_heartbeat_grace) {
	dout(10) << "maybe_update_heartbeat_peers forcing update after " << dur << " seconds" << dendl;
        // set the 'peers need update' flag
	heartbeat_set_peers_need_update();
	last_heartbeat_resample = now;
	// automatically clean up any stale heartbeat peers
	// if we are unhealthy, then clean all
        //clear stale peers (all of them if we are unhealthy)
	reset_heartbeat_peers(is_waiting_for_healthy());
      }
    }
  }

// ............
  // build heartbeat from set
  // take peers from OSDs that share a PG with us
  if (is_active()) {
    vector<PGRef> pgs;
    _get_pgs(&pgs);
    for (auto& pg : pgs) {
      pg->with_heartbeat_peers([&](int peer) {
	  if (get_osdmap()->is_up(peer)) {
	    _add_heartbeat_peer(peer);
	  }
	});
    }
  }

  // include next and previous up osds to ensure we have a fully-connected set
  //want: ids of the neighboring up OSDs, added in case there are too few peers
  set<int> want, extras;
  const int next = get_osdmap()->get_next_up_osd_after(whoami);
  if (next >= 0)
    want.insert(next);
  int prev = get_osdmap()->get_previous_up_osd_before(whoami);
  if (prev >= 0 && prev != next)
    want.insert(prev);

  // make sure we have at least **min_down** osds coming from different
  // subtree level (e.g., hosts) for fast failure detection.
  // make sure we have (in principle) at least min_down peers, drawn at random from different subtree-level failure domains across the cluster
  auto min_down = cct->_conf.get_val<uint64_t>("mon_osd_min_down_reporters");
  auto subtree = cct->_conf.get_val<string>("mon_osd_reporter_subtree_level");

  get_osdmap()->get_random_up_osds_by_subtree(
    whoami, subtree, min_down, want, &want);

  for (set<int>::iterator p = want.begin(); p != want.end(); ++p) {
    dout(10) << " adding neighbor peer osd." << *p << dendl;
    extras.insert(*p);
    _add_heartbeat_peer(*p);
  }
  // remove peers that are already down; enumerate extras
  map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
  while (p != heartbeat_peers.end()) {
    if (!get_osdmap()->is_up(p->first)) {
      int o = p->first;
      ++p;
      _remove_heartbeat_peer(o);
      continue;
    }
    if (p->second.epoch < get_osdmap_epoch()) {
      extras.insert(p->first);
    }
    ++p;
  }

  // too few?
  // fewer than osd_heartbeat_min_peers (default 10)? add more, walking the up OSDs
  for (int n = next; n >= 0; ) {
    if ((int)heartbeat_peers.size() >= cct->_conf->osd_heartbeat_min_peers)
      break;
    if (!extras.count(n) && !want.count(n) && n != whoami) {
      dout(10) << " adding random peer osd." << n << dendl;
      extras.insert(n);
      _add_heartbeat_peer(n);
    }
    n = get_osdmap()->get_next_up_osd_after(n);
    if (n == next)
      break;  // came full circle; stop
  }

  // too many?
  for (set<int>::iterator p = extras.begin();
       (int)heartbeat_peers.size() > cct->_conf->osd_heartbeat_min_peers && p != extras.end();
       ++p) {
    if (want.count(*p))
      continue;
    _remove_heartbeat_peer(*p);
  }

  dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl;
}

When a PG is created

handle\_pg\_create also calls maybe\_update\_heartbeat\_peers() at the end:


void OSD::handle_pg_create(OpRequestRef op)
{
  const MOSDPGCreate *m = static_cast<const MOSDPGCreate*>(op->get_req());
  ceph_assert(m->get_type() == MSG_OSD_PG_CREATE);

  dout(10) << "handle_pg_create " << *m << dendl;
 //........................

  maybe_update_heartbeat_peers();
}

When the osdmap changes

void OSD::handle_osd_map(MOSDMap *m)
{
  //..................
  if (is_active() || is_waiting_for_healthy())
    maybe_update_heartbeat_peers();
  //..................
}

Receiving data

When the OSD initializes it creates the four heartbeat Messengers (mentioned above); the handler that processes incoming messages is registered with each of them as a dispatcher:


//heartbeat_dispatcher is the struct holding the functions that handle heartbeat messages

int OSD::init()
{
...
//register heartbeat_dispatcher, which handles heartbeat messages, with all four messengers
hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
...
}


// heartbeat_dispatch
// heartbeat messages arrive with type MSG_OSD_PING and are handled by handle_osd_ping
bool OSD::heartbeat_dispatch(Message *m)
{
  dout(30) << "heartbeat_dispatch " << m << dendl;
  switch (m->get_type()) {

  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source_inst() << dendl;
    m->put();
    break;

  case MSG_OSD_PING:
    handle_osd_ping(static_cast<MOSDPing*>(m));
    break;

  default:
    dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
    m->put();
  }

  return true;
}

Now let's look at how handle\_osd\_ping processes an incoming heartbeat.

The handler is long, but it breaks down into three cases: a heartbeat notification (PING), a heartbeat reply (PING\_REPLY), and a death notice (YOU\_DIED). We'll go through them.


Handling a heartbeat notification

//case: we received a heartbeat notification
  case MOSDPing::PING:
    {
      //first check that this OSD's own internal heartbeat map is healthy
      //(no stuck worker threads); if not, drop the ping instead of replying
      if (!cct->get_heartbeat_map()->is_healthy()) {
		dout(10) << "internal heartbeat not healthy, dropping ping request" << dendl;
		break;
      }
      //reply to the heartbeat notification
      Message *r = new MOSDPing(monc->get_fsid(),
				curmap->get_epoch(),
				MOSDPing::PING_REPLY, m->stamp,
				cct->_conf->osd_heartbeat_min_size);
      m->get_connection()->send_message(r);
      //check whether the sender is up in our osdmap
      if (curmap->is_up(from)) {
      //update our record of the peer's epoch from the message
	service.note_peer_epoch(from, m->map_epoch);
	if (is_active()) {
	  ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
	  if (con) {
	    service.share_map_peer(from, con.get());
	  }
	}
      //the sender doesn't exist in our osdmap (or was marked down after its epoch)
      } else if (!curmap->exists(from) ||
		 curmap->get_down_at(from) > m->map_epoch) {
	// tell them they have died
	Message *r = new MOSDPing(monc->get_fsid(),
				  curmap->get_epoch(),
				  MOSDPing::YOU_DIED,
				  m->stamp,
				  cct->_conf->osd_heartbeat_min_size);
	m->get_connection()->send_message(r);
      }
    }

Handling a heartbeat reply

The reply handler is quite long; in brief, it uses the echoed timestamp to update ping\_history and the peer's HeartbeatInfo, along with the OSD's view of the peer's state. Heartbeat timeout detection does not happen here; a dedicated timer task is responsible for that (a condensed sketch of the reply path follows).
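Since the full code is skipped here, the reply path can be sketched roughly as follows, based on the HeartbeatInfo fields shown earlier (simplified; locking, stat collection and osdmap sharing are omitted, and the helper signature is invented):

#include <iterator>
#include <map>
#include <utility>

// Rough sketch of the MOSDPing::PING_REPLY handling; types are stand-ins.
using utime = double;

struct PeerInfo {
  utime last_rx_front = 0, last_rx_back = 0;
  // send time -> (reply deadline, replies still expected)
  std::map<utime, std::pair<utime, int>> ping_history;
};

// 'stamp' is our original send time echoed back in the reply; 'front' says
// whether the reply arrived on the public (front) or cluster (back) side
void handle_ping_reply(PeerInfo& peer, utime stamp, bool front) {
  if (front) peer.last_rx_front = stamp;
  else       peer.last_rx_back  = stamp;

  auto it = peer.ping_history.find(stamp);
  if (it != peer.ping_history.end() && --it->second.second == 0) {
    // both expected replies arrived: this ping (and any older in-flight
    // pings it implicitly acknowledges) can be retired
    peer.ping_history.erase(peer.ping_history.begin(), std::next(it));
  }
}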


Heartbeat timeout detection and failure reporting

Timeout detection is scheduled in OSD::init():


int OSD::init()
{
    //.......
  tick_timer.add_event_after(get_tick_interval(),
			     new C_Tick(this));
  {
    std::lock_guard l(tick_timer_lock);
   //every get_tick_interval(), run osd->tick_without_osd_lock() via C_Tick_WithoutOSDLock
    tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
						new C_Tick_WithoutOSDLock(this));
  }
    //.......
}

class OSD::C_Tick_WithoutOSDLock : public Context {
  OSD *osd;
  public:
  explicit C_Tick_WithoutOSDLock(OSD *o) : osd(o) {}
  void finish(int r) override {
    osd->tick_without_osd_lock();
  }
};

Let's look at tick\_without\_osd\_lock in detail:


void OSD::tick_without_osd_lock()
{
 //......
  if (is_active() || is_waiting_for_healthy()) {
    heartbeat_lock.Lock();
    //the key call: heartbeat_check()
    //heartbeat_check records every failing peer in failure_queue
    heartbeat_check();
    heartbeat_lock.Unlock();

    map_lock.get_read();
    std::lock_guard l(mon_report_lock);

    // mon report?
    utime_t now = ceph_clock_now();
    if (service.need_fullness_update() ||
	now - last_mon_report > cct->_conf->osd_mon_report_interval) {
      last_mon_report = now;
      send_full_update();
      //send_failures walks failure_queue and sends the failure reports to the mon
      send_failures();
    }
    map_lock.put_read();

    epoch_t max_waiting_epoch = 0;
    for (auto s : shards) {
      max_waiting_epoch = std::max(max_waiting_epoch,
				   s->get_max_waiting_epoch());
    }
    if (max_waiting_epoch > get_osdmap()->get_epoch()) {
      dout(20) << __func__ << " max_waiting_epoch " << max_waiting_epoch
	       << ", requesting new map" << dendl;
      osdmap_subscribe(superblock.newest_map + 1, false);
    }
  }

  if (is_active()) {
 .....
    const auto now = ceph::coarse_mono_clock::now();
    {
//    periodic beacon report to the mon
        std::lock_guard l{min_last_epoch_clean_lock};
      const auto elapsed = now - last_sent_beacon;
      if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
        cct->_conf->osd_beacon_report_interval) {
        need_send_beacon = true;
      }
    }
    if (need_send_beacon) {
//send our current epoch to the mon
      send_beacon(now);
    }
  }
....
//re-arm the timer task
  tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
					      new C_Tick_WithoutOSDLock(this));
}

heartbeat\_check

void OSD::heartbeat_check()
{
  ceph_assert(heartbeat_lock.is_locked());
  utime_t now = ceph_clock_now();

  // check for incoming heartbeats (move me elsewhere?)
  // iterate over heartbeat_peers
  for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
       p != heartbeat_peers.end();
       ++p) {

    if (p->second.first_tx == utime_t()) {
      dout(30) << "heartbeat_check we haven't sent ping to osd." << p->first
               << " yet, skipping" << dendl;
      continue;
    }
.....
    // every reply is recorded into ping_history, so the oldest entry holds
    // the earliest outstanding deadline
    if (p->second.is_unhealthy(now)) {
      utime_t oldest_deadline = p->second.ping_history.begin()->second.first;
      // we never received any reply on one of the two sides
      if (p->second.last_rx_back == utime_t() ||
	  p->second.last_rx_front == utime_t()) {
	// fail
        //failing peers are recorded into failure_queue and reported to the mon in a batch
	failure_queue[p->first] = p->second.first_tx;
      } else {
.....
	// fail
	failure_queue[p->first] = std::min(p->second.last_rx_back, p->second.last_rx_front);
      }
    }
  }
}

How the mon handles the reports


Just like the OSDs' heartbeat receive path, the mon has a dispatch that processes messages and routes them by type.


Eventually paxos\_service\[PAXOS\_OSDMAP\]->dispatch(op) is invoked. paxos\_service is constructed when the mon initializes, and the PAXOS\_OSDMAP entry is reset to point at an OSDMonitor, so failure reports end up being handled by OSDMonitor's member functions.


Let's see how paxos\_service\[PAXOS\_OSDMAP\]->dispatch(op) handles it.

A report is not processed in one shot; it goes through two steps:

  • preprocessing: preprocess\_query
  • state update: prepare\_update

bool PaxosService::dispatch(MonOpRequestRef op)
{
........
  // step 1: preprocess
  if (preprocess_query(op))
    return true;  // easy!
...........
.......
  // update
  if (!prepare_update(op)) {
    // no changes made.
    return true;
  }

  if (need_immediate_propose) {
    dout(10) << __func__ << " forced immediate propose" << dendl;
    need_immediate_propose = false;
    propose_pending();
    return true;
  }

......
}

First, preprocess\_query.

preprocess\_query handles many message types; MSG\_OSD\_FAILURE is routed to preprocess\_failure:


bool OSDMonitor::preprocess_query(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  Message *m = op->get_req();
  switch (m->get_type()) {
    // READs
.....
    // damp updates
  case MSG_OSD_MARK_ME_DOWN:
    return preprocess_mark_me_down(op);
  case MSG_OSD_FULL:
    return preprocess_full(op);
  case MSG_OSD_FAILURE:
    return preprocess_failure(op);
......
  default:
    ceph_abort();
    return true;
  }
}

The call lands in preprocess\_failure, which cross-checks the report's validity several times:


bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
  // who is target_osd
  //get the reported OSD's id
  int badboy = m->get_target_osd();
  // check permissions
  if (check_source(op, m->fsid))
    goto didit;
  dout(5) << "osd test "<< badboy << dendl;
  // first, verify the reporting host is valid
  //check that the reporter exists in the osdmap
  if (m->get_orig_source().is_osd()) {
    int from = m->get_orig_source().num();
    if (!osdmap.exists(from) ||
	!osdmap.get_addrs(from).legacy_equals(m->get_orig_source_addrs()) ||
	(osdmap.is_down(from) && m->if_osd_failed())) {
      dout(5) << "preprocess_failure from dead osd." << from
	      << ", ignoring" << dendl;
      send_incremental(op, m->get_epoch()+1);
      goto didit;
    }
  }
  // weird?
  //check the target OSD's state
  if (osdmap.is_down(badboy)) {
    dout(5) << "preprocess_failure dne(/dup?): osd." << m->get_target_osd()
	    << " " << m->get_target_addrs()
	    << ", from " << m->get_orig_source() << dendl;
    if (m->get_epoch() < osdmap.get_epoch())
      send_incremental(op, m->get_epoch()+1);
    goto didit;
  }
  //verify the reported OSD's address against the osdmap
  if (osdmap.get_addrs(badboy) != m->get_target_addrs()) {
    dout(5) << "preprocess_failure wrong osd: report osd." << m->get_target_osd()
	    << " " << m->get_target_addrs()
	    << " != map's " << osdmap.get_addrs(badboy)
	    << ", from " << m->get_orig_source() << dendl;
    if (m->get_epoch() < osdmap.get_epoch())
      send_incremental(op, m->get_epoch()+1);
    goto didit;
  }

  // already reported?
  //validate once more, this time against the epoch
  if (osdmap.is_down(badboy) ||
      osdmap.get_up_from(badboy) > m->get_epoch()) {
    dout(5) << "preprocess_failure dup/old: osd." << m->get_target_osd()
	    << " " << m->get_target_addrs()
	    << ", from " << m->get_orig_source() << dendl;
    if (m->get_epoch() < osdmap.get_epoch())
      send_incremental(op, m->get_epoch()+1);
    goto didit;
  }

  if (!can_mark_down(badboy)) {
    dout(5) << "preprocess_failure ignoring report of osd."
	    << m->get_target_osd() << " " << m->get_target_addrs()
	    << " from " << m->get_orig_source() << dendl;
    goto didit;
 }
  return false;
 didit:
  mon->no_reply(op);
  return true;
}

After validation comes prepare\_update, which, like preprocess\_query, dispatches to a handler by message type:


bool OSDMonitor::prepare_update(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  Message *m = op->get_req();
  dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;

  switch (m->get_type()) {
    // damp updates
 .....
  case MSG_OSD_FAILURE:
    return prepare_failure(op);
 .....
  }

  return false;
}

prepare\_failure adds the report to failure\_info, and finally check\_failure decides whether to mark the OSD down:


bool OSDMonitor::prepare_failure(MonOpRequestRef op)
{
  op->mark_osdmon_event(__func__);
  //cast to the concrete message type
  MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
  dout(1) << "prepare_failure osd." << m->get_target_osd()
	  << " " << m->get_target_addrs()
	  << " from " << m->get_orig_source()
          << " is reporting failure:" << m->if_osd_failed() << dendl;
 //the OSD being reported
  int target_osd = m->get_target_osd();
 //the reporter's id
  int reporter = m->get_orig_source().num();
  ceph_assert(osdmap.is_up(target_osd));
  ceph_assert(osdmap.get_addrs(target_osd) == m->get_target_addrs());

  mon->no_reply(op);

  if (m->if_osd_failed()) {
    // calculate failure time
    utime_t now = ceph_clock_now();
    utime_t failed_since =
      m->get_recv_stamp() - utime_t(m->failed_for, 0);

    // add a report
    if (m->is_immediate()) {
      mon->clog->debug() << "osd." << m->get_target_osd()
			 << " reported immediately failed by "
			 << m->get_orig_source();
      force_failure(target_osd, reporter);
      return true;
    }
    mon->clog->debug() << "osd." << m->get_target_osd() << " reported failed by "
		      << m->get_orig_source();

    failure_info_t& fi = failure_info[target_osd];

     // add the failure report here
    MonOpRequestRef old_op = fi.add_report(reporter, failed_since, op);
    if (old_op) {
      mon->no_reply(old_op);
    }

    return check_failure(now, target_osd, fi);
  } else {
    // remove the report
    mon->clog->debug() << "osd." << m->get_target_osd()
		       << " failure report canceled by "
		       << m->get_orig_source();
    if (failure_info.count(target_osd)) {
      failure_info_t& fi = failure_info[target_osd];
      MonOpRequestRef report_op = fi.cancel_report(reporter);
      if (report_op) {
        mon->no_reply(report_op);
      }
      if (fi.reporters.empty()) {
	dout(10) << " removing last failure_info for osd." << target_osd
		 << dendl;
	failure_info.erase(target_osd);
      } else {
	dout(10) << " failure_info for osd." << target_osd << " now "
		 << fi.reporters.size() << " reporters" << dendl;
      }
    } else {
      dout(10) << " no failure_info for osd." << target_osd << dendl;
    }
  }

  return false;
}

check\_failure does not mark the OSD down right away. It first adjusts the grace period from various conditions, then compares the time the OSD has actually been failing against the adjusted osd\_heartbeat\_grace, and also checks the number of reporters (see the formula below).
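Reconstructing the adjustment from the code below as a formula (with $T_h$ = mon\_osd\_laggy\_halflife, default 3600 s, and $I$, $P$ the laggy\_interval and laggy\_probability stored in each OSD's osd\_xinfo\_t):

$$k = \frac{\ln 0.5}{T_h}, \qquad \text{grace} = \text{osd\_heartbeat\_grace} + \underbrace{e^{k\,\Delta t_{\text{failed}}}\, I_{\text{target}} P_{\text{target}}}_{\text{my\_grace}} + \underbrace{\frac{1}{|R|}\sum_{r\in R} e^{k\,\Delta t_r}\, I_r P_r}_{\text{peer\_grace}}$$

For the default halflife of 3600 s this gives $k = \ln(0.5)/3600 \approx -1.925\times10^{-4}$ per second, which is exactly the decay\_k value the comment below puzzles over. The OSD is marked down only when failed\_for ≥ grace and enough distinct failure domains have reported.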


bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
{
  // already pending failure?
  if (pending_inc.new_state.count(target_osd) &&
      pending_inc.new_state[target_osd] & CEPH_OSD_UP) {
    dout(10) << " already pending failure" << dendl;
    return true;
  }

  set<string> reporters_by_subtree;
  //failure domain used to group the reporters
  auto reporter_subtree_level = g_conf().get_val<string>("mon_osd_reporter_subtree_level");
  utime_t orig_grace(g_conf()->osd_heartbeat_grace, 0);
  utime_t max_failed_since = fi.get_failed_since();
  utime_t failed_for = now - max_failed_since;

  utime_t grace = orig_grace;
  double my_grace = 0, peer_grace = 0;
  double decay_k = 0;

  //stretch osd_heartbeat_grace based on the target's history of lagginess
  if (g_conf()->mon_osd_adjust_heartbeat_grace) {
 // with the default mon_osd_laggy_halflife of 3600 s:
 // decay_k = ::log(.5)/3600 = -0.693147/3600 ≈ -0.000192541
    double halflife = (double)g_conf()->mon_osd_laggy_halflife;
    decay_k = ::log(.5) / halflife;

    // scale grace period based on historical probability of 'lagginess'
    // (false positive failures due to slowness).

    const osd_xinfo_t& xi = osdmap.get_xinfo(target_osd);
    //exponential decay of the laggy estimate over failed_for
    double decay = exp((double)failed_for * decay_k);
    dout(1) << " halflife " << halflife << " decay_k " << decay_k
	     << " failed_for " << failed_for << " decay " << decay << dendl;

    my_grace = decay * (double)xi.laggy_interval * xi.laggy_probability;
    grace += my_grace;
  }

  // consider the peers reporting a failure a proxy for a potential
  // 'subcluster' over the overall cluster that is similarly
  // laggy.  this is clearly not true in all cases, but will sometimes
  // help us localize the grace correction to a subset of the system
  // (say, a rack with a bad switch) that is unhappy.
  ceph_assert(fi.reporters.size());
  //iterate over the reporters
  for (auto p = fi.reporters.begin(); p != fi.reporters.end();) {
    // get the parent bucket whose type matches with "reporter_subtree_level".
    // fall back to OSD if the level doesn't exist.
    if (osdmap.exists(p->first)) {
      //get the reporter's full CRUSH location from the osdmap
      auto reporter_loc = osdmap.crush->get_full_location(p->first);
      //the configured subtree level is absent from its location: fall back to the OSD itself
      if (auto iter = reporter_loc.find(reporter_subtree_level);
          iter == reporter_loc.end()) {
        reporters_by_subtree.insert("osd." + to_string(p->first));

        dout(1) <<"reporters_by_subtree, reporter_subtree_level:" <<reporter_subtree_level
                       << " vname:" << "osd." + to_string(p->first) << dendl;

      } else {
        string name = get_phy_name(iter->second);
        dout(1) <<"reporters_by_subtree, reporter_subtree_level:" <<reporter_subtree_level
                       << " vname:" << iter->second << " pname:" << name << dendl;
         reporters_by_subtree.insert(name);
      }
      if (g_conf()->mon_osd_adjust_heartbeat_grace) {
        const osd_xinfo_t& xi = osdmap.get_xinfo(p->first);
        //time since this reporter was last marked down
        utime_t elapsed = now - xi.down_stamp;
        double decay = exp((double)elapsed * decay_k);
        //average interval between being marked laggy and recovering
        peer_grace += decay * (double)xi.laggy_interval * xi.laggy_probability;
      }
      ++p;
    } else {
      fi.cancel_report(p->first);;
      p = fi.reporters.erase(p);
    }
  }

  if (g_conf()->mon_osd_adjust_heartbeat_grace) {
    peer_grace /= (double)fi.reporters.size();
    grace += peer_grace;
  }

  dout(10) << " osd." << target_osd << " has "
	   << fi.reporters.size() << " reporters, "
	   << grace << " grace (" << orig_grace << " + " << my_grace
	   << " + " << peer_grace << "), max_failed_since " << max_failed_since
	   << dendl;

// everything above adjusts the grace period

// compare how long the OSD has actually been failing against the adjusted grace,
// and require reports from enough distinct failure domains
  if (failed_for >= grace &&
      reporters_by_subtree.size() >= g_conf().get_val<uint64_t>("mon_osd_min_down_reporters")) {
    dout(1) << " we have enough reporters to mark osd." << target_osd
	    << " down" << dendl;
    // new_state is XORed onto the previous state, so setting the CEPH_OSD_UP bit here flips the OSD from up to down
    pending_inc.new_state[target_osd] = CEPH_OSD_UP;

    mon->clog->info() << "osd." << target_osd << " failed ("
		      << osdmap.crush->get_full_location_ordered_string(
			target_osd)
		      << ") ("
		      << (int)reporters_by_subtree.size()
		      << " reporters from different "
		      << reporter_subtree_level << " after "
		      << failed_for << " >= grace " << grace << ")";
    return true;
  }
  return false;
}


To summarize, heartbeat peers come in two classes: OSDs that share a PG with this OSD, and the OSD's two adjacent up OSDs (its neighbors by id). Through them, nodes detect each other's failures so that faulty nodes are discovered promptly and the corresponding failure handling can begin.

Each OSD daemon listens on four ports: public, cluster, front and back.

  • public port: accepts connections from Monitors and Clients.
  • cluster port: accepts connections from OSD peers (OSDs in the same cluster).
  • front port: on the NIC clients use to reach the cluster, borrowed here for heartbeats between cluster members.
  • back port: on the NIC used within the cluster, for heartbeats between cluster members.

Each Ceph OSD daemon checks the heartbeats of other Ceph OSD daemons every 6 seconds. To change the heartbeat interval, add the osd heartbeat interval setting under the [osd] section of the Ceph configuration file, or change its value at runtime.

If a neighboring Ceph OSD daemon does not send heartbeat packets within the 20-second grace period, the Ceph OSD daemon may consider it down and report this back to a Ceph Monitor, which updates the Ceph cluster map. To change the grace period, add osd heartbeat grace under the [osd] section of the Ceph configuration file, or set its value at runtime.


The OSD-to-OSD heartbeat in one sentence: OSDs ping each other at random intervals of up to 6 seconds; if no heartbeat from a neighbor has been received after 20 seconds, the neighbor is considered down and reported to the mon.


The OSD-mon heartbeat path:

  • reporting OSD down events to the mon; the mon only acts once it has received the down report for a given OSD from two or more reporters


  • reporting OSD state and change information; whether or not anything changed, a report is sent every 120 s


  • when an OSD cannot get information from its neighbor OSDs, it turns to the mon for a new osdmap
