osd 心跳

Last updated on 3 months ago


osd 心跳是什么,有什么功能作用

心跳是什么?

​ 心跳是一种用于故障检测的手段(简单的说就是发个数据包给你,通过有没有回复来判断你的状态)。在分布式系统中有很多节点,节点数量多了,各种异常就会经常发生,如:宕机、磁盘损坏、网络故障等,通过心跳这种机制可以快速有效的定位集群中的错误节点,并做及时的处理保证集群正常服务。

osd 心跳

  • osd是什么?
    通俗理解为 主要负责管理ceph集群中磁盘的守护进程

  • osd心跳是什么?有啥功能

    osd心跳是检测osd故障的一种机制(集群健康工作的基石), 如果集群中的其中一个osd 突然故障了(假如说是网络隔绝),此时他也没办法发信息给mon,那么此时只有通过其他 osd来上报这个信息,说我有个邻居挂掉了,那么osd是怎么确定他邻居osd已经出故障了呢?

osd之间检测

  1. 每个 Ceph OSD 守护进程以小于每 6 秒的随机间隔时间检查其他 Ceph OSD 守护进程的心跳(可以理解为:定期去敲邻居的门)

    image-20220919000635555·

  2. 如果一个相邻的Ceph OSD Daemon在20秒的宽限期内没有显示心跳,Ceph OSD Daemon可能会认为相邻的Ceph OSD Daemon是 dowm状态的,并将其报告给Ceph Monitor(可以理解为:敲了那么长时间的门都没来开门,就报警了)

    image-20220919000758792
  3. 一个osd 无法和配置文件中的osd 达成 peer 关系,则这个osd 每 30 秒 ping 一次mon 以获取集群的osdmap
    image-20220915161221587

  4. 如果集群只有一个osd,则 osd每隔 一个 osd_mon_heartbeat_interval 的时间向 mon 获取新的 osdmap

osd和mon之间

此外 osd除了有检测osd故障的职责,还需要向mon汇报自身的状况

触发汇报的情况有以下几种:

  • OSD有事件发生时(比如故障、PG变更)。

  • osd启动时5秒内

  • OSD周期性的上报 mon (无论是否发现变化 ,默认周期120s)

  • OSD检查failure_queue中的伙伴OSD失败信息。

那些场景需要用到

查看 mon的日志,看丢失的心跳包:
osd心跳是一直伴随着集群存在的,也是检验集群健康的标志之一,

调整osd心跳 的参数:
检查其它 osd 心跳的时间间隔,心跳响应宽限期,这些都会影响业务;故障的发现时间和心跳带来的负载之间做权衡,比如大规模的集群中, 心跳频率太高则过多的心跳报文会影响系统性能,如果心跳频率过低则会延长发现故障节点的时间,从而影响系统的可用性。

基本使用(参数配置)

osd心跳是集群正常工作的基石,osd的心跳的使用一般都是通过调节参数来到性能上的平衡,以下是 osd心跳机制的 参数,可以用 ceph daemon 命令设置

这里补充比较重要的参数(修改可以在 使用adminsocket 在线修改参数,当然也可以在 ceph.conf 中修改添加参数后重启 )

osd_mon_heartbeat_interval
如果 Ceph OSD 守护进程没有 Ceph OSD 守护进程对等点,Ceph OSD 守护进程对 Ceph 监视器执行 ping 操作的频率。

osd_heartbeat_interval
Ceph OSD 守护进程 ping 其对等节点的频率(默认为6s)。

osd_heartbeat_grace
当Ceph OSD Daemon没有显示心跳时,Ceph 认为它已经停机的时间 (默认为20s)

mon_osd_min_down_reporters (一般为2 )
报告故障的OSD 所需的最小数量

mon_osd_reporter_subtree_level
设置报告故障的OSD 来自那个故障域的,默认为 host;默认情况下只需要两个来自不同故障域的报告就可以报告另一个OSD为停机

osd_heartbeat_min_peers
每次要发送心跳包对象的个数,默认 10个

osd_mon_report_interval 默认 5s
OSD 启动或其它可报告事件的报告间隔时间

osd_beacon_report_interval
osd周期性向 monitor发送 beacon消息进行保活 默认100s

mon_osd_report_timeout
mon标记一个osd为down的最长等待时间 默认值 900

osd_mon_heartbeat_stat_stale
如果发送它将每 30 秒 ping 一次 Ceph 监视器以获取集群映射的最新副本…..

模块组成以及实现架构

关键的数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// information about a heartbeat peer 
//这个类 用来记录 peer osd 信息以及 关于heartbeat 的操作函数都在这
struct HeartbeatInfo {
int peer; ///< peer
ConnectionRef con_front; ///< peer connection (front)
ConnectionRef con_back; ///< peer connection (back)
utime_t first_tx; ///< time we sent our first ping request
utime_t last_tx; ///< last time we sent a ping request
utime_t last_rx_front; ///< last time we got a ping reply on the front side
utime_t last_rx_back; ///< last time we got a ping reply on the back side
epoch_t epoch; ///< most recent epoch we wanted this peer
/// number of connections we send and receive heartbeat pings/replies
static constexpr int HEARTBEAT_MAX_CONN = 2;
/// history of inflight pings, arranging by timestamp we sent
/// send time -> deadline -> remaining replies
map<utime_t, pair<utime_t, int>> ping_history;

//这个map保存了 peer osd的heartbeat_peers,key是 osd的 id
map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo
// ....
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// osd 接受心跳消息的handle 函数
// 在 class OSD 里面
struct HeartbeatDispatcher : public Dispatcher {
OSD *osd;
explicit HeartbeatDispatcher(OSD *o) : Dispatcher(o->cct), osd(o) {}
void ms_fast_dispatch(Message *m) override {
osd->heartbeat_dispatch(m);
}
bool ms_dispatch(Message *m) override {
return osd->heartbeat_dispatch(m);
}
//........................
KeyStore *ms_get_auth1_authorizer_keystore() override {
return osd->ms_get_auth1_authorizer_keystore();
}
} heartbeat_dispatcher;

整体设计 流程

osd 如何发送和接受心跳的

public network 和 cluster network

​ osd 直接的通信数据需要通过 ceph网络层传输; 一般来说 ceph集群是 内外网分离的,集群内部通信用单独cluster network,客户端外部和集群通信也是单独用 public network(可以理解为用两张网卡)
​ ceph将osd间的副本数据、迁移数据的传输交由cluster network,将client和ceph后端的数据传输交由public networkimage-20220914175829713

​ osd之间心跳通信使用了 四个message对象 (ceph中 用于通信的类在 osd 初始化时会创建,可以理解为一个专门负责派送某一种类型的快递员)用于osd心跳通信,分别是

