【Rados Block Device】七、OSD原理分析-OSD模块

  OSD进程从网络收到客户端的读写请求后,交由OSD模块执行核心的请求处理逻辑,本篇博文将讨论OSD模块的实现原理。

OSD模块中对象、流程概览

  我们先整体来看一下OSD模块的对象和流程:

  回顾前文对SimpleMessenger的分析,我们看到ms_pipe_read线程从网络中收到请求消息后,通过fast_dispatch接口将消息分发给OSD对象进行处理。OSD对象针对接收到的每一个消息,都会将它们放入一个工作队列中(ShardedOpWQ,片式队列)。到这里,ms_pipe_read线程的分发动作就执行完了。对于一个片式队列,会有若干个处理线程,即图中的tp_osd_tp线程,每个处理线程负责处理不同分片中的消息。它们将各自分片中的消息取出后,找到每个消息对应的PG对象(Placement Group),进而将消息封装成操作(op)转给PG对象处理。PG对象针对读操作将直接从filestore中读出内容并返回响应消息给客户端;而对于写操作,PG对象将请求以事务(Transaction)的方式提交给PGBackend对象(本文主要讨论ReplicatedBackend),最终事务内的操作会转变成对filestore的操作(我们将在独立的博文中讨论filestore模块的实现原理)。

  为什么一个请求消息要在两个线程(ms_pipe_read和tp_osd_tp)间传递处理?其实这里体现了ceph一个核心的设计理念:流水线。将请求的处理分成多个步骤,每个步骤放在不同的线程中处理;请求从一个线程流动到下一个线程,类似工产里的流水流;这样可以大大提升处理请求的吞吐量(即每秒完成的请求数量)。那么时延呢?

OSD模块中类的概览

  下面我们来看看OSD模块中涉及的主要类及其关系:

  

  • OSD是核心类,它内部包含两个Messenger对象指针,分别指向cluster网络(cluster_messenger)和public网络(client_messenger)。store指向后端对象存储池,用来进行实际的对象存取操作。内部包含一个片式队列(ShardedOpWQ)和一个处理线程池(SharedThreadPool),OSD对象将请求消息放入片式队列中,再由不同的处理线程从队列中取出消息进行下一步处理。OSD中还包含全局的OSDMap和映射到本OSD的所有PG对象。
  • ShardedOpWQ类代表片式队列,number_shards是队列中总的分片数,每个分片都包含一个ShardedData,其内部有一个优先级队列用来接收请求消息(通过_enqueue操作)。每个片式队列都关联一个处理线程池,池中的每个线程都通过_process接口从对应队列中取出消息进行后续处理。
  • PrimaryLogPG类继承PG类,代表具体的Placement Group的一种实现。每个PrimaryLogPG对象包含一个pgbackend对象,该对象负责数据复本的处理。目前有两种数据复本的实现方式,Replicated(复制)和EC(校验码),分别对应ReplicatedBackend和ECBackend。

代码详解

1. 初始化过程

  OSD模块的初始化代码位于OSD:init中,代码流程比较锁碎。这里我们给出初始化调用栈,并作一些简要说明:

  • 在挂载后端ObjectStore后,OSD模块调用load_pgs开始加载后端ObjectStore中保存的pg对象。这里先枚举ObjectStore中所有的pg,例如对于filestore,将查找CURRENT目录下的所有子目录(每个子目录代表一个pg);然后打开该pg(生成具体的PG对象,如PrimaryLogPG)并读取pg状态信息。
  • 启动osd_op_tp线程池,开始对op_shardedwq中的消息进行处理。

  

ceph_osd.cc:main()
    \-OSD::init()
        |-OSD::load_pgs()
        |   |-ObjectStore::list_collections()
        |   |-OSD::_open_lock_pg()
        |   |-ObjectStore::open_collections()
        |   \-PG::read_state()
        \-osd_op_tp.start()

2. 请求处理过程

  我们先来分析一下ms_pipe_read中的请求消息分发流程,其栈心处理函数为OSD::ms_fast_dispatch:

