Administrator
发布于 2022-08-20 / 11 阅读
0
0

ceph admin socket 源码分析

ceph admin socket 源码分析

test

预备知识

Unix domain socket 和 网络 socket的区别 Unix domain socket 和 网络 socket的区别

对于网络socket,大家应该比较熟悉,两个不同主机上的进程通信,就可以网络 socket来通信;对于同一台主机上的两个进程通信也可以使用socket,地址使用 127.0.0.1就可以实现,但是对于同一主机的两个进程通信而言,其数据还是需要通过网络协议栈(数据需要打包又要拆包…),这样效率并不高,后来引用 Unix domain socket(网络socket 的框架上发展出一种 IPC 机制) , 专门用于实现同一主机上的进程间通信 ,Unix domain socket 不需要经过网络协议栈,可以直接将数据从一个进程拷贝到另一个进程;

网络socket 使用ip地址加端口号确定 socket 地址,而 Unix domain socket 的地址是一个 socket 类型的文件路径, 这也是两者最大的区别! 其余使用基本相同。

admin socket 是什么?

Ceph 集群 中有很多守护进程,如每个osd都有一个守护进程,如果我们想获取进程运行时的配置参数我们可以使用 admin socket 查看,还可以获取进程运行的状态,以及集群修改配置,获取log等。

admin socket 获取进程信息的方式通过 Unix domain socket ,即我们上文提到的 IPC机制( inter-process communication 进程间通信),admin socket 初始化时候会生成一个 socket 类型的文件(调用 bind函数的时候),其文件路径 固定在了 /var/run/ceph/ 中,(这个文件可以理解为通信地址);每次使用 adminsocket 获取进程状态的时候,都需要附带这个文件路径(下面使用会提到)。

!image-20220911175540114admin socket 实现流程图

admin socket 怎么用?

简单使用

ceph daemon \[socket 类型的文件路径\] \[command\]


#通过 ceph daemon 可以查看到相应的守护进程的信息
#ceph中每个 守护进程都对应一个 asok 文件

[root@node29 ~]# ls /var/run/ceph/
ceph-client.radosgw.gateway.28290.93853901817120.asok  ceph-mon.mon-node29.asok  ceph-osd.1.asok
ceph-mgr.node29.asok                                   ceph-osd.0.asok           ceph-osd.2.asok

# asok类型的文件就是 进程启动时生成的

[root@node29 run]# ceph daemon /var/run/ceph/ceph-osd.0.asok config
no valid command found; 7 closest matches:
config set <var> <val> [<val>...]
config help {<var>}
config diff
config get <var>
config diff get <var>
config show
config unset <var>

#获取全部配置内容
[root@node29 run]# ceph daemon /var/run/ceph/ceph-osd.0.asok config show | head -10
{
    "name": "osd.0",
    "cluster": "ceph",
    "admin_socket": "/var/run/ceph/ceph-osd.0.asok",
    "admin_socket_mode": "",
    "auth_client_required": "cephx",
    "auth_cluster_required": "cephx",
   ......
}

#获取指定配置内容
[root@node29 run]#  ceph daemon /var/run/ceph/ceph-osd.0.asok config get admin_socket
{
    "admin_socket": "/var/run/ceph/ceph-osd.0.asok"
}

[root@node29 run]#