  • ms_hb_front_client : 用于向其他 osd 发送心跳

  • ms_hb_back_client: 用于向其他 osd 发送心跳

  • ms_hb_back_server: 用于接受他 osd 发送心跳

  • ms_hb_front_server: 用于接收他 osd 发送心跳

  • front 用的是public network,用于检测客户端网络连接问题,back 用的是 cluster network。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    # ceph_osd.cc
    // 创建四个关于心跳的 message
    int main(int argc, const char **argv) {
    Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
    entity_name_t::OSD(whoami), "hb_back_client",
    getpid(), Messenger::HEARTBEAT);
    Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
    entity_name_t::OSD(whoami), "hb_front_client",
    getpid(), Messenger::HEARTBEAT);
    Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
    entity_name_t::OSD(whoami), "hb_back_server",
    getpid(), Messenger::HEARTBEAT);
    Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
    entity_name_t::OSD(whoami), "hb_front_server",
    getpid(), Messenger::HEARTBEAT);
    //osd 初始化的类其参数就需要 关于 心跳的 Message 实例
    osd = new OSD(g_ceph_context,
    store,
    whoami,
    ms_cluster,
    ms_public,
    ms_hb_front_client,
    ms_hb_back_client,
    ms_hb_front_server,
    ms_hb_back_server,
    ms_objecter,
    &mc,
    data_path,
    journal_path);

    // 启动 message
    ms_hb_front_client->start();
    ms_hb_back_client->start();
    ms_hb_front_server->start();
    ms_hb_back_server->start();
    }

image-20220914175706661ms_hbclient <-> ms_hb_back_server ms_hbclient <-> ms_hb_front_server

发送数据

image-20220919105101754

osd 专门开了一个线程用来 发送心跳,遍历 heartbeat_peers ,逐个发送心跳包

image-20220925151116385

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//解
struct T_Heartbeat : public Thread {
OSD *osd;
explicit T_Heartbeat(OSD *o) : osd(o) {}
void *entry() override {
osd->heartbeat_entry();
return 0;
}
} heartbeat_thread;
int OSD::init()
{
// start the heartbeat thread
//传入的参数 是线程的名字
heartbeat_thread.create("osd_srv_heartbt");
//....
}
void OSD::heartbeat_entry()
{
std::lock_guard l(heartbeat_lock);
if (is_stopping())
return;
while (!heartbeat_stop) {
heartbeat();

double wait;
//等待的时间分为两种,一种是固定的 osd_heartbeat_interval,另外一种是 基于osd_heartbeat_interval生成的随机时间
if (cct->_conf.get_val<bool>("debug_disable_randomized_ping")) {
wait = (float)cct->_conf->osd_heartbeat_interval;
} else {
wait = .5 + ((float)(rand() % 10)/10.0) * (float)cct->_conf->osd_heartbeat_interval;
}
utime_t w;
//转换成 utime_t
w.set_from_double(wait);
dout(30) << "heartbeat_entry sleeping for " << wait << dendl;

// 这里用条件变量来等待时间,并没有释放锁
heartbeat_cond.WaitInterval(heartbeat_lock, w);
if (is_stopping())
return;
dout(30) << "heartbeat_entry woke up" << dendl;

}
}

发送 heartbeat 过程都在 heartbeat(); 中 ,这里分析下
heartbeat 函数主要为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void OSD::heartbeat()
{
ceph_assert(heartbeat_lock.is_locked_by_me());
dout(30) << "heartbeat" << dendl;

// get CPU load avg
double loadavgs[1];
int hb_interval = cct->_conf->osd_heartbeat_interval;
int n_samples = 86400;
if (hb_interval > 1) {
n_samples /= hb_interval;
if (n_samples < 1)
n_samples = 1;
}
cout << "send times : " << n_samples << std::endl;
// 估计下每次发送osd 负载值????
if (getloadavg(loadavgs, 1) == 1) {
cout <<loadavgs[0] << std::endl;
logger->set(l_osd_loadavg, 100 * loadavgs[0]);
daily_loadavg = (daily_loadavg * (n_samples - 1) + loadavgs[0]) / n_samples;

dout(1) << "heartbeat: daily_loadavg " << daily_loadavg << dendl;
}

dout(1) << "heartbeat checking stats" << dendl;

// refresh peer list and osd stats
// hb_peers保存 id(这里是 osd 的id)
vector<int> hb_peers;

//遍历 peer osd, peer osd 如何更新在下面会说到
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
p != heartbeat_peers.end();
++p)
hb_peers.push_back(p->first);

// 更新状态 osd 状态
auto new_stat = service.set_osd_stat(hb_peers, get_num_pgs());
dout(5) << __func__ << " " << new_stat << dendl;
ceph_assert(new_stat.statfs.total);

// 这比例指 osd 使用空间变更比例
float pratio;
float ratio = service.compute_adjusted_ratio(new_stat, &pratio);
std::cout << "ratio : " << ratio << std::endl;
service.check_full_status(ratio, pratio);

