Administrator
发布于 2024-03-13 / 6 阅读
0
0

ceph中的cls调用流程

ceph中的cls调用流程

这次介绍下cls 的调用流程(类似于RPC)

以gc 流程为例; 删除 对象后,会在gc obj 上记录已经删除的信息,以 omap 形式存在,gc list 则可以看到 gc上的omap,这个功能实现就是通过用 cls 实现的, 所以 我们gdb 进去看看,调用流程


gdb -args ./bin/radosgw-admin gc list  --include-all

结合源码,我知道 是在 RGWGC::list 中实现的,所以 断点打在这个函数


(gdb) b RGWGC::list
Breakpoint 1 at 0x1257842: file /home/ceph/src/rgw/rgw_gc.cc, line 96.

(gdb) r
Starting program: /home/ceph/build/bin/radosgw-admin gc list --include-all
(gdb) bt
#0  cls_rgw_gc_list (io_ctx=..., oid="gc.0", marker="", max=1000, expired_only=false,
    entries=empty std::__cxx11::list, truncated=0x7fffffff96d8, next_marker="") at /home/ceph/src/cls/rgw/cls_rgw_client.cc:974

#1  0x00005555567ab936 in RGWGC::list (this=0x5555583cc4c0, index=0x7fffffff9ca0, marker="", max=1000, expired_only=false, result=empty std::__cxx11::list, truncated=0x7fffffff96d8)
    at /home/ceph/src/rgw/rgw_gc.cc:102

#2  0x000055555657cec8 in RGWRados::list_gc_objs (this=0x5555584d8000, index=0x7fffffff9ca0, marker="", max=1000,
    expired_only=false, result=empty std::__cxx11::list, truncated=0x7fffffff96d8)
    at /home/ceph/src/rgw/rgw_rados.cc:11378

#3  0x0000555555f6b13f in main (argc=4, argv=0x7fffffffda58) at /home/ceph/src/rgw/rgw_admin.cc:8048
(gdb)

cls_rgw_gc_list

此时操作可以类比为 客户端 开始构造请求,每个cls 都有自己的数据结构, 构造的时候会 encode


// Client-side stub for the RGW_CLASS / RGW_GC_LIST cls call.
// Encodes the list request, executes the remote class method on `oid`
// via io_ctx.exec(), and (in the elided part of this excerpt) decodes
// the reply into `entries` / `truncated` / `next_marker`.
int cls_rgw_gc_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
                    list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker)
{
  bufferlist in, out;

  cls_rgw_gc_list_op call;
  call.marker = marker;
  call.max = max;
  call.expired_only = expired_only;
  // Pack the request parameters into the input bufferlist.
  encode(call, in);
  // Execute: which object id, which cls module, which method, input, output.
  int r = io_ctx.exec(oid, RGW_CLASS, RGW_GC_LIST, in, out);
  if (r < 0)
    return r;

// `out` carries the encoded result.
  cls_rgw_gc_list_ret ret;
...
  return r;
}

io_ctx.exec 看看这个怎么调用的


// Public librados entry point for executing a cls method on `oid`.
// Thin forwarding shim: wraps the oid string as an object_t and
// delegates straight to the IoCtxImpl.
int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method,
			  bufferlist& inbl, bufferlist& outbl)
{
  return io_ctx_impl->exec(object_t(oid), cls, method, inbl, outbl);
}

因为是读操作,所以这也是 operate_read


int librados::IoCtxImpl::exec(const object_t& oid, const char *cls, const char *method,
			      bufferlist& inbl, bufferlist& outbl)
{
  ::ObjectOperation rd;
  prepare_assert_ops(&rd);
  //构造op 类型
  rd.call(cls, method, inbl);
//准备发送
  return operate_read(oid, &rd, &outbl);
}

rd.call(cls, method, inbl);


// Append a cls method invocation to this ObjectOperation.
void call(const char *cname, const char *method, bufferlist &indata) {
// Note: the op is tagged CEPH_OSD_OP_CALL — this is the tag the OSD
// later switches on to dispatch into the class handler.
  add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
}