#修改 指定配置内容
[root@node29 ceph]# ceph daemon /var/run/ceph/ceph-osd.0.asok config set "mon_osd_initial_require_min_compat_client" "jewel"
{
    "success": ""
}
......
[root@node29 ~]# ceph daemon /var/run/ceph/ceph-osd.1.asok perf dump | head -n15
{
    "AsyncMessenger::Worker-0": {
        "msgr_recv_messages": 2665895,
        "msgr_send_messages": 2665927,
        "msgr_recv_bytes": 853128095,
        "msgr_send_bytes": 4958397491,
        "msgr_created_connections": 13,
        "msgr_active_connections": 2,
        "msgr_running_total_time": 600.146200311,
        "msgr_running_send_time": 223.453728410,
        "msgr_running_recv_time": 152.217070353,
        "msgr_running_fast_dispatch_time": 77.218817120
    },

注意事项

  • ceph daemon 后面跟的文件路径一定要为绝对路径
  • 使用 admin sokcet修改参数,不一定马上生效,还需要结合实际情况分析

admin socket启动整体流程

admin socket 大致的启动流程

!image-20220830212314969

admin socket 执行命令流程

!image-20220830212347528

AdminSocket 类 AdminSocket

在代码中 admin socket和 AdminSocket 类有关系,在一开始初始化 CephContext (ceph的上下文)时候,就已经 new 出 AdminSocket类的实例。


CephContext::CephContext(uint32_t module_type_,
                         enum code_environment_t code_env,
                         int init_flags_){

//  ..............
  _admin_socket = new AdminSocket(this);
// ...............
  _admin_hook = new CephContextHook(this);
// 并添加了几个命令
  _admin_socket->register_command("assert", "assert", _admin_hook, "");
  _admin_socket->register_command("abort", "abort", _admin_hook, "");
  _admin_socket->register_command("perfcounters_dump", "perfcounters_dump", _admin_hook, "");
  _admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings");

//  ..............
}

register\_command 的参数可以推断出 是添加命令并注册命令对应的回调函数,这里结合代码概述下


//先看参数类型
//string_view 是和有着string一样功能,但没有涉及内存的额外分配(c++17特性)
// AdminSocketHook 也是一个类, call 是纯虚函数,说明肯定有某个类继承并重写 call
// hook后续会单独分析

class AdminSocketHook {
public:
  virtual bool call(std::string_view command, const cmdmap_t& cmdmap,
		    std::string_view format, bufferlist& out) = 0;
  virtual ~AdminSocketHook() {}
};

int AdminSocket::register_command(std::string_view command,
				  std::string_view cmddesc,
				  AdminSocketHook *hook,
				  std::string_view help)
{
  int ret;
  std::unique_lock l(lock);
// hooks 是AdminSocket 类里的一个 map,用来存命令的
// std::map<std::string, hook_info, std::less<>> hooks;
// hook_info 是一个包含函数的结构体
// 首先在 map中查有无 相对应的命令
  auto i = hooks.find(command);
  if (i != hooks.cend()) {
    ldout(m_cct, 5) << "register_command " << command << " hook " << hook
		    << " EEXIST" << dendl;
    ret = -EEXIST;
  } else {
    ldout(m_cct, 5) << "register_command " << command << " hook " << hook
		    << dendl;
    //  加入 将 命令和对应的AdminSocketHook加入 是AdminSocket中
    hooks.emplace_hint(i,
		       std::piecewise_construct,
		       std::forward_as_tuple(command),
		       std::forward_as_tuple(hook, cmddesc, help));
    ret = 0;
  }
  return ret;
}

admin socket 启动流程

一开始就提到过 用户是通过socket来获取ceph中配置信息的,那肯定是有初始化流程,结合代码看看其启动流程

有一个函数在 ceph\_osd 模块中出现了很多次 common\_init\_finish (出现了七次),

!image-20220829233025193

但其实跳转进去看,最多会执行一次,因为里面有个标志位;这个函数 主要是启动日志服务线程和 admin socket的服务线程,而多次调用是我猜是确保服务线程启动;

!image-20220829233801487

最终在 里看到 AdminSocket 的init函数

!image-20220830002333423

AdminSocket::init 函数主要是启动 AdminSocket 线程,既然是启了线程,这个线程应该是 osd 的守护进程启动的,在系统看 果然是有 admin\_socket;

!image-20220830003109748

AdminSocket::init


//代码有点多,只留关键的代码
bool AdminSocket::init(const std::string& path)
{
  hrp("admin socket 初始化")
  /* Set up things for the new thread */
  std::string err;
  int pipe_rd = -1, pipe_wr = -1;
  err = create_shutdown_pipe(&pipe_rd, &pipe_wr);

  /* Create socket */
  int sock_fd;
  err = bind_and_listen(path, &sock_fd);

  /* Create new thread */
  m_sock_fd = sock_fd;
  m_shutdown_rd_fd = pipe_rd;
  m_shutdown_wr_fd = pipe_wr;
  m_path = path;
  hrp(m_path);
  version_hook = std::make_unique<VersionHook>();
  register_command("0", "0", version_hook.get(), "");
  .....

  th = make_named_thread("admin_socket", &AdminSocket::entry, this);
  add_cleanup_file(m_path.c_str());
  return true;
}

从 AdminSocket::init 可以看的,主要有 三个部分组成,创建 pipe、socket,以及启动 线程

AdminSocket::create\_shutdown\_pipe

创建一个管道,在注释中可以了解到,线程会监听 读管道,当这有有数据的时候,会 优雅 的将线程 kill 掉,其实从名字也会可以推测出 创建 shutdown 管道


std::string AdminSocket::create_shutdown_pipe(int *pipe_rd, int *pipe_wr)
{
  int pipefd[2];
  if (pipe_cloexec(pipefd) < 0) {
    int e = errno;
    ostringstream oss;
    oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror(e);
    return oss.str();
  }

  *pipe_rd = pipefd[0];
  *pipe_wr = pipefd[1];
  return "";
}

bind_and_listen(path, &sock_fd); ;") bind\_and\_listen(path, &sock\_fd);

函数主要工作是生成 socket,但这里的socket是单机版的socket( UNI Domian Socket ),和以前接触网络socket 不相同,不同之处如下所示(这也说明为什么 ceph daemon 后面要加个 asok文件!)

!image-20220829175922827

从 bind\_and\_listen 也可以看出确实是 用sockaddr\_un 结构体,其实网络socket也是可以实现的(地址回环),那为什么用 UNI Domian Socket 呢?

当我们使用 admin socket 读取或修改ceph 配置时候,都是需要在对应节点主机上操作,假如说ceph 集群中有 A、B、C三个几点,各自部署了 osd,A节点不能直接修改 B节点的osd配置(远程过去也行…),因为 那个 asok文件并不在A节点上,现在这些操作都是在同一台主机上,相当于 IPC(进程间通信),UNIX Domain Socket是不需要 经过网络协议栈,只需要将 应用层数据从一个进程拷贝到另一个进程, 这样效率比较高

!image-20220830111830154

接下来步骤就是 bind 和 listen 了,和 网络socket建立没多大区别….

一个 socket 只能bind一次, 在 UNI Domian Socket 调用bind的时候,会根据 address 给出的路径生成 socket文件

start thread start thread

启动线程用来监听socket


 /* Create new thread */
// 将上生成的 fd 保存到 类成员中
  m_sock_fd = sock_fd;
  m_shutdown_rd_fd = pipe_rd;
  m_shutdown_wr_fd = pipe_wr;
//这里path 是  asok文件路径
  m_path = path;
  hrp(m_path);
//VersionHook 继承了 AdminSocketHook 并实现 call接口
  version_hook = std::make_unique<VersionHook>();
  register_command("0", "0", version_hook.get(), "");
  register_command("version", "version", version_hook.get(), "get ceph version");
  register_command("git_version", "git_version", version_hook.get(),
		   "get git sha1");

//help_hook 也继承了 AdminSocketHook 并实现 call接口
  help_hook = std::make_unique<HelpHook>(this);
  register_command("help", "help", help_hook.get(),
		   "list available commands");
//同上, 这里可以出 针对不同的命令,用不同的类进行call 重写
  getdescs_hook = std::make_unique<GetdescsHook>(this);
  register_command("get_command_descriptions", "get_command_descriptions",
		   getdescs_hook.get(), "list available commands");

// 最后启动线程
// 函数设置了 线程名字,和线程启动时调用的函数 &AdminSocket::entry
  th = make_named_thread("admin_socket", &AdminSocket::entry, this);
  add_cleanup_file(m_path.c_str());

AdminSocket::entry

admin\_socket 线程运行的函数,和常规写法一样用一个 while循环,里面用 poll 来检测两个 m\_sock\_fd,和 m\_shutdown\_rd\_fd(就是最初创建 pipe那个 )的事件响应。


void AdminSocket::entry() noexcept
{
.....
  while (true) {
    struct pollfd fds[2];
    fds[0].fd = m_sock_fd;
    fds[0].events = POLLIN | POLLRDBAND;
    fds[1].fd = m_shutdown_rd_fd;
    fds[1].events = POLLIN | POLLRDBAND;

    int ret = poll(fds, 2, -1);
    if (ret < 0) {
      int err = errno;
      if (err == EINTR) {
	continue;
      }
      lderr(m_cct) << "AdminSocket: poll(2) error: '"
		   << cpp_strerror(err) << dendl;
      return;
    }

    if (fds[0].revents & POLLIN) {
      // Send out some data
      do_accept();
    }
    if (fds[1].revents & POLLIN) {
      // Parent wants us to shut down
      return;
    }
  }
  ldout(m_cct, 5) << "entry exit" << dendl;
}

AdminSocket::do_accept() ") AdminSocket::do\_accept()

当 m\_sock\_fd 有时间响应时候,会执行 do\_accept(); 看函数名应该是 对收到的数据做些处理

accept 返回一个 fd,这里是用while 逐个字符读取 客户端传来的命令

!image-20220830170649879

while里面做了些处理,结合日志,可以看到最终读出来的格式是字符串是 json 格式的

!image-20220830170949647

最后字符传到 execute\_command 函数中

!image-20220830174834047

现在 知道传进来的字符串 c 是一条 json字符串, 在 execute\_command 函数中,会将 json解析到map中(目的是 提取value)

!image-20220830175309370

接下来是从 map中获取我们需要的 命令, 假如说 c是 {“prefix”,”config show”} ,那么 match为 config show

!image-20220830175420009

在文章的 整体流程部分提到了 ,cct初始化时,会注册一些函数到 hooks 中 ,现在就通过 传进来的参数在 hooks中查找对应的回调,hooks也是一个map


std::unique_lock l(lock);
// 创建一个 hooks类型的 变量
  decltype(hooks)::iterator p;
// match 为参数命令
  while (match.size()) {
    p = hooks.find(match);
    if (p != hooks.cend())
      break;

    // drop right-most word
    size_t pos = match.rfind(' ');
    if (pos == std::string::npos) {
      match.clear();  // we fail
      break;
    } else {
      match.resize(pos);
    }
  }

  if (p == hooks.cend()) {
    lderr(m_cct) << "AdminSocket: request '" << cmd << "' not defined" << dendl;
    return false;
  }
  string args;
  if (match != cmd) {
    args = cmd.substr(match.length() + 1);
  }

  // Drop lock to avoid cycles in cases where the hook takes
  // the same lock that was held during calls to register/unregister,
  // and set in_hook to allow unregister to wait for us before
  // removing this hook.
  in_hook = true;
// p的key是 string, value是 struct hook_info,这个结构体包含了 hook
  auto match_hook = p->second.hook;
  l.unlock();
// match_hook 是 AdminSocketHook类 可以执行 call,也就是一开始注册的函数
  bool success = (validate(match, cmdmap, out) &&
      match_hook->call(match, cmdmap, format, out));
  l.lock();
  in_hook = false;
  in_hook_cond.notify_all();

执行完 注册的函数后,其输出的内容保存在 bufferlist类型的 out 中,最后写入 到新建连接的 connection\_fd中

!image-20220830200816019

客户端 如何发起连接和读数据的

当我们输入 ceph daemon 的时候,首先会先解析命令,ceph daemon 命令首先 会在 ceph.in 中的函数 maybe\_daemon\_command 被解析:

!image-20220912085729817

如果命令解析正确,最后会执行 admin\_socket 函数

!image-20220912090124891

admin\_socket 函数 在 ceph\_daemon.py 文件里

!image-20220912090349117

在 admin\_socket 函数中有个关键的函数 do\_sockio 用于发起连接,并发送命令:


def do_sockio(path, cmd_bytes):
    """ helper: do all the actual low-level stream I/O """
    #发起连接,connect的参数是 path,也就是 socket文件路径
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.connect(path)
    try:
        # 发送数据,即解析好的参数
        sock.sendall(cmd_bytes + b'\0')
        len_str = sock.recv(4)
        if len(len_str) < 4:
            raise RuntimeError("no data returned from admin socket")
        l, = struct.unpack(">I", len_str)
        sock_ret = b''
        # 读取数据
        got = 0
        while got < l:
            # recv() receives signed int, i.e max 2GB
            # workaround by capping READ_CHUNK_SIZE per call.
            want = min(l - got, READ_CHUNK_SIZE)
            bit = sock.recv(want)
            sock_ret += bit
            got += len(bit)

    except Exception as sock_e:
        raise RuntimeError('exception: ' + str(sock_e))
    return sock_ret


#命令 执行流程
#先获取命令集合
    try:
        cmd_json = do_sockio(asok_path,
                             b'{"prefix": "get_command_descriptions"}')

    except Exception as e:
        raise RuntimeError('exception getting command descriptions: ' + str(e))

# cmd和get_command_descriptions一样说明只是获取帮助
    if cmd == 'get_command_descriptions':
        return cmd_json

# 将命令集合 解析成字典
    sigdict = parse_json_funcsigs(cmd_json.decode('utf-8'), 'cli')
#查找对应的命令
    valid_dict = validate_command(sigdict, cmd)
    if not valid_dict:
        raise RuntimeError('invalid command')

    if format:
        valid_dict['format'] = format
# 在字典中查找到相应的命令,则调用 do_sockio
    try:
        ret = do_sockio(asok_path, json.dumps(valid_dict).encode('utf-8'))
    except Exception as e:
        raise RuntimeError('exception: ' + str(e))
    return ret

返回的结果最后写到终端上

!image-20220912094553017![](http://img.rui.vin/202209120946937.png)

执行对应的函数

以上,一个命令执行的整体流程就结束了,结合一个命令分析并做一个总结

比如说 config命令,注册时候 command 为 config show,命令描述也是,回调类是 \\CephContextHook \\_admin\_hook\\*,最后一个参数信息是帮助信息

!image-20220830202731573

和 类重写父类的 call,最终调用的 do\_comand 函数

!image-20220830203152555

do\_comand 里面一大推 if-else 做字符串匹配

!image-20220830203445568

_show_config \_show\_config

最终调用了 \_show\_config 函数

!image-20220830204814705

从上图可以看出,有些 参数是保存到Formatter \*f中,Formatter可以理解为 是一种格式化流;最后执行完回到 do\_commad 函数,会将 格式化好的数据刷新到到 out 中!image-20220830205231570

这里做个测试 : 我在 \_show\_config 中添加些内容,看下打印的结果怎样

!image-20220830205633149

自定义一个hook

尝试自己注册一个命令,这里回调函数还是 和show config 相同,添加了if 语句

!image-20220830211514358


评论