//获取当前实际 精确到纳秒
utime_t now = ceph_clock_now();
utime_t deadline = now;

//deadline 最大等待回应的时间
deadline += cct->_conf->osd_heartbeat_grace;
std::cout << "now " << now << std::endl;
std::cout << "deadline " << deadline << std::endl;

// send heartbeats
// 遍历 对等 peer,逐个发送
for (map<int,HeartbeatInfo>::iterator i = heartbeat_peers.begin();
i != heartbeat_peers.end();
++i) {
int peer = i->first;
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
// 保存发送 heartbeat 记录,HEARTBEAT_MAX_CONN 为发送heartbeat 数量
//key 发送的时间和, value 为 心跳回复的期限
i->second.ping_history[now] = make_pair(deadline,
HeartbeatInfo::HEARTBEAT_MAX_CONN);

if (i->second.hb_interval_start == utime_t())
i->second.hb_interval_start = now;

dout(1) << "heartbeat sending ping to osd." << peer << dendl;
// 集群内发送
i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap_epoch(),
MOSDPing::PING, now,
cct->_conf->osd_heartbeat_min_size));

// 如果有 设置 public network
if (i->second.con_front)
i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
service.get_osdmap_epoch(),
MOSDPing::PING, now,
cct->_conf->osd_heartbeat_min_size));
}
// MOSDPing::PING 心跳信息的类型,还有
logger->set(l_osd_hb_to, heartbeat_peers.size());
// 如果集群只有一个 osd,他会在 osd_mon_heartbeat_interval 时间内,向mon获取新的 osdmap
// hmm.. am i all alone?
dout(30) << "heartbeat lonely?" << dendl;
if (heartbeat_peers.empty()) {
if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval && is_active()) {
last_mon_heartbeat = now;
dout(1) << "i have no heartbeat peers; checking mon for new map" << dendl;
osdmap_subscribe(get_osdmap_epoch() + 1, false);
}
}

dout(30) << "heartbeat done" << dendl;
}

更新 heartbeat_peers

osd 发送心跳包给peer osd,peer OSD的选择并不是集群中的全部osd,peer OSD如果选择其他所有结点,则会增加集群负载影响系统性能。peer OSD的选择是通过maybe_update_heartbeat_peers函数经行更新 heartbeat_peers;有以下三种情况会更新 heartbeat_peers:

  • pg创建的时候,参见handle_pg_create;
  • osdmap变更时,参见handle_osd_map;
  • tick定时器会周期性的检测.

osd 每次发送心跳的时候都需要遍历 heartbeat_peers, heartbeat_peers的 是个map,其 key 为osd的id,value是 HeartbeatInfo 结构体

map<int,HeartbeatInfo> heartbeat_peers; ///< map of osd id to HeartbeatInfo

tick定时器会周期性的检测.

ceph 使用了一个定时器来更新 heartbeat_peers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// tick
//get_tick_interval生成随机时间 在1s上下,不想让他太频繁
double OSD::get_tick_interval() const
{
// vary +/- 5% to avoid scrub scheduling livelocks
constexpr auto delta = 0.05;
return (OSD_TICK_INTERVAL *
ceph::util::generate_random_number(1.0 - delta, 1.0 + delta));
}
// 在 int OSD::init()中有一个 tick_timer.add_event_after
// 从函数名 可看出 添加一个时间,在 get_tick_interval() 后执行事件
tick_timer.add_event_after(get_tick_interval(),
new C_Tick(this));
// 从 OSD::C_Tick 可以看出, 执行的是 osd->tick() 函数
class OSD::C_Tick : public Context {
OSD *osd;
public:
explicit C_Tick(OSD *o) : osd(o) {}
void finish(int r) override {
osd->tick();
}
};

osd->tick()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//对几种情况
void OSD::tick()
{
ceph_assert(osd_lock.is_locked());
dout(10) << "tick" << dendl;
//等待 osd心跳健康,
if (is_active() || is_waiting_for_healthy()) {
//更新 heartbeat_peers
maybe_update_heartbeat_peers();
}

if (is_waiting_for_healthy()) {
start_boot();
}
//更新 osdmap epoch
if (is_waiting_for_healthy() || is_booting()) {
std::lock_guard l(heartbeat_lock);
utime_t now = ceph_clock_now();
if (now - last_mon_heartbeat > cct->_conf->osd_mon_heartbeat_interval) {
last_mon_heartbeat = now;
dout(1) << __func__ << " checking mon for new map" << dendl;
osdmap_subscribe(get_osdmap_epoch() + 1, false);
}
}

do_waiters();
//继续添加 事件,循环调用 OSD::tick()
tick_timer.add_event_after(get_tick_interval(), new C_Tick(this));
}

