OSD进程从网络收到客户端的读写请求后,交由OSD模块执行核心的请求处理逻辑,本篇博文将讨论OSD模块的实现原理。
OSD模块中对象、流程概览
我们先整体来看一下OSD模块的对象和流程:
回顾前文对SimpleMessenger的分析,我们看到ms_pipe_read线程从网络中收到请求消息后,通过fast_dispatch接口将消息分发给OSD对象进行处理。OSD对象针对接收到的每一个消息,都会将它们放入一个工作队列中(ShardedOpWQ,片式队列)。到这里,ms_pipe_read线程的分发动作就执行完了。对于一个片式队列,会有若干个处理线程,即图中的tp_osd_tp线程,每个处理线程负责处理不同分片中的消息。它们将各自分片中的消息取出后,找到每个消息对应的PG对象(Placement Group),进而将消息封装成操作(op)转给PG对象处理。PG对象针对读操作将直接从filestore中读出内容并返回响应消息给客户端;而对于写操作,PG对象将请求以事务(Transaction)的方式提交给PGBackend对象(本文主要讨论ReplicatedBackend),最终事务内的操作会转变成对filestore的操作(我们将在独立的博文中讨论filestore模块的实现原理)。
为什么一个请求消息要在两个线程(ms_pipe_read和tp_osd_tp)间传递处理?其实这里体现了ceph一个核心的设计理念:流水线。将请求的处理分成多个步骤,每个步骤放在不同的线程中处理;请求从一个线程流动到下一个线程,类似工产里的流水流;这样可以大大提升处理请求的吞吐量(即每秒完成的请求数量)。那么时延呢?
OSD模块中类的概览
下面我们来看看OSD模块中涉及的主要类及其关系:
- OSD是核心类,它内部包含两个Messenger对象指针,分别指向cluster网络(cluster_messenger)和public网络(client_messenger)。store指向后端对象存储池,用来进行实际的对象存取操作。内部包含一个片式队列(ShardedOpWQ)和一个处理线程池(SharedThreadPool),OSD对象将请求消息放入片式队列中,再由不同的处理线程从队列中取出消息进行下一步处理。OSD中还包含全局的OSDMap和映射到本OSD的所有PG对象。
- ShardedOpWQ类代表片式队列,number_shards是队列中总的分片数,每个分片都包含一个ShardedData,其内部有一个优先级队列用来接收请求消息(通过_enqueue操作)。每个片式队列都关联一个处理线程池,池中的每个线程都通过_process接口从对应队列中取出消息进行后续处理。
- PrimaryLogPG类继承PG类,代表具体的Placement Group的一种实现。每个PrimaryLogPG对象包含一个pgbackend对象,该对象负责数据复本的处理。目前有两种数据复本的实现方式,Replicated(复制)和EC(校验码),分别对应ReplicatedBackend和ECBackend。
代码详解
1. 初始化过程
OSD模块的初始化代码位于OSD:init中,代码流程比较锁碎。这里我们给出初始化调用栈,并作一些简要说明:
- 在挂载后端ObjectStore后,OSD模块调用load_pgs开始加载后端ObjectStore中保存的pg对象。这里先枚举ObjectStore中所有的pg,例如对于filestore,将查找CURRENT目录下的所有子目录(每个子目录代表一个pg);然后打开该pg(生成具体的PG对象,如PrimaryLogPG)并读取pg状态信息。
- 启动osd_op_tp线程池,开始对op_shardedwq中的消息进行处理。
ceph_osd.cc:main()
\-OSD::init()
|-OSD::load_pgs()
| |-ObjectStore::list_collections()
| |-OSD::_open_lock_pg()
| |-ObjectStore::open_collections()
| \-PG::read_state()
\-osd_op_tp.start()
2. 请求处理过程
我们先来分析一下ms_pipe_read中的请求消息分发流程,其栈心处理函数为OSD::ms_fast_dispatch:
OSD::ms_fast_dispatch()
|-OpTracker::create_request()
\-OSD::dispatch_session_waiting()
\-OSD::enqueue_op()
\-ShardedOpWQ::queue()
ceph/src/osd/OSD.cc:
void OSD::ms_fast_dispatch(Message *m)
{
/*将消息封装成op*/
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
...
if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
m->get_type() != CEPH_MSG_OSD_OP) {
// queue it directly
...
} else {
// legacy client, and this is an MOSDOp (the *only* fast dispatch
// message that didn't have an explicit spg_t); we need to map
// them to an spg_t while preserving delivery order.
/*将op放入当前连接的会话上下文中,待获取到OSDMap后进行处理*/
Session *session = static_cast<Session*>(m->get_connection()->get_priv());
if (session) {
{
Mutex::Locker l(session->session_dispatch_lock);
op->get();
session->waiting_on_map.push_back(*op);
OSDMapRef nextmap = service.get_nextmap_reserved();
dispatch_session_waiting(session, nextmap);
service.release_map(nextmap);
}
session->put();
}
}
}
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
/*遍历session中waiting_on_map中的每个op进行处理*/
auto i = session->waiting_on_map.begin();
while (i != session->waiting_on_map.end()) {
OpRequestRef op = &(*i);
const MOSDFastDispatchOp *m = static_cast<const MOSDFastDispatchOp*>(op->get_req());
...
session->waiting_on_map.erase(i++);
op->put();
spg_t pgid;
if (m->get_type() == CEPH_MSG_OSD_OP) {
/*根据消息中记录的pg(对象名称的hash值)计算实际pg(根据pg数取余)*/
pg_t actual_pgid = osdmap->raw_pg_to_pg(static_cast<const MOSDOp*>(m)->get_pg());
if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
continue;
}
} else {
pgid = m->get_spg();
}
/*依据实际pgid将消息放入片式队列*/
enqueue_op(pgid, op, m->get_map_epoch());
}
...
}
void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
{
...
op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
}
接下来我们看一下tp_osd_tp线程是如何处理分片中的请求,线程处理的核心函数是ShardedOpWQ::_process,其调用栈如下:
ShardedOpWQ::_process()
|-OpQueue<>::dequeue()
|-OSD::_look_up_pg()
\-PGQueueable::run()
\-PrimrayLogPG::do_request()
\-PrimaryLogPG::do_op()
\-PrimaryLogPG::execute_ctx()
|-PrimaryLogPG::prepare_transaction()
| \-PrimaryLogPG::do_osd_ops()
\-PrimaryLogPG::issue_repop()
\-ReplicatedBackend::submit_transaction()
|-ReplicatedBackend::generate_transaction()
|-ReplicatedBackend::issue_op()
\-PrimaryLogPG::queue_transactions()
\-ObjectStore::queue_transactions()
ceph/src/osd/OSD.cc:
void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
{
/*通过线程号取余的方式找到每个线程片时的分片,可能存在两个线程处理一个分片的情况*/
uint32_t shard_index = thread_index % num_shards;
ShardData *sdata = shard_list[shard_index];
/*通过锁机制同步请求的放入与取出,以及多个线程并发取出的场景*/
sdata->sdata_op_ordering_lock.Lock();
...
/*取出一个操作对象到item*/
pair<spg_t, PGQueueable> item = sdata->pqueue->dequeue();
...
sdata->sdata_op_ordering_lock.Unlock();
...
/*查找item对应的pg对象并为该pg加锁,类型为PrimaryLogPG*/
pg = osd->_lookup_lock_pg(item.first);
...
sdata->sdata_op_ordering_lock.Lock();
...
/*对于查找到的pg对象会放入一个临时的slot结构中进行同步加锁*/
qi = slot.to_process.front();
slot.to_process.pop_front();
...
sdata->sdata_op_ordering_lock.Unlock();
...
/*调用实际的处理函数,最终将执行PrimaryLogPG::do_request()*/
qi->run(osd, pg, tp_handle);
...
/*解锁pg*/
pg->unlock();
}
每个PG在处理op时,会为当前op以及op操作的对象生成一个context,用来保存此次操作相关的信息:
ceph/src/osd/PrimaryLogPG.cc:
void PrimaryLogPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle)
{
.../*针对op进行一系列的有效性判断*/
switch (op->get_req()->get_type()) {
/*针对不同的请求类型进行不同的处理*/
case CEPH_MSG_OSD_OP:
do_op(op);
break;
...
}
}
void PrimaryLogPG::do_op(OpRequestRef& op)
{
.../*还是一堆锁碎的状态检查*/
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
...
/*在当前PG中查找或生成一个新的对象上下文,用来保存对象修改的相关信息*/
ObjectContextRef obc;
int r = find_object_context(
oid, &obc, can_create,
m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
&missing_oid);
...
/*生成一个新op上下文*/
OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
...
/*执行该op上下文*/
execute_ctx(ctx);
...
}
PG在执行op上下文时,会使用事务的方式保证多个修改操作的原子性。另外事务具有多个层级,PG中使用PGTransaction;底层ObjectStore中会使用ObjectStore::Transaction进行进一步封装。
ceph/src/osd/PrimaryLogPG.cc:
void PrimaryLogPG::execute_ctx(OpContext *ctx)
{
OpRequestRef op = ctx->op; /*上下文上包含的op*/
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req()); /*op对应的原始请求消息*/
ObjectContextRef obc = ctx->obc; /*op操作的对象上下文*/
const hobject_t& soid = obc->obs.oi.soid; /*对象id*/
ctx->op_t.reset(new PGTransaction); /*设置一个新的PGTransaction事务对象*/
...
/*将上下文中的多个子操作放入到事务中,普通读写只有一个子操作*/
int result = prepare_transaction(ctx);
...
/*对于读操作,在prepare_transaction中将完成对象的读取,这里将直接返回响应消息给客户端*/
if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
...
complete_read_ctx(result, ctx);
return;
}
/*以下均针对写操作*/
/*注册所有复本写操作均完成后的回调函数,该函数将发送响应给客户端*/
ctx->register_on_commit(...);
...
/*生成一组复本操作,并将这些操作发射出去:本地复本将写到后端ObjectStore,远端复本将通过网络消息发送并等待响应*/
ceph_tid_t rep_tid = osd->get_tid();
RepGather *repop = new_repop(ctx, obc, rep_tid);
issue_repop(repop, ctx);
eval_repop(repop);
repop->put();
}
int PrimaryLogPG::prepare_transaction(OpContext *ctx)
{
...
int result = do_osd_ops(ctx, *ctx->ops);
...
}
int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
...
/*循环处理每个子操作*/
for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
OSDOp& osd_op = *p;
ceph_osd_op& op = osd_op.op;
switch (op.op) {
...
case CEPH_OSD_OP_READ:
++ctx->num_read;
result = do_read(ctx, osd_op);
break;
...
case CEPH_OSD_OP_WRITE:
++ctx->num_write;
t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
break;
...
}
}
}
int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op)
{
...
/*针对读请求,通过后端同步读接口直接读取对象内容*/
int r = pgbackend->objects_read_sync(
soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
...
}
ceph/src/osd/PGTransaction.h:
/*针对写操作,会将几个关键参数放入事务的buffer_updates中*/
void write(
const hobject_t &hoid, ///< [in] object to write
uint64_t off, ///< [in] off at which to write
uint64_t len, ///< [in] len to write from bl
bufferlist &bl, ///< [in] bl to write will be claimed to len
uint32_t fadvise_flags = 0 ///< [in] fadvise hint
) {
auto &op = get_object_op_for_modify(hoid);
op.buffer_updates.insert(
off,
len,
ObjectOperation::BufferUpdate::Write{bl, fadvise_flags});
}
针对写操作的多复本操作,将会提交给ReplicatedBackend对象进行处理:
ceph/src/osd/PrimaryLogPG.cc:
void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
...
/*生成复本操作全部完成后的回调对象*/
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
...
/*递交给ReplicatedBackend对象处理*/
pgbackend->submit_transaction(
soid,
ctx->delta_stats,
ctx->at_version,
std::move(ctx->op_t),
pg_trim_to,
min_last_complete_ondisk,
ctx->log,
ctx->updated_hset_history,
onapplied_sync,
on_all_applied,
on_all_commit,
repop->rep_tid,
ctx->reqid,
ctx->op);
}
ceph/src/osd/ReplicatedBackend.cc:
void ReplicatedBackend::submit_transaction(
const hobject_t &soid,
const object_stat_sum_t &delta_stats,
const eversion_t &at_version,
PGTransactionUPtr &&_t,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &_log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_acked,
Context *on_all_commit,
ceph_tid_t tid,
osd_reqid_t reqid,
OpRequestRef orig_op)
{
ObjectStore::Transaction op_t;
...
/*将PGTransaction对象封装到ObjectStore:: Transaction中*/
generate_transaction(
t,
coll,
(get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
log_entries,
&op_t,
&added,
&removed);
/*生成一个新的代表当前操作对象的InProgressOp*/
InProgressOp &op = in_progress_ops.insert(
make_pair(
tid,
InProgressOp(
tid, on_all_commit, on_all_acked,
orig_op, at_version)
)
).first->second;
/*记录当前操作需要等待哪些OSD返回结果,包含本地OSD和远端OSD*/
op.waiting_for_applied.insert(
parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end());
op.waiting_for_commit.insert(
parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end());
/*对于发往远端OSD的复本请求,通过网络发送CEPH_MSG_REPOP消息*/
issue_op(
soid,
at_version,
tid,
reqid,
trim_to,
at_version,
added.size() ? *(added.begin()) : hobject_t(),
removed.size() ? *(removed.begin()) : hobject_t(),
log_entries,
hset_history,
&op,
op_t);
/*对于发送本地OSD的复本请求*/
/*先注册本地完成后的回调*/
op_t.register_on_applied_sync(on_local_applied_sync);/*写到数据区并同步回刷之后触发*/
op_t.register_on_applied(
parent->bless_context(new C_OSD_OnOpApplied(this, &op)));/*写到数据区之后触发*/
op_t.register_on_commit(
parent->bless_context(new C_OSD_OnOpCommit(this, &op)));/*提交到日志区之后触发*/
/*再将事务发送给后端存储池进行处理*/
vector<ObjectStore::Transaction> tls;
tls.push_back(std::move(op_t));
parent->queue_transactions(tls, op.op); /*最终会调用不同ObjectStore的queue_transactions函数*/
}
OSD模块整体分析完毕,后续我们将继续分析FileStore的实现原理。