ceph admin socket 源码分析

Last updated on 3 months ago

test

预备知识

Unix domain socket 和 网络 socket的区别

​ 对于网络socket,大家应该比较熟悉,两个不同主机上的进程通信,就可以网络 socket来通信;对于同一台主机上的两个进程通信也可以使用socket,地址使用 127.0.0.1就可以实现,但是对于同一主机的两个进程通信而言,其数据还是需要通过网络协议栈(数据需要打包又要拆包…),这样效率并不高,后来引用 Unix domain socket(网络socket 的框架上发展出一种 IPC 机制) ,专门用于实现同一主机上的进程间通信 ,Unix domain socket 不需要经过网络协议栈,可以直接将数据从一个进程拷贝到另一个进程;

​ 网络socket 使用ip地址加端口号确定 socket 地址,而 Unix domain socket 的地址是一个 socket 类型的文件路径, 这也是两者最大的区别! 其余使用基本相同。

admin socket 是什么?

​ Ceph 集群 中有很多守护进程,如每个osd都有一个守护进程,如果我们想获取进程运行时的配置参数我们可以使用 admin socket 查看,还可以获取进程运行的状态,以及集群修改配置,获取log等。

​ admin socket 获取进程信息的方式通过 Unix domain socket ,即我们上文提到的 IPC机制(inter-process communication 进程间通信),admin socket 初始化时候会生成一个 socket 类型的文件(调用 bind函数的时候),其文件路径 固定在了 /var/run/ceph/ 中,(这个文件可以理解为通信地址);每次使用 adminsocket 获取进程状态的时候,都需要附带这个文件路径(下面使用会提到)。

image-20220911175540114 admin socket 实现流程图

admin socket 怎么用?

简单使用

ceph daemon [socket 类型的文件路径] [command]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#通过 ceph daemon 可以查看到相应的守护进程的信息 
#ceph中每个 守护进程都对应一个 asok 文件

[root@node29 ~]# ls /var/run/ceph/
ceph-client.radosgw.gateway.28290.93853901817120.asok ceph-mon.mon-node29.asok ceph-osd.1.asok
ceph-mgr.node29.asok ceph-osd.0.asok ceph-osd.2.asok

# asok类型的文件就是 进程启动时生成的

[root@node29 run]# ceph daemon /var/run/ceph/ceph-osd.0.asok config
no valid command found; 7 closest matches:
config set <var> <val> [<val>...]
config help {<var>}
config diff
config get <var>
config diff get <var>
config show
config unset <var>

#获取全部配置内容
[root@node29 run]# ceph daemon /var/run/ceph/ceph-osd.0.asok config show | head -10
{
"name": "osd.0",
"cluster": "ceph",
"admin_socket": "/var/run/ceph/ceph-osd.0.asok",
"admin_socket_mode": "",
"auth_client_required": "cephx",
"auth_cluster_required": "cephx",
......
}

#获取指定配置内容
[root@node29 run]# ceph daemon /var/run/ceph/ceph-osd.0.asok config get admin_socket
{
"admin_socket": "/var/run/ceph/ceph-osd.0.asok"
}

[root@node29 run]#