OSD::tick 中 更新 heartbeat_peers 的关键函数是 maybe_update_heartbeat_peers() 这里简要分析下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
void OSD::maybe_update_heartbeat_peers()
{
ceph_assert(osd_lock.is_locked());
if (is_waiting_for_healthy() || is_active()) {
utime_t now = ceph_clock_now();
//第一次启动时候
if (last_heartbeat_resample == utime_t()) {
last_heartbeat_resample = now;
heartbeat_set_peers_need_update();
} else if (!heartbeat_peers_need_update()) {
utime_t dur = now - last_heartbeat_resample;
//仅仅在超出grace时间后才更新
if (dur > cct->_conf->osd_heartbeat_grace) {
dout(10) << "maybe_update_heartbeat_peers forcing update after " << dur << " seconds" << dendl;
// 更新 peer的标志位
heartbeat_set_peers_need_update();
////
last_heartbeat_resample = now;
// automatically clean up any stale heartbeat peers
// if we are unhealthy, then clean all
//清空 peers
reset_heartbeat_peers(is_waiting_for_healthy());
}
}
}

// ............
// build heartbeat from set
// 从同一个 pg下的osd中 获取 peer
if (is_active()) {
vector<PGRef> pgs;
_get_pgs(&pgs);
for (auto& pg : pgs) {
pg->with_heartbeat_peers([&](int peer) {
if (get_osdmap()->is_up(peer)) {
_add_heartbeat_peer(peer);
}
});
}
}

// include next and previous up osds to ensure we have a fully-connected set
//want: 保存 osd的相邻osd id,以防止 peer数量不够
set<int> want, extras;
const int next = get_osdmap()->get_next_up_osd_after(whoami);
if (next >= 0)
want.insert(next);
int prev = get_osdmap()->get_previous_up_osd_before(whoami);
if (prev >= 0 && prev != next)
want.insert(prev);

// make sure we have at least **min_down** osds coming from different
// subtree level (e.g., hosts) for fast failure detection.
// 确保至少有 min_down 个osd peer(理论上), 从 level 故障域 和 min_down 从整个集群 随机抽取两个osd
auto min_down = cct->_conf.get_val<uint64_t>("mon_osd_min_down_reporters");
auto subtree = cct->_conf.get_val<string>("mon_osd_reporter_subtree_level");

get_osdmap()->get_random_up_osds_by_subtree(
whoami, subtree, min_down, want, &want);

for (set<int>::iterator p = want.begin(); p != want.end(); ++p) {
dout(10) << " adding neighbor peer osd." << *p << dendl;
extras.insert(*p);
_add_heartbeat_peer(*p);
}
//删除已经down的osd
// remove down peers; enumerate extras
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
while (p != heartbeat_peers.end()) {
if (!get_osdmap()->is_up(p->first)) {
int o = p->first;
++p;
_remove_heartbeat_peer(o);
continue;
}
if (p->second.epoch < get_osdmap_epoch()) {
extras.insert(p->first);
}
++p;
}

// too few?
//peer数量过少,默认 10个,从 want 里面添加
for (int n = next; n >= 0; ) {
if ((int)heartbeat_peers.size() >= cct->_conf->osd_heartbeat_min_peers)
break;
if (!extras.count(n) && !want.count(n) && n != whoami) {
dout(10) << " adding random peer osd." << n << dendl;
extras.insert(n);
_add_heartbeat_peer(n);
}
n = get_osdmap()->get_next_up_osd_after(n);
if (n == next)
break; // came full circle; stop
}

// too many?
for (set<int>::iterator p = extras.begin();
(int)heartbeat_peers.size() > cct->_conf->osd_heartbeat_min_peers && p != extras.end();
++p) {
if (want.count(*p))
continue;
_remove_heartbeat_peer(*p);
}

dout(10) << "maybe_update_heartbeat_peers " << heartbeat_peers.size() << " peers, extras " << extras << dendl;
}
pg创建的时候

在 函数 handle_pg_create 里最后也调用了 maybe_update_heartbeat_peers();

1
2
3
4
5
6
7
8
9
10
void OSD::handle_pg_create(OpRequestRef op)
{
const MOSDPGCreate *m = static_cast<const MOSDPGCreate*>(op->get_req());
ceph_assert(m->get_type() == MSG_OSD_PG_CREATE);

dout(10) << "handle_pg_create " << *m << dendl;
//........................

maybe_update_heartbeat_peers();
}
osdmap 变更时候
1
2
3
4
5
6
7
void OSD::handle_osd_map(MOSDMap *m)
{
//..................
if (is_active() || is_waiting_for_healthy())
maybe_update_heartbeat_peers();
//..................
}

接收数据

在 osd初始化时,创建四个关于心跳的 message,(上文有提到), Message 类中 处理消息的方法 handle 会注册到分发中心

1
2
3
4
5
6
7
8
9
10
11
12
//heartbeat_dispatcher 是一个结构体,里面有处理心跳信息的函数

int OSD::init()
{
...
//注册dispatcher, heartbeat_dispatcher 处理心跳信息的函数
hb_front_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_client_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_front_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
hb_back_server_messenger->add_dispatcher_head(&heartbeat_dispatcher);
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// heartbeat_dispatch
// osd 收到心跳消息 的类型是 MSG_OSD_PING ,执行 handle_osd_ping
bool OSD::heartbeat_dispatch(Message *m)
{
dout(30) << "heartbeat_dispatch " << m << dendl;
switch (m->get_type()) {

case CEPH_MSG_PING:
dout(10) << "ping from " << m->get_source_inst() << dendl;
m->put();
break;

case MSG_OSD_PING:
handle_osd_ping(static_cast<MOSDPing*>(m));
break;

default:
dout(0) << "dropping unexpected message " << *m << " from " << m->get_source_inst() << dendl;
m->put();
}

return true;
}

这里分析下 当 osd 收到心跳包后 handle_osd_ping是怎么处理的
处理的代码很长,主要可以分为三部分: 心跳通知、心跳回复、osd dowm,从这三个case 分析下

image-20220920104648238

处理心跳通知
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//收到的是心跳通知 
case MOSDPing::PING:
{
//debug 模式
//这里判断是否超时
if (!cct->get_heartbeat_map()->is_healthy()) {
dout(10) << "internal heartbeat not healthy, dropping ping request" << dendl;
break;
}
//回复 心跳通知
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::PING_REPLY, m->stamp,
cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
//验证 发送方的osd是否 为 up
if (curmap->is_up(from)) {
// 从收到消息中 更新osd peer的epoch
service.note_peer_epoch(from, m->map_epoch);
if (is_active()) {
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
service.share_map_peer(from, con.get());
}
}
//没有在 osdmap中发现你
} else if (!curmap->exists(from) ||
curmap->get_down_at(from) > m->map_epoch) {
// tell them they have died
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::YOU_DIED,
m->stamp,
cct->_conf->osd_heartbeat_min_size);
m->get_connection()->send_message(r);
}
}
处理心跳回复

