Administrator · Published 2023-03-11

RGW PUT Flow: execute() Analysis

Preface

Topic of this article: when we upload a file with s3cmd, how does RGW process it?


For every s3cmd (S3 API) request, RGW has a corresponding op handler.

A PUT request, for example, is handled by a dedicated class method: RGWPutObj::execute().

The flow before that point is largely common to all ops: parse the request -> authenticate it -> build the corresponding handler (a long flow, covered in a separate article).

Each op's main processing is split across three functions: pre_exec(), execute(), and complete().
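That three-phase shape can be sketched as a tiny virtual interface. This is illustrative only: the real interface is Ceph's RGWOp in rgw_op.h, and the names below (OpSketch, PutObjSketch) are made up for the sketch.

```cpp
#include <iostream>

// Minimal sketch of the three-phase op lifecycle: pre_exec -> execute ->
// complete. Mirrors the shape of RGWOp/RGWPutObj, not the actual interface.
struct OpSketch {
  int op_ret = 0;
  virtual ~OpSketch() = default;
  virtual void pre_exec() {}                    // per-request setup (headers, 100-continue)
  virtual void execute() = 0;                   // the actual work
  virtual void complete() { send_response(); }  // emit the HTTP response
  virtual void send_response() {}
};

struct PutObjSketch : OpSketch {
  void pre_exec() override { /* dump bucket header, handle Expect: 100-continue */ }
  void execute() override  { op_ret = 0; /* validate, pick a processor, stream data */ }
  void send_response() override { std::cout << "ret=" << op_ret << "\n"; }
};
```

The request loop drives each op through these three calls in order, which is why the rest of this article only needs to look at pre_exec() and execute().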

Main data structures and class relationships

Layout of an RGW object

The uploaded object's content is stored in the rgw data pool. For example, after uploading 5M of data:

RGW splits the data into 4M RADOS objects.

Index pool: holds the index information for RGW objects, in two main kinds of entries:

.dir.[bucket_id].[shard_num]: stores the bucket's index entries (the names of the uploaded objects).

[bucket_id]_[object_name]: stores the uploaded object's metadata as xattrs.

Uploads come in two flavors (which lay data out in the data pool differently):

Multipart upload: kicks in automatically once the file exceeds 15M.

The rule here: the RGW object consists of a head RADOS object plus several parts, and each part in turn consists of a multipart RADOS object and shadow (stripe) RADOS objects.

Whole-object upload (under 15M): written to RADOS at 4M granularity.
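The split described above can be expressed numerically. A sketch, assuming the 4M default stripe size and s3cmd's default 15M multipart threshold (both configurable; the constant names below are made up):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

constexpr uint64_t kStripe = 4ull << 20;              // default RADOS object size
constexpr uint64_t kMultipartThreshold = 15ull << 20; // s3cmd's default part size

// true if the client would switch to multipart upload
bool is_multipart(uint64_t size) { return size > kMultipartThreshold; }

// sizes of the RADOS objects a whole-object upload produces:
// a 4M head followed by 4M tail stripes, with a short final stripe
std::vector<uint64_t> stripe_sizes(uint64_t size) {
  std::vector<uint64_t> out;
  for (uint64_t off = 0; off < size; off += kStripe)
    out.push_back(std::min(kStripe, size - off));
  return out;
}
```

A 5M upload therefore yields two RADOS objects (4M + 1M), matching the pool listing above.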

Key flow

RGWPutObj::pre_exec()


void rgw_bucket_object_pre_exec(struct req_state *s)
{
  if (s->expect_cont)
    dump_continue(s);  // handle "Expect: 100-continue"
  // parse the bucket name and build the response headers
  dump_bucket_from_state(s);
}

RGWPutObj::execute()

This function first runs a series of checks, then initializes a different processor class depending on the upload type.


void RGWPutObj::execute()
{
// s->object holds the attributes of the object being uploaded
  if (s->object.empty()) {
    return;
  }
// verify the bucket exists
  if (!s->bucket_exists) {
     op_ret = -ERR_NO_SUCH_BUCKET;
     return;
  }
  // MD5 verification
  if (supplied_md5_b64) {
    ......
  }
  // verify the bucket still has enough quota
  if (!chunked_upload) {
    op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
              user_quota, bucket_quota, s->content_length);
  }
// was an ETag supplied? (a content checksum, handled like an MD5)
  if (supplied_etag) {
    strncpy(supplied_md5, supplied_etag, sizeof(supplied_md5) - 1);
    supplied_md5[sizeof(supplied_md5) - 1] = '\0';
  }
// multipart-upload flag
// (e.g. files over 15M are split; multipart_upload_id was already
// initialized while the request was parsed)
  const bool multipart = !multipart_upload_id.empty();

  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
  using namespace rgw::putobj;
  constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
                                               sizeof(AtomicObjectProcessor),
                                               sizeof(AppendObjectProcessor)});
  ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
// three upload paths: multipart, append, and atomic (whole object)

  // multipart upload
  if (multipart) {
    RGWMPObj mp(s->object.name, multipart_upload_id);

    op_ret = get_multipart_info(store, s, mp.get_meta(), nullptr, nullptr, &upload_info);
//   .....
    pdest_placement = &upload_info.dest_placement;
    ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl;
    processor.emplace<MultipartObjectProcessor>(
        &aio, store, s->bucket_info, pdest_placement,
        s->owner.get_id(), obj_ctx, obj,
        multipart_upload_id, multipart_part_num, multipart_part_str);
  }
  // whole-object upload
  else {
    // is bucket versioning enabled?
    if (s->bucket_info.versioning_enabled()) {
      if (!version_id.empty()) {
        obj.key.set_instance(version_id);
      } else {
        store->gen_rand_obj_instance_name(&obj);
        version_id = obj.key.instance;
      }
    }
    ldpp_dout(this, 20) << "other" << "other" << dendl;

    // apply the configured placement rule (storage class)
    s->dest_placement = store->amend_placement_rule(s->dest_placement);
    pdest_placement = &s->dest_placement;
    s->bucket_info.placement_rule = s->dest_placement;
    // initialize processor, the class that drives the chosen upload path
    processor.emplace<AtomicObjectProcessor>(
        &aio, store, s->bucket_info, pdest_placement,
        s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
  }
}

Multipart upload:

A large RGW object is split into multiple independently uploaded pieces, called multipart. Its main advantage is resumable uploads. The S3 client's default part size is 15MB (configurable).

  • multipart: the head object of a part, named {bucket_id}__multipart_{prefix}.{multipart_id}, where multipart_id is derived from the manifest;

  • shadow: a stripe object belonging to a part, named {bucket_id}__shadow_{prefix}.{multipart_id}_{shadow_id}, where shadow_id is derived from manifest.rule.part_size and manifest.rule.stripe_max_size.
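The naming patterns above can be sketched as plain string concatenation. This is an illustration of the pattern as described, not the exact formatting code (which lives in RGWMPObj / RGWObjManifest); the argument values in the usage are placeholders:

```cpp
#include <string>

// {bucket_id}__multipart_{prefix}.{multipart_id}
std::string multipart_name(const std::string& bucket_id,
                           const std::string& prefix,
                           const std::string& multipart_id) {
  return bucket_id + "__multipart_" + prefix + "." + multipart_id;
}

// {bucket_id}__shadow_{prefix}.{multipart_id}_{shadow_id}
std::string shadow_name(const std::string& bucket_id,
                        const std::string& prefix,
                        const std::string& multipart_id,
                        unsigned shadow_id) {
  return bucket_id + "__shadow_" + prefix + "." + multipart_id + "_" +
         std::to_string(shadow_id);
}
```

So a part's head object and its stripes share the same {prefix}.{multipart_id} stem, which is how they are tied back to the logical part.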

Whole-object upload:

Related concepts

Under 4M: the data is stored in the .rgw.buckets.data pool as a single RADOS object (default size 4M).

Over 4M: the file is split into 4M RADOS objects, for example:

c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_0

i.e. {bucket_id}__shadow_{object_name}.{random_prefix}_{stripe_num}

Detailed flow


    // is bucket versioning enabled?
    if (s->bucket_info.versioning_enabled()) {
      if (!version_id.empty()) {
        obj.key.set_instance(version_id);
      } else {
        store->gen_rand_obj_instance_name(&obj);
        version_id = obj.key.instance;
      }
    }
    ldpp_dout(this, 20) << "atomic" << "other" << dendl;

    //pdest_placement = &s->dest_placement; // storage class
    s->dest_placement = store->amend_placement_rule(s->dest_placement);
    pdest_placement = &s->dest_placement;

    s->bucket_info.placement_rule = s->dest_placement;
    // initialize the upload-specific processor class, managed via processor
    processor.emplace<AtomicObjectProcessor>(
        &aio, store, s->bucket_info, pdest_placement,
        s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
  }
  // the key step: do the setup work before the data is actually uploaded
  op_ret = processor->prepare();

AtomicObjectProcessor::prepare()


int AtomicObjectProcessor::prepare()
{
  uint64_t max_head_chunk_size;
  uint64_t head_max_size;
  uint64_t chunk_size = 0;
  uint64_t alignment;
  rgw_pool head_pool;

  // look up the data pool the object will be stored in
  if (!store->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) {
    ldout(store->ctx(), 0) << "fail to get head pool " << head_obj << ",rule=" << bucket_info.placement_rule << dendl;
    return -EIO;
  }
  // determine max_head_chunk_size (the size limit of the head object)
  int r = store->get_max_chunk_size(head_pool, &max_head_chunk_size, &alignment);
  if (r < 0) {
    return r;
  }

  manifest.set_trivial_rule(head_max_size, stripe_size);

  /* change atomic obj oid, add obj-name */
  char buf[33];
  gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
  string oid_prefix = ".";
  oid_prefix.append(buf);
  oid_prefix.append("_");

  // the randomly generated suffix, e.g.
  // c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_{file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_}0
  manifest.set_prefix(head_obj.key.name + oid_prefix);

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                bucket_info.placement_rule,
                                &tail_placement_rule,
                                head_obj.bucket, head_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "fail to create manifest " << head_obj << ",ret=" << r << dendl;
    return r;
  }
  // start the manifest generator; the current object it yields is the head object
  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "fail to set_stripe_obj " << head_obj << ",ret=" << r << dendl;
    return r;
  }

  set_head_chunk_size(head_max_size);
  // initialize the processors
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, head_max_size);
  return 0;
}
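The excerpt elides how head_max_size and chunk_size end up being set (in the full function they are derived from max_head_chunk_size and the pool alignment). What set_trivial_rule establishes can be sketched as a simple layout rule. A sketch, assuming a 4M head and 4M stripes; the struct and method names are made up:

```cpp
#include <cstdint>
#include <utility>

// Sketch of the "trivial rule" layout: one head of head_max_size, then
// uniform stripes of stripe_size. locate() maps a logical offset to
// (stripe index, offset within that stripe); index 0 is the head object.
struct TrivialRule {
  uint64_t head_max_size;
  uint64_t stripe_size;

  std::pair<uint64_t, uint64_t> locate(uint64_t ofs) const {
    if (ofs < head_max_size)
      return {0, ofs};                 // lands in the head object
    uint64_t tail = ofs - head_max_size;
    return {1 + tail / stripe_size, tail % stripe_size};  // a shadow stripe
  }
};
```

With 4M/4M parameters, logical offset 5M lands 1M into the first shadow stripe, and 8M starts the second shadow stripe, which matches the write trace later in this article.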

The data read loop


do {
    bufferlist data;
    if (fst > lst)
      break;
    // no copy source: read the data from the client
    if (copy_source.empty()) {
      // RGWPutObj_ObjStore::get_data(bufferlist& bl)
      // pulls at most 4M per iteration
      len = get_data(data);
    }
    // a copy source is present: read from the source object instead
    else {
      uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
      op_ret = get_data(fst, cur_lst, data);
      if (op_ret < 0)
        return;
      len = data.length();
      s->content_length += len;
      fst += len;
    }
    if (len < 0) {
      op_ret = len;
      ldpp_dout(this, 20) << "get_data() returned ret=" << op_ret << dendl;
      return;
    } else if (len == 0) {
      break;
    }

    if (need_calc_md5) {
      hash.Update((const unsigned char *)data.c_str(), data.length());
    }

    /* update torrent */ // compute the SHA1 for torrent support
    torrent.update(data);
    // hand the chunk to the filter chain, starting at HeadObjectProcessor::process
    op_ret = filter->process(std::move(data), ofs);
    if (op_ret < 0) {
      ldpp_dout(this, 20) << "processor->process() returned ret="
          << op_ret << dendl;
      return;
    }

    ofs += len;
  } while (len > 0);

filter->process(std::move(data), ofs); -> HeadObjectProcessor::process

This is where the received data gets handled (the key processing function):


int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
{
  // an empty buffer means the read loop has finished: flush
  const bool flush = (data.length() == 0);
  // capture the first chunk for special handling
  // data_offset tracks how much of this object has been seen so far
  if (data_offset < head_chunk_size || data_offset == 0) {
    if (flush) {
      // flush partial chunk
      // less than a full chunk arrived; handle it here, taking the data from head_data
      ldout(store->ctx(), 20) << "process flush offset:" << data_offset << " data len:" << head_data.length() << dendl;
      return process_first_chunk(std::move(head_data), &processor);
    }

    auto remaining = head_chunk_size - data_offset;
    auto count = std::min<uint64_t>(data.length(), remaining);
    data.splice(0, count, &head_data);
    data_offset += count;

    // the head has reached exactly 4M
    if (data_offset == head_chunk_size) {
      // process the first complete chunk
      // only a complete chunk is written here; anything short of a chunk
      // stays buffered in head_data and is handled by the flush above
      ceph_assert(head_data.length() == head_chunk_size);

      ldout(store->ctx(), 20) << "process first chunk, offset:" << data_offset << " data len:" << head_data.length() << dendl;

      int r = process_first_chunk(std::move(head_data), &processor);
      if (r < 0) {
        return r;
      }
    }

    if (data.length() == 0) { // avoid flushing stripe processor
      return 0;
    }
  }

  ceph_assert(processor); // process_first_chunk() must initialize

  // send everything else through the processor
  auto write_offset = data_offset;
  data_offset += data.length();

  ldout(store->ctx(), 20) << "process data, write offset:" << write_offset << " data len:" << data.length() << dendl;
  // StripeProcessor::process(bufferlist&& data, uint64_t offset)
  return processor->process(std::move(data), write_offset);
}

HeadObjectProcessor::process handles the first 4M object specially and hands everything past 4M to the next processor (StripeProcessor::process).

Past the 4M mark, writes flow through StripeProcessor::process -> ChunkProcessor::process() -> RadosWriter::process().
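This chain is a classic filter pipeline: each stage implements process(data, offset) and forwards to the next, with an empty buffer meaning "flush". A self-contained sketch of that contract, where std::string stands in for bufferlist and a recorder stands in for RadosWriter (the real classes live in rgw_putobj.h/.cc; these names are illustrative):

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

struct DataProcessor {
  virtual ~DataProcessor() = default;
  // empty data means "flush whatever you have buffered"
  virtual int process(std::string&& data, uint64_t offset) = 0;
};

// pass-through stage that forwards to the next processor
struct Pipe : DataProcessor {
  DataProcessor* next;
  explicit Pipe(DataProcessor* n) : next(n) {}
  int process(std::string&& data, uint64_t offset) override {
    return next->process(std::move(data), offset);
  }
};

// terminal stage: records (offset, length) of every write it receives
struct Recorder : DataProcessor {
  std::vector<std::pair<uint64_t, size_t>> writes;
  int process(std::string&& data, uint64_t offset) override {
    if (!data.empty()) writes.emplace_back(offset, data.size());
    return 0;
  }
};

// buffering stage in the spirit of ChunkProcessor: accumulate until a full
// chunk is available, emit full chunks, and emit the remainder on flush
struct Chunker : Pipe {
  std::string chunk;
  size_t chunk_size;
  Chunker(DataProcessor* n, size_t cs) : Pipe(n), chunk_size(cs) {}
  int process(std::string&& data, uint64_t offset) override {
    const bool flush = data.empty();
    uint64_t position = offset - chunk.size();  // offset of the buffered bytes
    chunk += data;
    while (chunk.size() >= chunk_size) {
      int r = Pipe::process(chunk.substr(0, chunk_size), position);
      if (r < 0) return r;
      chunk.erase(0, chunk_size);
      position += chunk_size;
    }
    if (flush && !chunk.empty()) {
      std::string rest;
      rest.swap(chunk);
      return Pipe::process(std::move(rest), position);
    }
    return 0;
  }
};
```

Feeding 6 bytes through a 4-byte Chunker and then flushing yields writes of (offset 0, length 4) and (offset 4, length 2), the same pattern as the 4M/2M tail traced in the 10M example later in this article.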


int StripeProcessor::process(bufferlist&& data, uint64_t offset)
{
  // bounds.first and bounds.second delimit the current stripe: [first, second)
  ceph_assert(offset >= bounds.first);
  const bool flush = (data.length() == 0);

  if (flush) {
    return Pipe::process({}, offset - bounds.first);
  }

  auto max = bounds.second - offset;

  while (data.length() > max) {
    // max > 0 means part of the data still fits inside the current stripe
    if (max > 0) {
      bufferlist bl;
      data.splice(0, max, &bl);

      int r = Pipe::process(std::move(bl), offset - bounds.first);
      if (r < 0) {
        return r;
      }
      offset += max;
    }

    // flush the current chunk -> ChunkProcessor::process
    int r = Pipe::process({}, offset - bounds.first);
    if (r < 0) {
      return r;
    }
    // generate the next stripe
    uint64_t stripe_size;
    r = gen->next(offset, &stripe_size);
    if (r < 0) {
      return r;
    }
    ceph_assert(stripe_size > 0);

    bounds.first = offset;
    bounds.second = offset + stripe_size;

    max = stripe_size; // max is 4M by default
  }

  if (data.length() == 0) { // don't flush the chunk here
    return 0;
  }
  // offset - bounds.first is the offset within the current stripe
  return Pipe::process(std::move(data), offset - bounds.first);
}

Next, let's take uploading a 10M file as an example and walk through, alongside the code, how the data is split and written down to RADOS.

10M falls under whole-object upload. The do...while loop above pulls 4M per iteration, so the data is split into three pieces: 4M, 4M, 2M.
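Before walking the traces, here is the end result in arithmetic form: which RADOS writes a whole-object upload of a given size produces. A sketch assuming the 4M defaults; the names (PlannedWrite, plan_writes) are made up, and the real plumbing is the processor chain traced step by step afterwards.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

constexpr uint64_t kChunk = 4ull << 20;  // default head/stripe size

struct PlannedWrite {
  uint64_t logical_off;  // offset within the uploaded file
  uint64_t len;
  bool head;             // written via write_full to the head object?
};

// the writes produced for a whole-object upload of `total` bytes
std::vector<PlannedWrite> plan_writes(uint64_t total) {
  std::vector<PlannedWrite> out;
  uint64_t head = std::min(total, kChunk);
  if (head > 0) out.push_back({0, head, true});
  for (uint64_t off = head; off < total; off += kChunk)
    out.push_back({off, std::min(kChunk, total - off), false});
  return out;
}
```

For 10M this yields a 4M head write plus tail writes of 4M at logical offset 4M and 2M at logical offset 8M: exactly the three pieces the traces below walk through.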

The first 4M


// data is 4M; ofs starts at 0
filter->process(std::move(data), ofs); // (4M, 0)
--->HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)   //  (4M, 0)
    // the first object is exactly one full 4M chunk
    --->process_first_chunk(std::move(head_data), &processor);
        // written directly as one whole object
        --->writer.process(std::move(data), 0)      // RadosWriter::process(4M, 0)
            --->  op.write_full(data)
ofs += len; // advance the offset by the data length; ofs is now 4M

The second 4M


// still in the same loop; another 4M is pulled

filter->process(std::move(data), ofs);  // (4M, 4M)
--->HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)   //  (4M, 4M)
	// no longer the first object: handed straight to StripeProcessor
	---> processor->process(std::move(data), write_offset);  // StripeProcessor::process(4M, 4M)
        ---> const bool flush = (data.length() == 0);  // flush = false
        ---> auto max = bounds.second - offset;        // max = 4M - 4M = 0; bounds.second is the stripe limit, 4M by default (see prepare)
        ---> while (data.length() > max) {..}          // enter the loop; data.length() = 4M
        ---> Pipe::process({}, offset - bounds.first)  // ChunkProcessor::process({}, 4M); data is empty, so nothing is written
        ---> gen->next(offset, &stripe_size);          // advance the manifest to the next stripe object (covered later); ManifestObjectProcessor::next
        ---> bounds.first = offset;                    // update the stripe bounds: bounds.first = 4M
        ---> bounds.second = offset + stripe_size;     // bounds.second = 8M
        ---> max = stripe_size;                        // max = 4M
        ---> Pipe::process(std::move(data), offset - bounds.first);  // ChunkProcessor::process(4M, 0): the offset is now relative to the new stripe
            ---> ChunkProcessor::process(bufferlist&& data, uint64_t offset)
            ---> int64_t position = offset - chunk.length();    // 0 - 0; chunk holds no data yet, chunk.length() is 0
            ---> flush = (data.length() == 0);     // flush = false
            ---> chunk.claim_append(data);         // move the data into chunk, leaving data empty
            ---> while (chunk.length() >= chunk_size)   // chunk.length() = 4M; chunk_size defaults to 4M (see prepare)
            ---> bufferlist bl;
            ---> chunk.splice(0, chunk_size, &bl);      // drains chunk
            ---> int r = Pipe::process(std::move(bl), position);  // RadosWriter::process(4M, 0), targeting the new shadow object
                ---> op.write_full(data)           // offset 0 within the stripe object means a full-object write
ofs += len; // advance the offset by the data length; ofs is now 8M

The remaining 2M


filter->process(std::move(data), ofs);  // (2M, 8M)
--->HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)   //  (2M, 8M)
    // not the first object: handed straight to StripeProcessor
	---> processor->process(std::move(data), write_offset);  // StripeProcessor::process(2M, 8M), offset = 8M
	    ---> const bool flush = (data.length() == 0);  // flush = false
	    ---> auto max = bounds.second - offset;        // max = 8M - 8M = 0
	    ---> while (data.length() > max) {..}          // enter the loop; data.length() = 2M
        ---> Pipe::process({}, offset - bounds.first)  // ChunkProcessor::process({}, 4M); data is empty, nothing is written
        ---> gen->next(offset, &stripe_size);          // advance the manifest; ManifestObjectProcessor::next
        ---> bounds.first = offset;                    // bounds.first = 8M
        ---> bounds.second = offset + stripe_size;     // bounds.second = 12M
        ---> max = stripe_size;                        // max = 4M
        ---> Pipe::process(std::move(data), offset - bounds.first);  // ChunkProcessor::process(2M, 0)
			---> int64_t position = offset - chunk.length();    // 0 - 0; chunk is still empty
			---> flush = (data.length() == 0);     // flush = false
			---> chunk.claim_append(data);         // move the data into chunk, leaving data empty
			---> while (chunk.length() >= chunk_size)   // chunk.length() = 2M < 4M: the loop is skipped,
			                                            // so this data stays buffered until the final flush
ofs += len; // advance the offset by the data length; ofs is now 10M

// after all the data has been read, flush whatever is left in the filters
// flush any data in filters
op_ret = filter->process({}, ofs);   // ofs = 10M
--->HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)   // ({}, 10M)
	---> processor->process(std::move(data), write_offset);  // StripeProcessor::process({}, 10M)
     	---> const bool flush = (data.length() == 0);  // flush = true
		---> Pipe::process({}, offset - bounds.first); // ChunkProcessor::process({}, 2M): 10M - 8M
			---> ChunkProcessor::process({}, 2M)
            ---> const bool flush = (data.length() == 0);  // flush == true
			---> if (chunk.length() > 0)
             	---> Pipe::process(std::move(chunk), position);  // RadosWriter::process(2M, 0): position = 2M - 2M = 0
 					---> op.write_full(data);

The special cases in whole-object upload are the first part (the head object) and the final piece of data, which is only flushed after the do...while read loop completes.

Next, we will look at how the object's metadata is stored (the head object).

