RGW PUT Flow: execute() Analysis



Preface

Topic of this article: how RGW handles the request when we upload a file with s3cmd.


Every request issued by s3cmd (the S3 API) has a corresponding op handler in RGW.
For example, PUT requests are handled by a dedicated class, via RGWPutObj::execute().

The flow before that handling is largely generic: parse the request -> verify the request -> create the corresponding handler (that flow is long and will be covered in a separate article).

For each op, the main processing flow is split across three functions: pre_exec(), execute(), and complete().

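For reference, here is a minimal sketch of that dispatch. This is a simplification, not the real code: rgw_process.cc also runs authentication, permission and parameter checks between these steps, and the type names here (RGWOpSketch, PutObj) are made up for illustration.

#include <cstdio>

// Minimal sketch of the per-op lifecycle (simplified; names are illustrative).
struct RGWOpSketch {
  virtual void pre_exec() {}    // cheap pre-work, e.g. prepare response headers
  virtual void execute() = 0;   // the op's real work (e.g. RGWPutObj::execute)
  virtual void complete() {}    // send the final response
  virtual ~RGWOpSketch() = default;
};

struct PutObj : RGWOpSketch {
  void execute() override { std::puts("read request body, write RADOS objects"); }
};

// roughly what the request loop does for every op
void process_op(RGWOpSketch& op) {
  op.pre_exec();
  op.execute();
  op.complete();
}

int main() { PutObj op; process_op(op); }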

Key Data Structures and Class Relationships

RGW object layout

An uploaded object's content is stored in the data pool (.rgw.buckets.data). For example, when 5 MB of data is uploaded,

RGW splits the data into 4 MB RADOS objects.

The index pool stores the RGW object's index information, mainly in two kinds of objects:

.dir.[bucket_id].[shard_num]: holds the index entries for the bucket (the names of the objects uploaded into it).

[bucket_id]_[object_name]: holds the uploaded object's metadata, stored as xattrs.

Uploads come in two modes (they lay data out differently in the data pool):

Multipart upload: triggered automatically once the uploaded file exceeds 15 MB.

The rule here: an RGW object consists of one head RADOS object plus multiple parts, and each part in turn consists of a multipart RADOS object and shadow (stripe) RADOS objects.


Whole-object upload (under 15 MB): the data is written to RADOS at 4 MB granularity.


Key Flow

RGWPutObj::pre_exec()

void rgw_bucket_object_pre_exec(struct req_state *s)
{
  if (s->expect_cont)
    dump_continue(s);
  // resolve the bucket name and build the response header
  dump_bucket_from_state(s);
}

RGWPutObj::execute()

The first part of this function performs validation; it then initializes a different processor class depending on the upload mode.

void RGWPutObj::execute()
{
  // s->object holds the attributes of the object being uploaded
  if (s->object.empty()) {
    return;
  }
  // verify the bucket exists
  if (!s->bucket_exists) {
    op_ret = -ERR_NO_SUCH_BUCKET;
    return;
  }
  // MD5 verification
  if (supplied_md5_b64) {
    ......
  }
  // verify the bucket has enough quota left
  if (!chunked_upload) {
    op_ret = store->check_quota(s->bucket_owner.get_id(), s->bucket,
                                user_quota, bucket_quota, s->content_length);
  }
  // was an ETag supplied? (content verification, similar to MD5)
  if (supplied_etag) {
    strncpy(supplied_md5, supplied_etag, sizeof(supplied_md5) - 1);
    supplied_md5[sizeof(supplied_md5) - 1] = '\0';
  }
  // multipart-upload flag
  // (e.g. files over 15 MB are split; multipart_upload_id was already
  //  initialized while the request was being parsed)
  const bool multipart = !multipart_upload_id.empty();

  // throttle for the in-flight async RADOS writes
  rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
  using namespace rgw::putobj;
  constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
                                                sizeof(AtomicObjectProcessor),
                                                sizeof(AppendObjectProcessor)});
  ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
  // three upload modes: multipart, append, atomic

  // multipart upload
  if (multipart) {
    RGWMPObj mp(s->object.name, multipart_upload_id);

    op_ret = get_multipart_info(store, s, mp.get_meta(), nullptr, nullptr, &upload_info);
    // .....
    pdest_placement = &upload_info.dest_placement;
    ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl;
    processor.emplace<MultipartObjectProcessor>(
        &aio, store, s->bucket_info, pdest_placement,
        s->owner.get_id(), obj_ctx, obj,
        multipart_upload_id, multipart_part_num, multipart_part_str);
  }

  // whole-object upload (the append branch is elided here)
  {
    // is bucket versioning enabled?
    if (s->bucket_info.versioning_enabled()) {
      if (!version_id.empty()) {
        obj.key.set_instance(version_id);
      } else {
        store->gen_rand_obj_instance_name(&obj);
        version_id = obj.key.instance;
      }
    }
    ldpp_dout(this, 20) << "other" << "other" << dendl;

    // apply the configured placement rule
    s->dest_placement = store->amend_placement_rule(s->dest_placement);
    pdest_placement = &s->dest_placement;
    s->bucket_info.placement_rule = s->dest_placement;
    // initialize the processor; `processor` manages whichever of the
    // three upload-mode classes was selected
    processor.emplace<AtomicObjectProcessor>(
        &aio, store, s->bucket_info, pdest_placement,
        s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
  }
}

Multipart upload:

A large RGW object is split into multiple independent pieces that are uploaded separately; this is multipart. Its advantage is that interrupted uploads can be resumed. The S3 client's default split size is 15 MB (configurable, e.g. via s3cmd's --multipart-chunk-size-mb).

  • multipart: the head object of each part,

    {bucket_id}_multipart{prefix}.{multipart_id}, where multipart_id is computed from the manifest;

  • shadow: the stripe objects belonging to a part, {bucket_id}_shadow{prefix}.{multipart_id}_{shadow_id}, where shadow_id is computed from manifest.rule.part_size and manifest.rule.stripe_max_size. An illustrative listing follows.
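For illustration, here is roughly what the RADOS objects look like after a 20 MB object `file` is uploaded in two parts (15 MB + 5 MB). The upload id (2~kVl3Example) and sizes are made up; the layout follows the rules above, with each part striped into a 4 MB multipart head plus 4 MB shadow objects:

c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1_file                              (head; typically zero-length, metadata in xattrs)
c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__multipart_file.2~kVl3Example.1   (part 1, first 4 MB)
c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.2~kVl3Example.1_1    (part 1, next 4 MB)
c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.2~kVl3Example.1_2    (part 1, next 4 MB)
c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.2~kVl3Example.1_3    (part 1, last 3 MB)
c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__multipart_file.2~kVl3Example.2   (part 2, first 4 MB)
c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.2~kVl3Example.2_1    (part 2, last 1 MB)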

Whole-object upload:

Related concepts

Smaller than 4 MB:

The data is stored in the .rgw.buckets.data pool as a single RADOS object (4 MB default).

Larger than 4 MB:

The file is split into 4 MB RADOS objects, e.g.:

c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_0
{bucket_id}__shadow_{object_name}.{random_prefix}_{stripe_num}
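A minimal C++ sketch of how that tail name is assembled under the trivial manifest rule (simplified from RGWObjManifest's implicit-location naming; the function name shadow_oid is mine, and the sketch reproduces the example above):

#include <cstdio>
#include <string>

// Simplified sketch: the rados name of tail stripe `stripe_no` for an
// atomically uploaded object. `prefix` is "{object_name}.{random}_",
// as set in AtomicObjectProcessor::prepare(); tail objects live in the
// "shadow" namespace, which shows up in the oid as "__shadow_".
std::string shadow_oid(const std::string& bucket_marker,
                       const std::string& prefix, unsigned stripe_no)
{
  return bucket_marker + "__shadow_" + prefix + std::to_string(stripe_no);
}

int main()
{
  // prints the example name above (stripe 0 of object "file")
  std::printf("%s\n", shadow_oid(
      "c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1",
      "file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_", 0).c_str());
}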

Detailed flow

  // is bucket versioning enabled?
  if (s->bucket_info.versioning_enabled()) {
    if (!version_id.empty()) {
      obj.key.set_instance(version_id);
    } else {
      store->gen_rand_obj_instance_name(&obj);
      version_id = obj.key.instance;
    }
  }
  ldpp_dout(this, 20) << "atomic" << "other" << dendl;

  //pdest_placement = &s->dest_placement; // storage class
  s->dest_placement = store->amend_placement_rule(s->dest_placement);
  pdest_placement = &s->dest_placement;

  s->bucket_info.placement_rule = s->dest_placement;
  // initialize the selected upload-mode class; `processor` manages it
  processor.emplace<AtomicObjectProcessor>(
      &aio, store, s->bucket_info, pdest_placement,
      s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
}
// key step: initialization work before the actual data upload begins
op_ret = processor->prepare();
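The processor chosen above follows a small interface. The sketch below shows its rough shape only; the real rgw::putobj::ObjectProcessor in rgw_putobj_processor.h carries more arguments (and they differ between Ceph versions), and bufferlist is aliased to std::string here just to keep the sketch self-contained:

#include <cstdint>
#include <string>

using bufferlist = std::string; // stand-in for ceph::bufferlist in this sketch

// Abbreviated shape of the upload-processor interface.
struct ObjectProcessorSketch {
  // set up the manifest, the target RADOS objects and the filter chain
  virtual int prepare() = 0;
  // consume `data` at logical `offset`; an empty buffer means "flush"
  virtual int process(bufferlist&& data, uint64_t offset) = 0;
  // finish: write the head object's metadata and the bucket index entry
  virtual int complete(/* accounted_size, etag, attrs, ... */) = 0;
  virtual ~ObjectProcessorSketch() = default;
};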

AtomicObjectProcessor::prepare()

int AtomicObjectProcessor::prepare()
{
  uint64_t max_head_chunk_size;
  uint64_t head_max_size;
  uint64_t chunk_size = 0;
  uint64_t alignment;
  rgw_pool head_pool;

  // look up the data pool the uploaded object will be written to
  if (!store->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) {
    ldout(store->ctx(), 0) << "fail to get head pool " << head_obj << ",rule=" << bucket_info.placement_rule << dendl;
    return -EIO;
  }
  // determine max_head_chunk_size (the size limit of the head object)
  int r = store->get_max_chunk_size(head_pool, &max_head_chunk_size, &alignment);
  if (r < 0) {
    return r;
  }

  // ... (head_max_size, chunk_size and stripe_size are derived here; elided)

  manifest.set_trivial_rule(head_max_size, stripe_size);

  /* change atomic obj oid, add obj-name */
  char buf[33];
  gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
  string oid_prefix = ".";
  oid_prefix.append(buf);
  oid_prefix.append("_");

  // the randomly generated tail suffix, e.g. the highlighted part of
  // c2c8dcbe-1203-4be1-b58b-5dc19a95fca9.4194.1__shadow_{file.ECv7yKnV7oudlcFrIWHTsJ5QrYBhNdd_}0
  manifest.set_prefix(head_obj.key.name + oid_prefix);

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                bucket_info.placement_rule,
                                &tail_placement_rule,
                                head_obj.bucket, head_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "fail to create manifest " << head_obj << ",ret=" << r << dendl;
    return r;
  }
  // the manifest generator's current raw obj (the head object at this point)
  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    ldout(store->ctx(), 0) << "fail to set_stripe_obj " << head_obj << ",ret=" << r << dendl;
    return r;
  }

  set_head_chunk_size(head_max_size);
  // initialize the processors
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, head_max_size);
  return 0;
}

Reading the data in a loop

do {
  bufferlist data;
  if (fst > lst)
    break;
  // no copy source: read from the client
  if (copy_source.empty()) {
    // RGWPutObj_ObjStore::get_data(bufferlist& bl)
    // each call pulls at most rgw_max_chunk_size (4 MB by default)
    len = get_data(data);
  }
  // copying from another object
  else {
    uint64_t cur_lst = min(fst + s->cct->_conf->rgw_max_chunk_size - 1, lst);
    op_ret = get_data(fst, cur_lst, data);
    if (op_ret < 0)
      return;
    len = data.length();
    s->content_length += len;
    fst += len;
  }
  if (len < 0) {
    op_ret = len;
    ldpp_dout(this, 20) << "get_data() returned ret=" << op_ret << dendl;
    return;
  } else if (len == 0) {
    break;
  }

  if (need_calc_md5) {
    hash.Update((const unsigned char *)data.c_str(), data.length());
  }

  /* update torrent */ // accumulates SHA-1 hashes for the torrent file
  torrent.update(data);
  // hand the data to the head of the processor chain:
  // HeadObjectProcessor::process
  op_ret = filter->process(std::move(data), ofs);
  if (op_ret < 0) {
    ldpp_dout(this, 20) << "processor->process() returned ret="
                        << op_ret << dendl;
    return;
  }

  ofs += len;
} while (len > 0);

filter->process(std::move(data), ofs) ---> HeadObjectProcessor::process

Once the data arrives, it is handled here (the key processing function):

int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
{
  // an empty buffer signals a flush: all input has been read
  const bool flush = (data.length() == 0);
  // capture the first chunk for special handling
  // data_offset tracks how many bytes have been consumed so far
  if (data_offset < head_chunk_size || data_offset == 0) {
    if (flush) {
      // flush partial chunk
      // less than a full chunk was received; write it out of head_data here
      ldout(store->ctx(), 20) << "process flush offset:" << data_offset << " data len:" << head_data.length() << dendl;
      return process_first_chunk(std::move(head_data), &processor);
    }

    auto remaining = head_chunk_size - data_offset;
    auto count = std::min<uint64_t>(data.length(), remaining);
    data.splice(0, count, &head_data);
    data_offset += count;

    // the head has reached exactly head_chunk_size (4 MB)
    if (data_offset == head_chunk_size) {
      // process the first complete chunk
      // only a whole chunk is written here; a partial chunk stays buffered
      // in head_data and is written by the flush branch above
      ceph_assert(head_data.length() == head_chunk_size);

      ldout(store->ctx(), 20) << "process first chunk, offset:" << data_offset << " data len:" << head_data.length() << dendl;

      int r = process_first_chunk(std::move(head_data), &processor);
      if (r < 0) {
        return r;
      }
    }
    ldout(store->ctx(), 20) << " !!! " << dendl;

    if (data.length() == 0) { // avoid flushing stripe processor
      return 0;
    }
  }

  ceph_assert(processor); // process_first_chunk() must initialize

  // send everything else through the processor
  auto write_offset = data_offset;
  data_offset += data.length();

  ldout(store->ctx(), 20) << "process data, write offset:" << write_offset << " data len:" << data.length() << dendl;
  // StripeProcessor::process(bufferlist&& data, uint64_t offset)
  return processor->process(std::move(data), write_offset);
}

The main job of HeadObjectProcessor::process is to handle the first 4 MB (the head object) specially, and to hand everything after 4 MB to the next processor in the chain (StripeProcessor::process).

After the first 4 MB, the write path is StripeProcessor::process -> ChunkProcessor::process() -> RadosWriter::process().
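To make the chain concrete, here is a self-contained toy sketch of the processor-pipe pattern these classes follow. It is not the real Ceph code: std::string stands in for bufferlist, Chunker/Writer stand in for ChunkProcessor/RadosWriter, and an empty buffer means "flush what you have buffered".

#include <cstdint>
#include <cstdio>
#include <string>
#include <utility>

// Toy version of rgw::putobj's processor-chain ("Pipe") pattern.
struct DataProcessor {
  virtual int process(std::string&& data, uint64_t offset) = 0;
  virtual ~DataProcessor() = default;
};

// Final stage, standing in for RadosWriter: offset 0 -> write_full.
struct Writer : DataProcessor {
  int process(std::string&& data, uint64_t offset) override {
    if (data.empty()) return 0;               // no empty writes
    std::printf("%s: %zu bytes @ %llu\n",
                offset == 0 ? "write_full" : "write",
                data.size(), (unsigned long long)offset);
    return 0;
  }
};

// Middle stage, standing in for ChunkProcessor: buffer input and only
// forward whole chunk_size pieces; the tail is forwarded on flush.
struct Chunker : DataProcessor {
  DataProcessor* next;
  uint64_t chunk_size;
  std::string chunk;                          // buffered partial chunk
  Chunker(DataProcessor* n, uint64_t cs) : next(n), chunk_size(cs) {}

  int process(std::string&& data, uint64_t offset) override {
    // `offset` is where `data` starts; buffered bytes started earlier
    uint64_t position = offset - chunk.size();
    const bool flush = data.empty();
    chunk += data;
    while (chunk.size() >= chunk_size) {      // forward complete chunks
      int r = next->process(chunk.substr(0, chunk_size), position);
      if (r < 0) return r;
      chunk.erase(0, chunk_size);
      position += chunk_size;
    }
    if (flush && !chunk.empty()) {            // forward the tail on flush
      int r = next->process(std::move(chunk), position);
      chunk.clear();
      return r;
    }
    return 0;
  }
};

int main() {
  Writer w;
  Chunker c(&w, 4);                           // 4-byte "chunks" for the demo
  uint64_t ofs = 0;
  for (std::string piece : {std::string("abcde"), std::string("fgh")}) {
    uint64_t len = piece.size();
    c.process(std::move(piece), ofs);         // like filter->process(data, ofs)
    ofs += len;
  }
  c.process({}, ofs);                         // final flush, like process({}, ofs)
}

Running it prints "write_full: 4 bytes @ 0" then "write: 4 bytes @ 4", mirroring how ChunkProcessor only forwards whole chunks and leaves the tail to the final flush.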

int StripeProcessor::process(bufferlist&& data, uint64_t offset)
{
  // bounds.first and bounds.second delimit the current stripe's range
  // [first, second) within the logical object
  ceph_assert(offset >= bounds.first);
  const bool flush = (data.length() == 0);

  if (flush) {
    return Pipe::process({}, offset - bounds.first);
  }

  auto max = bounds.second - offset;

  while (data.length() > max) {
    // max > 0: there is still room left in the current stripe; fill it first
    if (max > 0) {
      bufferlist bl;
      data.splice(0, max, &bl);

      int r = Pipe::process(std::move(bl), offset - bounds.first);
      if (r < 0) {
        return r;
      }
      offset += max;
    }

    // flush the current chunk: ChunkProcessor::process
    int r = Pipe::process({}, offset - bounds.first); // -> ChunkProcessor::process ->
    if (r < 0) {
      return r;
    }
    // generate the next stripe (ManifestObjectProcessor::next also points
    // the writer at the next stripe object)
    uint64_t stripe_size;
    r = gen->next(offset, &stripe_size);
    if (r < 0) {
      return r;
    }
    ceph_assert(stripe_size > 0);

    bounds.first = offset;
    bounds.second = offset + stripe_size;

    max = stripe_size; // max is now a full stripe (4 MB by default)
  }

  if (data.length() == 0) { // don't flush the chunk here
    return 0;
  }
  // offset - bounds.first is the stripe-relative offset (0 for aligned writes)
  return Pipe::process(std::move(data), offset - bounds.first);
}

Next, let's take uploading a 10 MB file as an example and trace, against the code, how the data is split and sent down to RADOS.

10 MB falls under whole-object upload, so it is split into three pieces (the do...while loop above pulls up to 4 MB at a time): 4 MB, 4 MB, and 2 MB.
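As a quick sanity check of that arithmetic, here is a toy loop (my own, assuming the default 4 MB head/stripe size) printing which RADOS object each piece lands in; under these assumptions every stripe is its own object written at offset 0, i.e. with write_full:

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main()
{
  const uint64_t MB = 1024 * 1024;
  const uint64_t total = 10 * MB;   // the uploaded file
  const uint64_t stripe = 4 * MB;   // head_max_size == stripe size == 4 MB
  for (uint64_t ofs = 0, n = 0; ofs < total; ofs += stripe, ++n) {
    const uint64_t len = std::min(stripe, total - ofs);
    std::printf("stripe %llu: %llu MB -> %s, write_full at object offset 0\n",
                (unsigned long long)n, (unsigned long long)(len / MB),
                n == 0 ? "head object" : "shadow object");
  }
}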

The first 4 MB

// data is 4 MB; ofs starts at 0
filter->process(std::move(data), ofs); // (4M, 0)
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (4M, 0)
// the first chunk is exactly a full 4 MB
---> process_first_chunk(std::move(head_data), &processor);
// written directly as a single RADOS object (the head)
---> writer.process(std::move(data), 0) // RadosWriter::process(4M, 0)
---> op.write_full(data)
ofs += len; // update the offset; len is the data length, so ofs is now 4M

The second 4 MB

// still inside the same loop; another 4 MB has been pulled

filter->process(std::move(data), ofs); // (4M, 4M)
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (4M, 4M)
// no longer the first chunk, so it is handed straight to the StripeProcessor
---> processor->process(std::move(data), write_offset); // StripeProcessor::process(4M, 4M)
---> const bool flush = (data.length() == 0); // flush = false
---> auto max = bounds.second - offset; // max = 4M - 4M = 0; bounds.second is the stripe upper bound, 4M by default (see prepare)
---> while (data.length() > max) {..} // enter the loop: data.length() = 4M > 0
---> Pipe::process({}, offset - bounds.first) // ChunkProcessor::process({}, 4M): data is empty, so nothing is written
---> gen->next(offset, &stripe_size); // advance the pre-built manifest (covered later); ManifestObjectProcessor::next also points the writer at the next shadow object
---> bounds.first = offset; // update the stripe bounds: bounds.first = 4M
---> bounds.second = offset + stripe_size; // bounds.second = 8M
---> max = stripe_size; // max = 4M
---> Pipe::process(std::move(data), offset - bounds.first); // ChunkProcessor::process(4M, 0): the offset is now stripe-relative
---> ChunkProcessor::process(bufferlist&& data, uint64_t offset)
---> int64_t position = offset - chunk.length(); // 0 - 0 = 0: no data has been buffered in chunk yet
---> flush = (data.length() == 0); // flush = false
---> chunk.claim_append(data); // move the data into chunk, leaving data empty
---> while (chunk.length() >= chunk_size) // chunk.length() = 4M; chunk_size defaults to 4M (see prepare)
---> bufferlist bl;
---> chunk.splice(0, chunk_size, &bl); // chunk is drained into bl
---> int r = Pipe::process(std::move(bl), position); // RadosWriter::process(4M, 0): each stripe is a separate RADOS object
---> op.write_full(data) // offset 0 within the stripe object, hence write_full
ofs += len; // update the offset; ofs is now 8M


The remaining 2 MB

filter->process(std::move(data), ofs); // (2M, 8M)
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (2M, 8M)
// not the first chunk, so straight to the StripeProcessor
---> processor->process(std::move(data), write_offset); // StripeProcessor::process(2M, 8M), offset = 8M
---> const bool flush = (data.length() == 0); // flush = false
---> auto max = bounds.second - offset; // max = 8M - 8M = 0
---> while (data.length() > max) {..} // enter the loop: data.length() = 2M > 0
---> Pipe::process({}, offset - bounds.first) // ChunkProcessor::process({}, 4M): data is empty, so nothing is written
---> gen->next(offset, &stripe_size); // advance the manifest; the writer now targets the third shadow object
---> bounds.first = offset; // bounds.first = 8M
---> bounds.second = offset + stripe_size; // bounds.second = 12M
---> max = stripe_size; // max = 4M
---> Pipe::process(std::move(data), offset - bounds.first);
---> ChunkProcessor::process(bufferlist&& data, uint64_t offset) // ChunkProcessor::process(2M, 0)
---> int64_t position = offset - chunk.length(); // 0 - 0 = 0: no data buffered in chunk yet
---> flush = (data.length() == 0); // flush = false
---> chunk.claim_append(data); // move the data into chunk, leaving data empty
---> while (chunk.length() >= chunk_size) // chunk.length() = 2M < 4M: the loop is NOT entered,
// so the body below does not run; the 2M stays buffered in chunk until the final flush
---> bufferlist bl;
---> chunk.splice(0, chunk_size, &bl);
---> int r = Pipe::process(std::move(bl), position);
---> op.write(offset, data)
ofs += len; // update the offset; ofs is now 10M



// after all input has been read, flush whatever the filters still hold
// flush any data in filters
op_ret = filter->process({}, ofs); // ofs = 10M
---> HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) // (0, 10M)
---> processor->process(std::move(data), write_offset); // StripeProcessor::process(0, 10M), offset = 10M
---> const bool flush = (data.length() == 0); // flush = true
---> Pipe::process({}, offset - bounds.first); // ChunkProcessor::process({}, 2M): 10M - 8M
---> ChunkProcessor::process({}, 2M)
---> const bool flush = (data.length() == 0); // flush = true
---> if (chunk.length() > 0)
---> Pipe::process(std::move(chunk), position); // RadosWriter::process(2M, 0)
---> op.write_full(data);

The special cases in whole-object upload are the first chunk (handled separately as the head object) and the final piece of data (which is only flushed after the do...while loop has read everything).

Next, we'll look at how the object's metadata is stored (the head object).