处理函数太长了,大概看了下,主要功能是 根据时间戳更新 ping_history,更新peer对应的 HeartbeatInfo,以及 osd状态,这里没有心跳超时的检测,有一个定时任务专门负责

1

心跳超时检测 和上报消息

心跳超时检测 在 OSD::init()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int OSD::init()
{
//.......
tick_timer.add_event_after(get_tick_interval(),
new C_Tick(this));
{
std::lock_guard l(tick_timer_lock);
//每隔 get_tick_interval()时间 调C_Tick_WithoutOSDLock类里的 osd->tick_without_osd_lock() 函数
tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
new C_Tick_WithoutOSDLock(this));
}
//.......
}

class OSD::C_Tick_WithoutOSDLock : public Context {
OSD *osd;
public:
explicit C_Tick_WithoutOSDLock(OSD *o) : osd(o) {}
void finish(int r) override {
osd->tick_without_osd_lock();
}
};

这里详细介绍下 tick_without_osd_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void OSD::tick_without_osd_lock()
{
//......
if (is_active() || is_waiting_for_healthy()) {
heartbeat_lock.Lock();
//关键函数 heartbeat_check
//heartbeat_check 会将 有错误的peer都会记录到 failure_queue
heartbeat_check();
heartbeat_lock.Unlock();

map_lock.get_read();
std::lock_guard l(mon_report_lock);

// mon report?
utime_t now = ceph_clock_now();
if (service.need_fullness_update() ||
now - last_mon_report > cct->_conf->osd_mon_report_interval) {
last_mon_report = now;
send_full_update();
//send_failures 会遍历 failure_queue,然后将错误的消息发送给 mon
send_failures();
}
map_lock.put_read();

epoch_t max_waiting_epoch = 0;
for (auto s : shards) {
max_waiting_epoch = std::max(max_waiting_epoch,
s->get_max_waiting_epoch());
}
if (max_waiting_epoch > get_osdmap()->get_epoch()) {
dout(20) << __func__ << " max_waiting_epoch " << max_waiting_epoch
<< ", requesting new map" << dendl;
osdmap_subscribe(superblock.newest_map + 1, false);
}
}

if (is_active()) {
.....
const auto now = ceph::coarse_mono_clock::now();
{
// 周期性的向 mon报告
std::lock_guard l{min_last_epoch_clean_lock};
const auto elapsed = now - last_sent_beacon;
if (chrono::duration_cast<chrono::seconds>(elapsed).count() >
cct->_conf->osd_beacon_report_interval) {
need_send_beacon = true;
}
}
if (need_send_beacon) {
//将自己当前的 eponch 发送给mon
send_beacon(now);
}
}
....
//更新 定时器任务
tick_timer_without_osd_lock.add_event_after(get_tick_interval(),
new C_Tick_WithoutOSDLock(this));
}
heartbeat_check
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void OSD::heartbeat_check()
{
ceph_assert(heartbeat_lock.is_locked());
utime_t now = ceph_clock_now();

// check for incoming heartbeats (move me elsewhere?)
// 遍历 heartbeat_peers
for (map<int,HeartbeatInfo>::iterator p = heartbeat_peers.begin();
p != heartbeat_peers.end();
++p) {

if (p->second.first_tx == utime_t()) {
dout(30) << "heartbeat_check we haven't sent ping to osd." << p->first
<< " yet, skipping" << dendl;
continue;
}
.....
// 收到回复后,都会根据接收时间 记录 ping_history里
if (p->second.is_unhealthy(now)) {
utime_t oldest_deadline = p->second.ping_history.begin()->second.first;
//
if (p->second.last_rx_back == utime_t() ||
p->second.last_rx_front == utime_t()) {
// fail
//有错误的peer都会记录到 failure_queue,会统一发送给mon
failure_queue[p->first] = p->second.first_tx;
} else {
.....
// fail
failure_queue[p->first] = std::min(p->second.last_rx_back, p->second.last_rx_front);
}
}
}
}

mon 如何处理上报信息的

image-20220922164958665

image-20220922170235620

和osd接受心跳消息一样,在mon中 也是有一个dispatch 来处理信息,并且也是根据消息类型来处理
image-20220926110653634

最终会调用 paxos_service[PAXOS_OSDMAP]->dispatch(op); paxos_service 在 mon初始化时候就已经构造好了,可以看到处理reset重新指向了OSDMonitor的类,所以说处理报错消息是由OSDMonitor类的函数处理的;
image-20220926145352556

看看 paxos_service[PAXOS_OSDMAP]->dispatch(op) 是怎么处理的
对于上报消息并不是直接处理,而是分为两个步骤

  • 预处理 preprocess_query
  • 更行状态 prepare_update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool PaxosService::dispatch(MonOpRequestRef op)
{
........
// 预处理 操作
// preprocess
if (preprocess_query(op))
return true; // easy!
...........
.......
// update
if (!prepare_update(op)) {
// no changes made.
return true;
}

if (need_immediate_propose) {
dout(10) << __func__ << " forced immediate propose" << dendl;
need_immediate_propose = false;
propose_pending();
return true;
}

......
}

先说下 preprocess_query

preprocess_query 函数中处理了很多种类型消息,其中 MSG_OSD_FAILURE 对应的处理函数是 preprocess_failure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bool OSDMonitor::preprocess_query(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
Message *m = op->get_req();
switch (m->get_type()) {
// READs
.....
// damp updates
case MSG_OSD_MARK_ME_DOWN:
return preprocess_mark_me_down(op);
case MSG_OSD_FULL:
return preprocess_full(op);
case MSG_OSD_FAILURE:
return preprocess_failure(op);
......
default:
ceph_abort();
return true;
}
}