// Build a CEPH_OSD_OP_CALL op: record the class/method name lengths in
// the op header and serialize <class name, method name, request payload>
// back-to-back into the op's indata buffer. The OSD side slices them
// apart again using the recorded lengths.
void add_call(int op, const char *cname, const char *method,
bufferlist &indata,
bufferlist *outbl, Context *ctx, int *prval) {

  OSDOp& osd_op = add_op(op);
  // Index of the op just appended; register its output sinks there.
  unsigned p = ops.size() - 1;
  out_handler[p] = ctx;
  out_bl[p] = outbl;
  out_rval[p] = prval;
  // Which cls module
  osd_op.op.cls.class_len = strlen(cname);
  // Which cls method
  osd_op.op.cls.method_len = strlen(method);
  osd_op.op.cls.indata_len = indata.length();
  osd_op.indata.append(cname, osd_op.op.cls.class_len);
  osd_op.indata.append(method, osd_op.op.cls.method_len);
  osd_op.indata.append(indata);
}


int librados::IoCtxImpl::operate_read(const object_t& oid,::ObjectOperation *o,bufferlist *pbl, int flags)
{
  if (!o->size())
    return 0;

  Mutex mylock("IoCtxImpl::operate_read::mylock");
  Cond cond;
  bool done;
  int r;
  version_t ver;

  Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);

  int op = o->ops[0].op.op;
  ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
  Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
	                                      *o, snap_seq, pbl, flags,
	                                      onack, &ver);
  // 把op 发给 随机选取的osd TODO: 怎么选取osd 后面文章会讲解
  objecter->op_submit(objecter_op);
  // Objecter::_op_submit_with_budget
  // Objecter::_op_submit
  // Objecter::_send_op_account
  return r;
}

看看osd 这边怎么处理的,gdb进入osd , 对于收到的op ,都会经过 PrimaryLogPG::do_osd_ops 做处理


// OSD-side dispatch (abridged excerpt): how a CEPH_OSD_OP_CALL op is
// unpacked and routed to the registered cls plugin function.
int PrimaryLogPG::do_osd_ops(){

switch (op.op) {
    case CEPH_OSD_OP_CALL:
  {
	string cname, mname;
	// Slice the cls call parameters back out of the op's indata,
	// using the lengths recorded by add_call(): class name, method
	// name, then the encoded request payload.
	bufferlist indata;
	try {
	  bp.copy(op.cls.class_len, cname);
	  bp.copy(op.cls.method_len, mname);
	  bp.copy(op.cls.indata_len, indata);
	} catch (buffer::error& e) {
	.....
	  break;
	}

	// Resolve (and load if necessary) the named cls module.
	ClassHandler::ClassData *cls;
	result = osd->class_handler->open_class(cname, &cls);
	ceph_assert(result == 0);   // init_op_flags() already verified this works.

	// Look up the registered method within that module.
	ClassHandler::ClassMethod *method = cls->get_method(mname.c_str());

	bufferlist outdata;
	dout(10) << "call method " << cname << "." << mname << dendl;
	int prev_rd = ctx->num_read;
	int prev_wr = ctx->num_write;
	// Invoke the cls function through its registered pointer.
	result = method->exec((cls_method_context_t)&ctx, indata, outdata);

	// Ship the method's output back in the op reply.
	op.extent.length = outdata.length();
	osd_op.outdata.claim_append(outdata);
	dout(30) << "out dump: ";
	osd_op.outdata.hexdump(*_dout);
	*_dout << dendl;
      }
	}
}

总结:

cls 调用流程也是伴随着io流程走的,上层作为客户端,把cls 参数编码到 bufferlist(ceph 中很通用的数据结构)作为 op 发给 osd,osd再根据 call method 在内存找到对应的函数指针; 回头看的话,欸,这不就是类似一个rpc 调用吗….


评论