#修改 指定配置内容
[root@node29 ceph]# ceph daemon /var/run/ceph/ceph-osd.0.asok config set "mon_osd_initial_require_min_compat_client" "jewel"
{
"success": ""
}
......
[root@node29 ~]# ceph daemon /var/run/ceph/ceph-osd.1.asok perf dump | head -n15
{
"AsyncMessenger::Worker-0": {
"msgr_recv_messages": 2665895,
"msgr_send_messages": 2665927,
"msgr_recv_bytes": 853128095,
"msgr_send_bytes": 4958397491,
"msgr_created_connections": 13,
"msgr_active_connections": 2,
"msgr_running_total_time": 600.146200311,
"msgr_running_send_time": 223.453728410,
"msgr_running_recv_time": 152.217070353,
"msgr_running_fast_dispatch_time": 77.218817120
},

注意事项

  • ceph daemon 后面跟的文件路径一定要为绝对路径
  • 使用 admin sokcet修改参数,不一定马上生效,还需要结合实际情况分析

admin socket启动整体流程

admin socket 大致的启动流程

image-20220830212314969

admin socket 执行命令流程

image-20220830212347528

AdminSocket

在代码中 admin socket和 AdminSocket 类有关系,在一开始初始化 CephContext (ceph的上下文)时候,就已经 new 出 AdminSocket类的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CephContext::CephContext(uint32_t module_type_,
enum code_environment_t code_env,
int init_flags_){

// ..............
_admin_socket = new AdminSocket(this);
// ...............
_admin_hook = new CephContextHook(this);
// 并添加了几个命令
_admin_socket->register_command("assert", "assert", _admin_hook, "");
_admin_socket->register_command("abort", "abort", _admin_hook, "");
_admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
_admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");

// ..............
}

register_command 的参数可以推断出 是添加命令并注册命令对应的回调函数,这里结合代码概述下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//先看参数类型
//string_view 是和有着string一样功能,但没有涉及内存的额外分配(c++17特性)
// AdminSocketHook 也是一个类, call 是纯虚函数,说明肯定有某个类继承并重写 call
// hook后续会单独分析

class AdminSocketHook {
public:
virtual bool call(std::string_view command, const cmdmap_t& cmdmap,
std::string_view format, bufferlist& out) = 0;
virtual ~AdminSocketHook() {}
};

int AdminSocket::register_command(std::string_view command,
std::string_view cmddesc,
AdminSocketHook *hook,
std::string_view help)
{
int ret;
std::unique_lock l(lock);
// hooks 是AdminSocket 类里的一个 map,用来存命令的
// std::map<std::string, hook_info, std::less<>> hooks;
// hook_info 是一个包含函数的结构体
// 首先在 map中查有无 相对应的命令
auto i = hooks.find(command);
if (i != hooks.cend()) {
ldout(m_cct, 5) << "register_command " << command << " hook " << hook
<< " EEXIST" << dendl;
ret = -EEXIST;
} else {
ldout(m_cct, 5) << "register_command " << command << " hook " << hook
<< dendl;
// 加入 将 命令和对应的AdminSocketHook加入 是AdminSocket中
hooks.emplace_hint(i,
std::piecewise_construct,
std::forward_as_tuple(command),
std::forward_as_tuple(hook, cmddesc, help));
ret = 0;
}
return ret;
}

admin socket 启动流程

一开始就提到过 用户是通过socket来获取ceph中配置信息的,那肯定是有初始化流程,结合代码看看其启动流程

有一个函数在 ceph_osd 模块中出现了很多次 common_init_finish (出现了七次),
image-20220829233025193

但其实跳转进去看,最多会执行一次,因为里面有个标志位;这个函数 主要是启动日志服务线程和 admin socket的服务线程,而多次调用是我猜是确保服务线程启动;
image-20220829233801487

最终在 里看到 AdminSocket 的init函数

image-20220830002333423

AdminSocket::init 函数主要是启动 AdminSocket 线程,既然是启了线程,这个线程应该是 osd 的守护进程启动的,在系统看 果然是有 admin_socket;
image-20220830003109748

AdminSocket::init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//代码有点多,只留关键的代码
bool AdminSocket::init(const std::string& path)
{
hrp("admin socket 初始化")
/* Set up things for the new thread */
std::string err;
int pipe_rd = -1, pipe_wr = -1;
err = create_shutdown_pipe(&pipe_rd, &pipe_wr);

/* Create socket */
int sock_fd;
err = bind_and_listen(path, &sock_fd);


/* Create new thread */
m_sock_fd = sock_fd;
m_shutdown_rd_fd = pipe_rd;
m_shutdown_wr_fd = pipe_wr;
m_path = path;
hrp(m_path);
version_hook = std::make_unique<VersionHook>();
register_command("0", "0", version_hook.get(), "");
.....

th = make_named_thread("admin_socket", &AdminSocket::entry, this);
add_cleanup_file(m_path.c_str());
return true;
}

从 AdminSocket::init 可以看的,主要有 三个部分组成,创建 pipe、socket,以及启动 线程

AdminSocket::create_shutdown_pipe
创建一个管道,在注释中可以了解到,线程会监听 读管道,当这有有数据的时候,会优雅的将线程 kill 掉,其实从名字也会可以推测出 创建 shutdown 管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
{
int pipefd[2];
if (pipe_cloexec(pipefd) < 0) {
int e = errno;
ostringstream oss;
oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(e);
return oss.str();
}

*pipe_rd = pipefd[0];
*pipe_wr = pipefd[1];
return "";
}

bind_and_listen(path, &sock_fd);

函数主要工作是生成 socket,但这里的socket是单机版的socket( UNI Domian Socket ),和以前接触网络socket 不相同,不同之处如下所示(这也说明为什么ceph daemon 后面要加个 asok文件!)
image-20220829175922827

从 bind_and_listen 也可以看出确实是 用sockaddr_un 结构体,其实网络socket也是可以实现的(地址回环),那为什么用 UNI Domian Socket 呢?

当我们使用 admin socket 读取或修改ceph 配置时候,都是需要在对应节点主机上操作,假如说ceph 集群中有 A、B、C三个几点,各自部署了 osd,A节点不能直接修改 B节点的osd配置(远程过去也行…),因为 那个 asok文件并不在A节点上,现在这些操作都是在同一台主机上,相当于 IPC(进程间通信),UNIX Domain Socket是不需要 经过网络协议栈,只需要将 应用层数据从一个进程拷贝到另一个进程,这样效率比较高

image-20220830111830154

接下来步骤就是 bind 和 listen 了,和 网络socket建立没多大区别….
一个 socket 只能bind一次, 在 UNI Domian Socket 调用bind的时候,会根据 address 给出的路径生成 socket文件

start thread

启动线程用来监听socket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 /* Create new thread */
// 将上生成的 fd 保存到 类成员中
m_sock_fd = sock_fd;
m_shutdown_rd_fd = pipe_rd;
m_shutdown_wr_fd = pipe_wr;
//这里path 是 asok文件路径
m_path = path;
hrp(m_path);
//VersionHook 继承了 AdminSocketHook 并实现 call接口
version_hook = std::make_unique<VersionHook>();
register_command("0", "0", version_hook.get(), "");
register_command("version", "version", version_hook.get(), "get ceph version");
register_command("git_version", "git_version", version_hook.get(),
"get git sha1");

//help_hook 也继承了 AdminSocketHook 并实现 call接口
help_hook = std::make_unique<HelpHook>(this);
register_command("help", "help", help_hook.get(),
"list available commands");
//同上, 这里可以出 针对不同的命令,用不同的类进行call 重写
getdescs_hook = std::make_unique<GetdescsHook>(this);
register_command("get_command_descriptions", "get_command_descriptions",
getdescs_hook.get(), "list available commands");


// 最后启动线程
// 函数设置了 线程名字,和线程启动时调用的函数 &AdminSocket::entry
th = make_named_thread("admin_socket", &AdminSocket::entry, this);
add_cleanup_file(m_path.c_str());


AdminSocket::entry

admin_socket 线程运行的函数,和常规写法一样用一个 while循环,里面用 poll 来检测两个 m_sock_fd,和 m_shutdown_rd_fd(就是最初创建 pipe那个 )的事件响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void AdminSocket::entry() noexcept
{
.....
while (true) {
struct pollfd fds[2];
fds[0].fd = m_sock_fd;
fds[0].events = POLLIN | POLLRDBAND;
fds[1].fd = m_shutdown_rd_fd;
fds[1].events = POLLIN | POLLRDBAND;

int ret = poll(fds, 2, -1);
if (ret < 0) {
int err = errno;
if (err == EINTR) {
continue;
}
lderr(m_cct) << "AdminSocket: poll(2) error: '"
<< cpp_strerror(err) << dendl;
return;
}

if (fds[0].revents & POLLIN) {
// Send out some data
do_accept();
}
if (fds[1].revents & POLLIN) {
// Parent wants us to shut down
return;
}
}
ldout(m_cct, 5) << "entry exit" << dendl;
}

AdminSocket::do_accept()

当 m_sock_fd 有时间响应时候,会执行 do_accept(); 看函数名应该是 对收到的数据做些处理

accept 返回一个 fd,这里是用while 逐个字符读取 客户端传来的命令

image-20220830170649879

while里面做了些处理,结合日志,可以看到最终读出来的格式是字符串是 json 格式的

image-20220830170949647

最后字符传到 execute_command 函数中

image-20220830174834047

现在 知道传进来的字符串 c 是一条 json字符串, 在 execute_command 函数中,会将 json解析到map中(目的是 提取value)
image-20220830175309370

接下来是从 map中获取我们需要的 命令, 假如说 c是 {“prefix”,”config show”} ,那么 match为 config show
image-20220830175420009

在文章的 整体流程部分提到了 ,cct初始化时,会注册一些函数到 hooks 中 ,现在就通过 传进来的参数在 hooks中查找对应的回调,hooks也是一个map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
std::unique_lock l(lock);
// 创建一个 hooks类型的 变量
decltype(hooks)::iterator p;
// match 为参数命令
while (match.size()) {
p = hooks.find(match);
if (p != hooks.cend())
break;

// drop right-most word
size_t pos = match.rfind(' ');
if (pos == std::string::npos) {
match.clear(); // we fail
break;
} else {
match.resize(pos);
}
}

if (p == hooks.cend()) {
lderr(m_cct) << "AdminSocket: request '" << cmd << "' not defined" << dendl;
return false;
}
string args;
if (match != cmd) {
args = cmd.substr(match.length() + 1);
}

// Drop lock to avoid cycles in cases where the hook takes
// the same lock that was held during calls to register/unregister,
// and set in_hook to allow unregister to wait for us before
// removing this hook.
in_hook = true;
// p的key是 string, value是 struct hook_info,这个结构体包含了 hook
auto match_hook = p->second.hook;
l.unlock();
// match_hook 是 AdminSocketHook类 可以执行 call,也就是一开始注册的函数
bool success = (validate(match, cmdmap, out) &&
match_hook->call(match, cmdmap, format, out));
l.lock();
in_hook = false;
in_hook_cond.notify_all();

执行完 注册的函数后,其输出的内容保存在 bufferlist类型的 out中,最后写入 到新建连接的 connection_fd中
image-20220830200816019

客户端 如何发起连接和读数据的

当我们输入 ceph daemon 的时候,首先会先解析命令,ceph daemon 命令首先 会在ceph.in 中的函数maybe_daemon_command 被解析:

image-20220912085729817

如果命令解析正确,最后会执行 admin_socket 函数
image-20220912090124891

admin_socket 函数 在 ceph_daemon.py 文件里
image-20220912090349117

在 admin_socket 函数中有个关键的函数 do_sockio 用于发起连接,并发送命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 
def do_sockio(path, cmd_bytes):
""" helper: do all the actual low-level stream I/O """
#发起连接,connect的参数是 path,也就是 socket文件路径
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(path)
try:
# 发送数据,即解析好的参数
sock.sendall(cmd_bytes + b'\0')
len_str = sock.recv(4)
if len(len_str) < 4:
raise RuntimeError("no data returned from admin socket")
l, = struct.unpack(">I", len_str)
sock_ret = b''
# 读取数据
got = 0
while got < l:
# recv() receives signed int, i.e max 2GB
# workaround by capping READ_CHUNK_SIZE per call.
want = min(l - got, READ_CHUNK_SIZE)
bit = sock.recv(want)
sock_ret += bit
got += len(bit)

except Exception as sock_e:
raise RuntimeError('exception: ' + str(sock_e))
return sock_ret

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#命令 执行流程
#先获取命令集合
try:
cmd_json = do_sockio(asok_path,
b'{"prefix": "get_command_descriptions"}')

except Exception as e:
raise RuntimeError('exception getting command descriptions: ' + str(e))

# cmd和get_command_descriptions一样说明只是获取帮助
if cmd == 'get_command_descriptions':
return cmd_json

# 将命令集合 解析成字典
sigdict = parse_json_funcsigs(cmd_json.decode('utf-8'), 'cli')
#查找对应的命令
valid_dict = validate_command(sigdict, cmd)
if not valid_dict:
raise RuntimeError('invalid command')

if format:
valid_dict['format'] = format
# 在字典中查找到相应的命令,则调用 do_sockio
try:
ret = do_sockio(asok_path, json.dumps(valid_dict).encode('utf-8'))
except Exception as e:
raise RuntimeError('exception: ' + str(e))
return ret

返回的结果最后写到终端上
image-20220912094553017

执行对应的函数

以上,一个命令执行的整体流程就结束了,结合一个命令分析并做一个总结

比如说 config命令,注册时候 command 为 config show,命令描述也是,回调类是 **CephContextHook *_admin_hook**,最后一个参数信息是帮助信息

image-20220830202731573

和 类重写父类的 call,最终调用的 do_comand 函数

image-20220830203152555

do_comand 里面一大推 if-else 做字符串匹配
image-20220830203445568

_show_config

最终调用了 _show_config函数
image-20220830204814705

从上图可以看出,有些 参数是保存到Formatter *f中,Formatter可以理解为 是一种格式化流;最后执行完回到 do_commad 函数,会将 格式化好的数据刷新到到 out 中image-20220830205231570

这里做个测试 : 我在 _show_config 中添加些内容,看下打印的结果怎样
image-20220830205633149

自定义一个hook

尝试自己注册一个命令,这里回调函数还是 和show config 相同,添加了if 语句

image-20220830211514358