最终 调用 preprocess_failure ,这里多次验证这个消息是否准确的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
// who is target_osd
//获取osd id
int badboy = m->get_target_osd();
// check permissions
if (check_source(op, m->fsid))
goto didit;
dout(5) << "osd test "<< badboy << dendl;
// first, verify the reporting host is valid
//先从 osdmap中验证有无这个osd
if (m->get_orig_source().is_osd()) {
int from = m->get_orig_source().num();
if (!osdmap.exists(from) ||
!osdmap.get_addrs(from).legacy_equals(m->get_orig_source_addrs()) ||
(osdmap.is_down(from) && m->if_osd_failed())) {
dout(5) << "preprocess_failure from dead osd." << from
<< ", ignoring" << dendl;
send_incremental(op, m->get_epoch()+1);
goto didit;
}
}
// weird?
//看osd状态
if (osdmap.is_down(badboy)) {
dout(5) << "preprocess_failure dne(/dup?): osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< ", from " << m->get_orig_source() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
send_incremental(op, m->get_epoch()+1);
goto didit;
}
//核查上报错误osd 的地址
if (osdmap.get_addrs(badboy) != m->get_target_addrs()) {
dout(5) << "preprocess_failure wrong osd: report osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< " != map's " << osdmap.get_addrs(badboy)
<< ", from " << m->get_orig_source() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
send_incremental(op, m->get_epoch()+1);
goto didit;
}

// already reported?
//再次验证 验证epoch
if (osdmap.is_down(badboy) ||
osdmap.get_up_from(badboy) > m->get_epoch()) {
dout(5) << "preprocess_failure dup/old: osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< ", from " << m->get_orig_source() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
send_incremental(op, m->get_epoch()+1);
goto didit;
}

if (!can_mark_down(badboy)) {
dout(5) << "preprocess_failure ignoring report of osd."
<< m->get_target_osd() << " " << m->get_target_addrs()
<< " from " << m->get_orig_source() << dendl;
goto didit;
}
return false;
didit:
mon->no_reply(op);
return true;
}

验证之后执行 prepare_update,和preprocess_query 一样也是通过消息类型来执行对应的处理函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool OSDMonitor::prepare_update(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
Message *m = op->get_req();
dout(7) << "prepare_update " << *m << " from " << m->get_orig_source_inst() << dendl;

switch (m->get_type()) {
// damp updates
.....
case MSG_OSD_FAILURE:
return prepare_failure(op);
.....
}

return false;
}

prepare_failure 函数将报告添加到 failure_info中,最后再 check_failure 执行标记osd down的错误

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
bool OSDMonitor::prepare_failure(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
//转换消息类型
MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
dout(1) << "prepare_failure osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< " from " << m->get_orig_source()
<< " is reporting failure:" << m->if_osd_failed() << dendl;
//获取上报的osd
int target_osd = m->get_target_osd();
//
int reporter = m->get_orig_source().num();
ceph_assert(osdmap.is_up(target_osd));
ceph_assert(osdmap.get_addrs(target_osd) == m->get_target_addrs());

mon->no_reply(op);

if (m->if_osd_failed()) {
// calculate failure time
utime_t now = ceph_clock_now();
utime_t failed_since =
m->get_recv_stamp() - utime_t(m->failed_for, 0);

// add a report
if (m->is_immediate()) {
mon->clog->debug() << "osd." << m->get_target_osd()
<< " reported immediately failed by "
<< m->get_orig_source();
force_failure(target_osd, reporter);
return true;
}
mon->clog->debug() << "osd." << m->get_target_osd() << " reported failed by "
<< m->get_orig_source();

failure_info_t& fi = failure_info[target_osd];

// 在这个添加失败的报告
MonOpRequestRef old_op = fi.add_report(reporter, failed_since, op);
if (old_op) {
mon->no_reply(old_op);
}

return check_failure(now, target_osd, fi);
} else {
// remove the report
mon->clog->debug() << "osd." << m->get_target_osd()
<< " failure report canceled by "
<< m->get_orig_source();
if (failure_info.count(target_osd)) {
failure_info_t& fi = failure_info[target_osd];
MonOpRequestRef report_op = fi.cancel_report(reporter);
if (report_op) {
mon->no_reply(report_op);
}
if (fi.reporters.empty()) {
dout(10) << " removing last failure_info for osd." << target_osd
<< dendl;
failure_info.erase(target_osd);
} else {
dout(10) << " failure_info for osd." << target_osd << " now "
<< fi.reporters.size() << " reporters" << dendl;
}
} else {
dout(10) << " no failure_info for osd." << target_osd << dendl;
}
}

return false;
}