OSD::ms_fast_dispatch()
    |-OpTracker::create_request()
    \-OSD::dispatch_session_waiting()
        \-OSD::enqueue_op()
            \-ShardedOpWQ::queue()
ceph/src/osd/OSD.cc:

void OSD::ms_fast_dispatch(Message *m)
{
    /*将消息封装成op*/
    OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
    ...
    
    if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
        m->get_type() != CEPH_MSG_OSD_OP) {
        // queue it directly
        ...
    } else {
        // legacy client, and this is an MOSDOp (the *only* fast dispatch
        // message that didn't have an explicit spg_t); we need to map
        // them to an spg_t while preserving delivery order.

        /*将op放入当前连接的会话上下文中,待获取到OSDMap后进行处理*/
        Session *session = static_cast<Session*>(m->get_connection()->get_priv());
        if (session) {
            {
                Mutex::Locker l(session->session_dispatch_lock);
                op->get();
                session->waiting_on_map.push_back(*op);
                OSDMapRef nextmap = service.get_nextmap_reserved();
                dispatch_session_waiting(session, nextmap);
                service.release_map(nextmap);
            }
            session->put();
        }
    } 
}

void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
    /*遍历session中waiting_on_map中的每个op进行处理*/
    auto i = session->waiting_on_map.begin();
    while (i != session->waiting_on_map.end()) {
        OpRequestRef op = &(*i);
        const MOSDFastDispatchOp *m = static_cast<const MOSDFastDispatchOp*>(op->get_req());
        ...
        session->waiting_on_map.erase(i++);
        op->put();

        spg_t pgid;
        if (m->get_type() == CEPH_MSG_OSD_OP) {
            /*根据消息中记录的pg(对象名称的hash值)计算实际pg(根据pg数取余)*/
            pg_t actual_pgid = osdmap->raw_pg_to_pg(static_cast<const MOSDOp*>(m)->get_pg());
            if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
                continue;
            }
        } else {
            pgid = m->get_spg();
        }
        /*依据实际pgid将消息放入片式队列*/
        enqueue_op(pgid, op, m->get_map_epoch());
    }
    ...
}

void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
{
    ...
    op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
}

  接下来我们看一下tp_osd_tp线程是如何处理分片中的请求,线程处理的核心函数是ShardedOpWQ::_process,其调用栈如下:

ShardedOpWQ::_process()
    |-OpQueue<>::dequeue()
    |-OSD::_look_up_pg()
    \-PGQueueable::run()
        \-PrimrayLogPG::do_request()
            \-PrimaryLogPG::do_op()
                \-PrimaryLogPG::execute_ctx()
                    |-PrimaryLogPG::prepare_transaction()
                    |   \-PrimaryLogPG::do_osd_ops()
                    \-PrimaryLogPG::issue_repop()
                        \-ReplicatedBackend::submit_transaction()
                            |-ReplicatedBackend::generate_transaction()
                            |-ReplicatedBackend::issue_op()
                            \-PrimaryLogPG::queue_transactions()
                                \-ObjectStore::queue_transactions()
ceph/src/osd/OSD.cc:

void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
{
    /*通过线程号取余的方式找到每个线程片时的分片,可能存在两个线程处理一个分片的情况*/
    uint32_t shard_index = thread_index % num_shards;
    ShardData *sdata = shard_list[shard_index];
    
    /*通过锁机制同步请求的放入与取出,以及多个线程并发取出的场景*/
    sdata->sdata_op_ordering_lock.Lock();
    ...

    /*取出一个操作对象到item*/
    pair<spg_t, PGQueueable> item = sdata->pqueue->dequeue();
    ...

    sdata->sdata_op_ordering_lock.Unlock();
    ...

    /*查找item对应的pg对象并为该pg加锁,类型为PrimaryLogPG*/
    pg = osd->_lookup_lock_pg(item.first);
    ...

    sdata->sdata_op_ordering_lock.Lock();
    ...

    /*对于查找到的pg对象会放入一个临时的slot结构中进行同步加锁*/
    qi = slot.to_process.front();
    slot.to_process.pop_front();
    ...

    sdata->sdata_op_ordering_lock.Unlock();
    ...

    /*调用实际的处理函数,最终将执行PrimaryLogPG::do_request()*/
    qi->run(osd, pg, tp_handle);
    ...

    /*解锁pg*/
    pg->unlock();
}

  每个PG在处理op时,会为当前op以及op操作的对象生成一个context,用来保存此次操作相关的信息:

ceph/src/osd/PrimaryLogPG.cc:

void PrimaryLogPG::do_request(OpRequestRef& op, ThreadPool::TPHandle &handle)
{
    .../*针对op进行一系列的有效性判断*/

    switch (op->get_req()->get_type()) {
    /*针对不同的请求类型进行不同的处理*/
    case CEPH_MSG_OSD_OP:
        do_op(op);
        break;
    ...
    }
}

void PrimaryLogPG::do_op(OpRequestRef& op)
{
    .../*还是一堆锁碎的状态检查*/
    
    MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
    ...
    
    /*在当前PG中查找或生成一个新的对象上下文,用来保存对象修改的相关信息*/
    ObjectContextRef obc;
    int r = find_object_context(
            oid, &obc, can_create,
            m->has_flag(CEPH_OSD_FLAG_MAP_SNAP_CLONE),
            &missing_oid);
    ...
    
    /*生成一个新op上下文*/
    OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
    ...

    /*执行该op上下文*/
    execute_ctx(ctx);
    ...
}

  PG在执行op上下文时,会使用事务的方式保证多个修改操作的原子性。另外事务具有多个层级,PG中使用PGTransaction;底层ObjectStore中会使用ObjectStore::Transaction进行进一步封装。

ceph/src/osd/PrimaryLogPG.cc:

void PrimaryLogPG::execute_ctx(OpContext *ctx)
{
    OpRequestRef op = ctx->op; /*上下文上包含的op*/
    const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req()); /*op对应的原始请求消息*/
    ObjectContextRef obc = ctx->obc; /*op操作的对象上下文*/
    const hobject_t& soid = obc->obs.oi.soid; /*对象id*/

    ctx->op_t.reset(new PGTransaction); /*设置一个新的PGTransaction事务对象*/
    ...

    /*将上下文中的多个子操作放入到事务中,普通读写只有一个子操作*/
    int result = prepare_transaction(ctx);
    ...

    /*对于读操作,在prepare_transaction中将完成对象的读取,这里将直接返回响应消息给客户端*/
    if ((ctx->op_t->empty() || result < 0) && !ctx->update_log_only) {
        ...
        complete_read_ctx(result, ctx);
        return;
    }
    
    /*以下均针对写操作*/
    
    /*注册所有复本写操作均完成后的回调函数,该函数将发送响应给客户端*/
    ctx->register_on_commit(...);
    ...

    /*生成一组复本操作,并将这些操作发射出去:本地复本将写到后端ObjectStore,远端复本将通过网络消息发送并等待响应*/
    ceph_tid_t rep_tid = osd->get_tid();

    RepGather *repop = new_repop(ctx, obc, rep_tid);

    issue_repop(repop, ctx);
    eval_repop(repop);
    repop->put();
}

int PrimaryLogPG::prepare_transaction(OpContext *ctx)
{
    ...
    int result = do_osd_ops(ctx, *ctx->ops);
    ...
}

int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
{
    ...
    /*循环处理每个子操作*/
    for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
        OSDOp& osd_op = *p;
        ceph_osd_op& op = osd_op.op;

        switch (op.op) {
        ...
        case CEPH_OSD_OP_READ:
            ++ctx->num_read;
            result = do_read(ctx, osd_op);
            break;
        ...
        case CEPH_OSD_OP_WRITE:
            ++ctx->num_write;
            t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
            break;
        ...
        }
    }
}