check_failure 函数并没有直接标记osd 为down,而是通过各种条件来调节 osd 失败的时间,然后再和 实际的osd_heartbeat_grace 做比较,以及上报数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
{
// already pending failure?
if (pending_inc.new_state.count(target_osd) &&
pending_inc.new_state[target_osd] & CEPH_OSD_UP) {
dout(10) << " already pending failure" << dendl;
return true;
}

set<string> reporters_by_subtree;
//故障域
auto reporter_subtree_level = g_conf().get_val<string>("mon_osd_reporter_subtree_level");
//
utime_t orig_grace(g_conf()->osd_heartbeat_grace, 0);
utime_t max_failed_since = fi.get_failed_since();
utime_t failed_for = now - max_failed_since;

utime_t grace = orig_grace;
double my_grace = 0, peer_grace = 0;
double decay_k = 0;

//根据以往 osd的错误报告来 调整下 osd_heartbeat_grace 时间
if (g_conf()->mon_osd_adjust_heartbeat_grace) {
// decay_k 为 -0.000192541 ???
//::log(.5) -0.693147
double halflife = (double)g_conf()->mon_osd_laggy_halflife;
decay_k = ::log(.5) / halflife;

// scale grace period based on historical probability of 'lagginess'
//根据历史上的 "滞后性 "概率确定宽限期的规模
// (false positive failures due to slowness).

const osd_xinfo_t& xi = osdmap.get_xinfo(target_osd);
//计算 log ?
double decay = exp((double)failed_for * decay_k);
dout(1) << " halflife " << halflife << " decay_k " << decay_k
<< " failed_for " << failed_for << " decay " << decay << dendl;

my_grace = decay * (double)xi.laggy_interval * xi.laggy_probability;
grace += my_grace;
}

// consider the peers reporting a failure a proxy for a potential
// 'subcluster' over the overall cluster that is similarly
// laggy. this is clearly not true in all cases, but will sometimes
// help us localize the grace correction to a subset of the system
// (say, a rack with a bad switch) that is unhappy.
ceph_assert(fi.reporters.size());
//遍历上报错误
for (auto p = fi.reporters.begin(); p != fi.reporters.end();) {
// get the parent bucket whose type matches with "reporter_subtree_level".
// fall back to OSD if the level doesn't exist.
if (osdmap.exists(p->first)) {
//通过osdmap 获取完整的osd位置
auto reporter_loc = osdmap.crush->get_full_location(p->first);
//不在 指定的故障域
if (auto iter = reporter_loc.find(reporter_subtree_level);
iter == reporter_loc.end()) {
reporters_by_subtree.insert("osd." + to_string(p->first));

dout(1) <<"reporters_by_subtree, reporter_subtree_level:" <<reporter_subtree_level
<< " vname:" << "osd." + to_string(p->first) << dendl;

} else {
string name = get_phy_name(iter->second);
dout(1) <<"reporters_by_subtree, reporter_subtree_level:" <<reporter_subtree_level
<< " vname:" << iter->second << " pname:" << name << dendl;
reporters_by_subtree.insert(name);
}
if (g_conf()->mon_osd_adjust_heartbeat_grace) {
const osd_xinfo_t& xi = osdmap.get_xinfo(p->first);
//距离上次标记down的时间
utime_t elapsed = now - xi.down_stamp;
double decay = exp((double)elapsed * decay_k);
//被标记为滞后和恢复的平均时间间隔
peer_grace += decay * (double)xi.laggy_interval * xi.laggy_probability;
}
++p;
} else {
fi.cancel_report(p->first);;
p = fi.reporters.erase(p);
}
}

if (g_conf()->mon_osd_adjust_heartbeat_grace) {
peer_grace /= (double)fi.reporters.size();
grace += peer_grace;
}

dout(10) << " osd." << target_osd << " has "
<< fi.reporters.size() << " reporters, "
<< grace << " grace (" << orig_grace << " + " << my_grace
<< " + " << peer_grace << "), max_failed_since " << max_failed_since
<< dendl;

// 以上都是在调整 grace的时间

// 实际 osd报错的时间 和 调整过的grace 时间做比较
// 以及报错的上报数量
if (failed_for >= grace &&
reporters_by_subtree.size() >= g_conf().get_val<uint64_t>("mon_osd_min_down_reporters")) {
dout(1) << " we have enough reporters to mark osd." << target_osd
<< " down" << dendl;
// 将目标状态标记为 CEPH_OSD_UP,看注释是说 XORed onto previous state,异或?
pending_inc.new_state[target_osd] = CEPH_OSD_UP;

mon->clog->info() << "osd." << target_osd << " failed ("
<< osdmap.crush->get_full_location_ordered_string(
target_osd)
<< ") ("
<< (int)reporters_by_subtree.size()
<< " reporters from different "
<< reporter_subtree_level << " after "
<< failed_for << " >= grace " << grace << ")";
return true;
}
return false;
}

让后执行
bool PaxosService::dispatch(MonOpRequestRef op)

image-20220926140755087

image-20220926140802665

image-20220926140809506