int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op)
{
    ...

    /*针对读请求,通过后端同步读接口直接读取对象内容*/
    int r = pgbackend->objects_read_sync(
            soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
    ...
}

ceph/src/osd/PGTransaction.h:

/*针对写操作,会将几个关键参数放入事务的buffer_updates中*/
void write(
    const hobject_t &hoid,         ///< [in] object to write
    uint64_t off,                  ///< [in] off at which to write
    uint64_t len,                  ///< [in] len to write from bl
    bufferlist &bl,                ///< [in] bl to write will be claimed to len
    uint32_t fadvise_flags = 0     ///< [in] fadvise hint
) {
    auto &op = get_object_op_for_modify(hoid);
    op.buffer_updates.insert(
        off,
        len,
        ObjectOperation::BufferUpdate::Write{bl, fadvise_flags});
}

  针对写操作的多复本操作,将会提交给ReplicatedBackend对象进行处理:

ceph/src/osd/PrimaryLogPG.cc:

void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx)
{
    ...

    /*生成复本操作全部完成后的回调对象*/
    Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
    Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
    ...

    /*递交给ReplicatedBackend对象处理*/
    pgbackend->submit_transaction(
        soid,
        ctx->delta_stats,
        ctx->at_version,
        std::move(ctx->op_t),
        pg_trim_to,
        min_last_complete_ondisk,
        ctx->log,
        ctx->updated_hset_history,
        onapplied_sync,
        on_all_applied,
        on_all_commit,
        repop->rep_tid,
        ctx->reqid,
        ctx->op);
}

ceph/src/osd/ReplicatedBackend.cc:

void ReplicatedBackend::submit_transaction(
    const hobject_t &soid,
    const object_stat_sum_t &delta_stats,
    const eversion_t &at_version,
    PGTransactionUPtr &&_t,
    const eversion_t &trim_to,
    const eversion_t &roll_forward_to,
    const vector<pg_log_entry_t> &_log_entries,
    boost::optional<pg_hit_set_history_t> &hset_history,
    Context *on_local_applied_sync,
    Context *on_all_acked,
    Context *on_all_commit,
    ceph_tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef orig_op)
{
    ObjectStore::Transaction op_t;
    ...
    
    /*将PGTransaction对象封装到ObjectStore:: Transaction中*/
    generate_transaction(
        t,
        coll,
        (get_osdmap()->require_osd_release < CEPH_RELEASE_KRAKEN),
        log_entries,
        &op_t,
        &added,
        &removed);

    /*生成一个新的代表当前操作对象的InProgressOp*/
    InProgressOp &op = in_progress_ops.insert(
        make_pair(
            tid,
            InProgressOp(
                tid, on_all_commit, on_all_acked,
                orig_op, at_version)
        )
    ).first->second;

    /*记录当前操作需要等待哪些OSD返回结果,包含本地OSD和远端OSD*/
    op.waiting_for_applied.insert(
        parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end());
    op.waiting_for_commit.insert(
        parent->get_actingbackfill_shards().begin(), parent->get_actingbackfill_shards().end());

    /*对于发往远端OSD的复本请求,通过网络发送CEPH_MSG_REPOP消息*/
    issue_op(
        soid,
        at_version,
        tid,
        reqid,
        trim_to,
        at_version,
        added.size() ? *(added.begin()) : hobject_t(),
        removed.size() ? *(removed.begin()) : hobject_t(),
        log_entries,
        hset_history,
        &op,
        op_t);

    /*对于发送本地OSD的复本请求*/
    /*先注册本地完成后的回调*/
    op_t.register_on_applied_sync(on_local_applied_sync);/*写到数据区并同步回刷之后触发*/
    op_t.register_on_applied(
        parent->bless_context(new C_OSD_OnOpApplied(this, &op)));/*写到数据区之后触发*/
    op_t.register_on_commit(
        parent->bless_context(new C_OSD_OnOpCommit(this, &op)));/*提交到日志区之后触发*/
    
    /*再将事务发送给后端存储池进行处理*/
    vector<ObjectStore::Transaction> tls;
    tls.push_back(std::move(op_t));

    parent->queue_transactions(tls, op.op); /*最终会调用不同ObjectStore的queue_transactions函数*/
}

  OSD模块整体分析完毕,后续我们将继续分析FileStore的实现原理。


转载请注明:吴斌的博客 » 【Rados Block Device】七、OSD原理分析-OSD模块