OSDMonitor::preprocess_query(MonOpRequestRef op)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool OSDMonitor::preprocess_query(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
//取得 message
Message *m = op->get_req();
dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
//判断 message类性
switch (m->get_type()) {
.....
//osd上报错误的处理函数
case MSG_OSD_FAILURE:
return preprocess_failure(op);
.......
default:
ceph_abort();
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
bool OSDMonitor::preprocess_failure(MonOpRequestRef op)
{
op->mark_osdmon_event(__func__);
MOSDFailure *m = static_cast<MOSDFailure*>(op->get_req());
// who is target_osd
int badboy = m->get_target_osd();

// check permissions
if (check_source(op, m->fsid))
goto didit;

// first, verify the reporting host is valid
if (m->get_orig_source().is_osd()) {
int from = m->get_orig_source().num();
if (!osdmap.exists(from) ||
!osdmap.get_addrs(from).legacy_equals(m->get_orig_source_addrs()) ||
(osdmap.is_down(from) && m->if_osd_failed())) {
dout(5) << "preprocess_failure from dead osd." << from
<< ", ignoring" << dendl;
send_incremental(op, m->get_epoch()+1);
goto didit;
}
}


// weird?
if (osdmap.is_down(badboy)) {
dout(5) << "preprocess_failure dne(/dup?): osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< ", from " << m->get_orig_source() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
send_incremental(op, m->get_epoch()+1);
goto didit;
}
if (osdmap.get_addrs(badboy) != m->get_target_addrs()) {
dout(5) << "preprocess_failure wrong osd: report osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< " != map's " << osdmap.get_addrs(badboy)
<< ", from " << m->get_orig_source() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
send_incremental(op, m->get_epoch()+1);
goto didit;
}

// already reported?
if (osdmap.is_down(badboy) ||
osdmap.get_up_from(badboy) > m->get_epoch()) {
dout(5) << "preprocess_failure dup/old: osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< ", from " << m->get_orig_source() << dendl;
if (m->get_epoch() < osdmap.get_epoch())
send_incremental(op, m->get_epoch()+1);
goto didit;
}

if (!can_mark_down(badboy)) {
dout(5) << "preprocess_failure ignoring report of osd."
<< m->get_target_osd() << " " << m->get_target_addrs()
<< " from " << m->get_orig_source() << dendl;
goto didit;
}

dout(10) << "preprocess_failure new: osd." << m->get_target_osd()
<< " " << m->get_target_addrs()
<< ", from " << m->get_orig_source() << dendl;
return false;

didit:
mon->no_reply(op);
return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
bool OSDMonitor::check_failure(utime_t now, int target_osd, failure_info_t& fi)
{
// already pending failure?
if (pending_inc.new_state.count(target_osd) &&
pending_inc.new_state[target_osd] & CEPH_OSD_UP) {
dout(10) << " already pending failure" << dendl;
return true;
}

set<string> reporters_by_subtree;
//故障域
auto reporter_subtree_level = g_conf().get_val<string>("mon_osd_reporter_subtree_level");
//
utime_t orig_grace(g_conf()->osd_heartbeat_grace, 0);
utime_t max_failed_since = fi.get_failed_since();
utime_t failed_for = now - max_failed_since;

utime_t grace = orig_grace;
double my_grace = 0, peer_grace = 0;
double decay_k = 0;

//根据以往 osd的错误报告来 调整下 osd_heartbeat_grace 时间
if (g_conf()->mon_osd_adjust_heartbeat_grace) {
// decay_k 为 -0.000192541 ???
//::log(.5) -0.693147
double halflife = (double)g_conf()->mon_osd_laggy_halflife;
decay_k = ::log(.5) / halflife;

// scale grace period based on historical probability of 'lagginess'
//根据历史上的 "滞后性 "概率确定宽限期的规模
// (false positive failures due to slowness).

const osd_xinfo_t& xi = osdmap.get_xinfo(target_osd);
//计算 log ?
double decay = exp((double)failed_for * decay_k);
dout(1) << " halflife " << halflife << " decay_k " << decay_k
<< " failed_for " << failed_for << " decay " << decay << dendl;

my_grace = decay * (double)xi.laggy_interval * xi.laggy_probability;
grace += my_grace;
}
//遍历上报错误
for (auto p = fi.reporters.begin(); p != fi.reporters.end();) {
// get the parent bucket whose type matches with "reporter_subtree_level".
// fall back to OSD if the level doesn't exist.
if (osdmap.exists(p->first)) {
//通过osdmap 获取完整的osd位置
auto reporter_loc = osdmap.crush->get_full_location(p->first);
//不在 指定的故障域
if (auto iter = reporter_loc.find(reporter_subtree_level);
iter == reporter_loc.end()) {
reporters_by_subtree.insert("osd." + to_string(p->first));
} else {
string name = get_phy_name(iter->second);
reporters_by_subtree.insert(name);
}
if (g_conf()->mon_osd_adjust_heartbeat_grace) {
const osd_xinfo_t& xi = osdmap.get_xinfo(p->first);
//距离上次标记down的时间
utime_t elapsed = now - xi.down_stamp;
double decay = exp((double)elapsed * decay_k);
//被标记为滞后和恢复的平均时间间隔
peer_grace += decay * (double)xi.laggy_interval * xi.laggy_probability;
}
++p;
} else {
fi.cancel_report(p->first);;
p = fi.reporters.erase(p);
}
}

if (g_conf()->mon_osd_adjust_heartbeat_grace) {
peer_grace /= (double)fi.reporters.size();
grace += peer_grace;
}
// 以上都是在调整 grace的时间

// 实际 osd报错的时间 和 调整过的grace 时间做比较
// 以及报错的上报数量
if (failed_for >= grace &&
reporters_by_subtree.size() >= g_conf().get_val<uint64_t>("mon_osd_min_down_reporters")) {

// 将目标状态标记为 CEPH_OSD_UP,看注释是说 XORed onto previous state,异或?
pending_inc.new_state[target_osd] = CEPH_OSD_UP;
return true;
}
return false;
}

一类是同个PG下的OSD节点之间

另一类是OSD的左右两个相邻的节点,

可以用于节点间检测对方是否故障,以便及时发现故障节点后进入相应的故障处理流程。

每个OSD节点(守护进程)会监听public、cluster、front和back四个端口

  • public端口:监听来自Monitor和Client的连接。
  • cluster端口:监听来自OSD Peer的连接(同一个集群内的osd)。
  • front端口:供客户端连接集群使用的网卡, 这里临时给集群内部之间进行心跳。
  • back端口:供客集群内部使用的网卡。集群内部之间进行心跳。

每个 Ceph OSD 守护进程每 6 秒检查其他 Ceph OSD 守护进程的心跳。要更改心跳间隔,请在 Ceph 配置文件的 [osd] 部分下添加 osd heartbeat interval 设置,或者在运行时更改其值。

如果邻居 Ceph OSD 守护进程未在 20 秒宽限期内发送 heartbeat 数据包,Ceph OSD 守护进程可能会考虑相邻的 Ceph OSD 守护进程 停机。它将报告回 Ceph 监控器,后者将更新 Ceph 群集映射。若要更改此宽限期,可在 Ceph 配置文件的 [osd] 部分下添加 osd heartbeat 宽限期,或者在运行时设置其值。

image-20220914114020984

osd 之间心跳机制:osd之间会互相通信,通信间隔时间在随机 6s以内;如果在20s后没有收到osd没有收到相邻的osd心跳,会认为相邻的osd已经 dowm,并报告给 mon;

image-20220912181919117

osd和mon之间的心跳机制:

  • 向mon 上报 osd down信息,mon判断机制是收到两个以上某个osd dowm的信息

image-20220912182244414

  • 上报 osd 状态,变更信息,不管osd 有没有变更信息,每个120s都要上报一次

image-20220912182611844

  • 当 获取不到 邻居osd信息时,上报给mon获取新的 osdmap

image-20220912182528403

image-20220913174728093

image-20220907160040430

image-20220907165035868

\