【Rados Block Device】六、OSD原理分析-SimpleMessenger模块

  OSD进程通过网络对Client提供服务,因此网络层是OSD中的基础层。本篇博文将讨论ceph中传统的SimpleMessenger实现原理。

SimpleMessenger模块中对象、流程概览

  如果将OSD进程的网络服务模式配置成SimpleMessenger,那么它采用的是POSIX标准网络接口来实现网络功能。也就是说,此时我们的OSD服务从代码实现流程上来说就是通过socket()->bind()->listen()->accept()进行连接建立,随后再通过每个连接进行网络消息的收发。虽然SimpleMessenger在实现过程中融入一些设计模式的抽象,但是抓住以上POSIX网络编程核心流程后将便于大家理解其实现机理。

  我们先整体来看一下SimpleMessenger的对象和流程:

  我们知道一个ceph集群中通常会有两个网络平面:public和cluster,那么每个OSD进程中就会对应产生两个messenger对象ms_public和ms_cluster。对于每个messenger对象(这里其实为SimpleMessenger对象),会产生一个名为ms_accepter的线程,messenger对象通过accepter成员变量指向该线程。ms_accepter线程是在完成socket()->bind()->listen()动作之后产生,它的主要作用就是一直监听Client端的网络连接请求。

  当某一个Client发起连接请求后,ms_accepter将调用accept()接受请求,并为该连接产生一个pipe对象。pipe对象是对TCP socket连接的封装,可以实现故障重连等可靠性增强特性。每个pipe对象会产生两个线程:一个叫ms_pipe_read线程,它一直在监听socket连接中的消息,如果发现有消息它将取出消息并将消息放入in_q中进行分发处理;另一个叫ms_pipe_write线程,它一直在等待out_q中被放入发送消息,如果发现out_q中有消息,它将把消息取出并通过socket连接将其发送出去。

  在消息接收处理的过程中,in_q队列指向的其实是SimpleMessenger对象中的DispatchQueue,也就是说对于同一个messenger中的多个pipe,它们接收的消息将被放入到同一个队列中等待处理。DispatchQueue分发消息时如果发现该消息可以被快速处理(fast dispatch)时,会将该消息直接分发给SimpleMessenger,由SimpleMessenger将消息分发给其内部的多个Dispatcher对象进行最终的消息处理。这里OSD模块中的OSD对象就是属于SimpleMessenger的一个Dispatcher,OSD对消息的处理属于OSD模块的内容,我们将在后续博文中介绍。DispatchQueue如果发现该消息无法被快速处理,则会将该消息交给DispatchQueue对应的DispatchThread处理。DispatchThread线程取出消息后会传递给messenger通过ms_deliver_dispatch进行普通处理,其实最终也会交给Dispatcher(如OSD对象)处理,只不过是在DispatchThread线程中被处理(fast dispatch是在ms_pipe_read线程中被处理,请大家注意对比)。

SimpleMessenger模块中类的概览

  在了解了SimpleMessenger的实现流程后,我们再来看看它的类定义,从而理解它是如何实现抽象,以支持未来更灵活的扩展:

  图中标红的类(Messenger、Dispatcher、DispatchQueue、Connection)属于Messenger模块抽象类(不属于某一种网络实现模式),不同的网络实现模式可以继承它们实现各自特有的功能。

  • Messenger类是SimpleMessenger、AsyncMessenger、XioMessenger的父类,它描绘了messenger对象的功能框架。每个messenger对象包含两个Dispatcher链表,dispatchers代表普通处理对象,fast_dispatchers代表快速处理对象,以便实现针对不同消息类型采用不同处理方式的功能。Dispatcher对象是在初始流程中,通过add_dispatcher_header方法被加入到messenger对应的链表。create、bind、start、ready方法实现messenger对象的创建和初始化;wait等待messenger生命周期结束;shutdown关闭messenger对象。send_message实现了通过messenger对象发送一个消息的功能。ms_fast_dispatch实现消息的快速分发;ms_deliver_dispatch实现消息的普通分发;最终都将分发给messenger中的dispatchers进行处理。
  • Dispatcher类是接收消息的处理者。不同模块可以实现各自不同的Dispatcher,以实现对消息的不同处理逻辑。只要在初始化时通过messenger对象的add_dispatcher_*方法被加入到messenger中,便可保证该Dispatcher可以接收到消息并进行处理。
  • DispatchQueue类实现了一个分发队列。用户通过enqueue操作将消息放入队列,该队列可以按优先级对消息进行排序(PrioritizeQueue)。队列会产生一个专门的DispatchThread线程,由该线程负责从PrioritizedQueue中取出消息进行分发处理。此外,DispatchQueue也可通过can_fast_dispatch判断消息是否可以被快速处理,如果可以被快速处理,则直接调用fast_dispatch进行分发处理,否则调用enqueue进队列交由DispatchThread处理。
  • Connection类是对网络连接的抽象。子类通过实现send_message方法提供不同的网络发送方案。
  • SimpleMessenger类继承Messenger类实现基于POSIX网络接口的网络信使功能。它将实现Messenger类中的bind、start、ready方法,以完成网络socket的初始化。独有的Accepter对象将产生一个独立的线程监听网络连接请求。每当接受一个新连接时,都会生成一个新的Pipe对象,并通过add_accept_pipe方法将其加入到pipes列表中。SimpleMessenger包含了一个DispatchQueue来实现消息的普通分发和快速分发。
  • Pipe类代表一个连接会话,每个连接会产生一个reader_thread线程(入口函数为reader()方法)和一个writer_thread线程(入口函数为writer())。reader方法负责从网络层接收消息放入in_q并进行顶层分发处理;writer方法负责将out_q中消息通过网络发送。accept方法在接受连接时调用,connect方法在发起连接请求时调用。
  • Thread类是Common模块中的公共类,用来生成线程。create方法用来创建线程对象;entry方法为新线程的入口函数;join方法在等待子线程结束时调用。

代码详解

  有了对SimpleMessenger模块中对象、流程和类的整体认识之后,我们再结合代码来深入理解其实现细节。

1. 初始化过程

  SimpleMessenger模块的初始化在OSD进程的main函数中完成,整体调用栈如下所示:

ceph_osd.cc:main()
    |-Messenger::create()
    |   \-SimpleMessenger::SimpleMessenger()
    |-Messenger::bind() -> SimpleMessenger::bind()
    |   \-Accepter::bind()
    |       |-::socket()
    |       |-::bind()
    |       \-::listen()
    |-OSD::OSD()
    |-Messenger::start() -> SimpleMessenger::start()
    |-OSD::init()
    |   \-Messenger::add_dispatcher_head()
    |       \-Messenger::ready() -> SimpleMessenger::ready()
    |           \-Accepter::start()
    \-Messenger::wait() -> SimpleMessenger::wait()
    

  我们先来看看ceph_osd.cc中main函数里和Messenger初始化相关的部分代码:

ceph/src/ceph_osd.cc:

int main(int argc, const char **argv)
{
    ...

    /*这里我们假设public_msgr_type和cluster_msgr_type都为Simple*/
    std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
    std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_cluster_type;

    /*根据不同类型创建messenger对象,这里将创建两个SimpleMessenger对象*/
    Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type,
        entity_name_t::OSD(whoami), "client",
        getpid(),
        Messenger::HAS_HEAVY_TRAFFIC |
        Messenger::HAS_MANY_CONNECTIONS);
    Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type,
        entity_name_t::OSD(whoami), "cluster",
        getpid(),
        Messenger::HAS_HEAVY_TRAFFIC |
        Messenger::HAS_MANY_CONNECTIONS);

    ...

    /*为messenger对象绑定IP地址,最终会调用Accepter::bind函数,由它调用系统的socket()、bind()和listen()函数*/
    r = ms_public->bind(g_conf->public_addr);
    ...
    r = ms_cluster->bind(g_conf->cluster_addr);

    ...

    /*将创建的ms_public和ms_cluster传递给OSD构造函数,建立一个新的OSD对象*/
    osd = new OSD(g_ceph_context,
                store,
                whoami,
                ms_cluster,
                ms_public,
                ms_hb_front_client,
                ms_hb_back_client,
                ms_hb_front_server,
                ms_hb_back_server,
                ms_objecter,
                &mc,
                g_conf->osd_data,
                g_conf->osd_journal);

    /*启动messenger对象,针对SimpleMessenger,其内部细节我们暂不用关心*/
    ms_public->start();
    ...
    ms_cluster->start();

    /*OSD执行初始化,下文将展开*/
    osd->init();

    ...

    /*整个初始化动作完成,OSD主线程进入等待状态*/
    ms_public->wait();
    ms_cluster->wait();
    ...
}
ceph/src/msg/Messenger.cc:

Messenger *Messenger::create(CephContext *cct, const string &type,
        entity_name_t name, string lname,
        uint64_t nonce, uint64_t cflags)
{
    int r = -1;
    if (type == "random") {
        static std::random_device seed;
        static std::default_random_engine random_engine(seed());
        static Spinlock random_lock;

        std::lock_guard<Spinlock> lock(random_lock);
        std::uniform_int_distribution<> dis(0, 1);
        r = dis(random_engine);
    }
    if (r == 0 || type == "simple")
        return new SimpleMessenger(cct, name, std::move(lname), nonce);
    else if (r == 1 || type.find("async") != std::string::npos)
        return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
#ifdef HAVE_XIO
    else if ((type == "xio") &&
            cct->check_experimental_feature_enabled("ms-type-xio"))
    return new XioMessenger(cct, name, std::move(lname), nonce, cflags);
#endif
    lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
    return nullptr;
}

  我们再深入看看OSD初始化过程中与Messenger相关的部分代码:

ceph/src/osd/OSD.cc:

int OSD::init()
{
    ...

    /*将OSD对象自身加到public messenger和cluster messenger的dispatchers中*/
    client_messenger->add_dispatcher_head(this);
    cluster_messenger->add_dispatcher_head(this);

    ...
}
ceph/src/msg/Messenger.h:

void add_dispatcher_head(Dispatcher *d) { 
    bool first = dispatchers.empty(); /*是否为添加到dispatchers链表中的第一个元素?*/
    dispatchers.push_front(d); /*加入到dispatchers*/
    if (d->ms_can_fast_dispatch_any()) /*如果可以进行fast dispatch则加入到fast_dispatchers中*/
        fast_dispatchers.push_front(d);
    if (first)
        ready(); /*如果是首个加入到dispatchers中的对象,则调用messenger对象的ready()*/
}
ceph/src/msg/simple/SimpleMessenger.cc:

void SimpleMessenger::ready()
{
    ldout(cct,10) << "ready " << get_myaddr() << dendl;
    dispatch_queue.start(); /*拉起dispatch_queue对应的dispatch_thread*/

    lock.Lock();
    if (did_bind)
        accepter.start(); /*拉起ms_accepter线程*/
    lock.Unlock();
}
ceph/src/msg/simple/Accept.cc:

int Accepter::start()
{
    ldout(msgr->cct,1) << __func__ << dendl;

    // start thread
    create("ms_accepter");

    return 0;
}

2. 连接建立过程

  SimpleMessenger对象初始化完成后,将拉起一个ms_accepter线程处理Client端的连接请求,该线程入口函数为Accepter::entry。

ceph/src/msg/simple/Accepter.cc:

void *Accepter::entry()
{
    int errors = 0;
    int ch;

    struct pollfd pfd[2];
    memset(pfd, 0, sizeof(pfd));

    pfd[0].fd = listen_sd;
    pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
    pfd[1].fd = shutdown_rd_fd;
    pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;

    while (!done) {
        int r = poll(pfd, 2, -1); /*通过poll系统调用等待Client连接请求*/

        ...

        if (done) break;

        // accept
        sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); /*收到请求后,accept该连接*/
        if (sd >= 0) {
            ...
            msgr->add_accept_pipe(sd); /*向SimpleMessenger对象中添加pipe*/
        } else {
            ...
        }
    }
}
ceph/src/msg/simple/SimpleMessenger.cc:

Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
    lock.Lock();
    Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
    p->sd = sd;
    p->pipe_lock.Lock();
    p->start_reader(); /*拉起pipe对象的ms_pipe_read线程*/
    p->pipe_lock.Unlock();
    pipes.insert(p);
    accepting_pipes.insert(p);
    lock.Unlock();
    return p;
}
ceph/src/msg/simple/Pipe.cc:

void Pipe::start_reader()
{
    if (reader_needs_join) {
        reader_thread.join();
        reader_needs_join = false;
    }
    reader_running = true;
    reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
}

3. 消息接收与分发过程

  每个pipe对象的ms_pipe_read线程被拉起后,会进行会话的协商过程(可以回顾下前期Client端RBD的messenger分析博文);完成后将处于等待接收消息的状态:

ceph/src/msg/simple/Pipe.cc:

void Pipe::reader()
{
    pipe_lock.Lock();

    if (state == STATE_ACCEPTING) {
        /*执行会话协商过程,协商成功后会拉起ms_pipe_write线程*/
        accept();
    }

    while (state != STATE_CLOSED &&
            state != STATE_CONNECTING) {

        // sleep if (re)connecting
        if (state == STATE_STANDBY) {
            /*如果pipe状态为STANDBY,说明底层连接故障且暂无消息处理,则进入睡眠*/
            cond.Wait(pipe_lock);
            continue;
        }

        pipe_lock.Unlock();

        char tag = -1;
        /*先从网络连接中读取一个字节的tag*/
        if (tcp_read((char*)&tag, 1) < 0) {
            pipe_lock.Lock();
            fault(true);
            continue;
        }
        
        /*根据tag值进行不同的处理动作*/
        if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
            ...
            continue;
        }
        if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
            ...
            continue;
        }
        if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
            ...
            continue;
        }
        if (tag == CEPH_MSGR_TAG_ACK) {
            ...
            continue;
        }
        else if (tag == CEPH_MSGR_TAG_MSG) {
            /*针对消息,先从网络中读取消息内容到m中*/
            Message *m = 0;
            int r = read_message(&m, auth_handler.get());

            pipe_lock.Lock();
            if (m->get_seq() <= in_seq) {
                m->put();
                continue;
            }
            m->set_connection(connection_state.get());
            ...

            /*对消息进行预处理*/
            in_q->fast_preprocess(m);
            if (delay_thread) {
                ...
            } else {
                /*如果消息可以被快速处理,则走快速处理流程;否则就enqueue到in_q中交给dispatch_thread处理*/
                if (in_q->can_fast_dispatch(m)) {
                    reader_dispatching = true;
                    pipe_lock.Unlock();
                    in_q->fast_dispatch(m);
                    pipe_lock.Lock();
                    reader_dispatching = false;
                    ...
                } else {
                    in_q->enqueue(m, m->get_priority(), conn_id);
                }            
            }
        }
        ...
    }
    ...
}

  消息的快速处理流程可以回头参考前文的对象、流程图。

4. 消息发送过程

  任何模块想通过messenger发送消息时,都可以调用Messenger::send_mesage来完成。对于SimpleMessenger,其实现体位于SimpleMessenger.h,发送是一个异步过程,发送者只会将消息放入Pipe对象的out_q中,随后由ms_pipe_write线程完成向网络协议栈的发送。

  发送者的调用栈如下:

sender[any thread]
    \-SimpleMessenger::send_message()
        \-SimpleMessenger::_send_message()
            \-SimpleMessenger::submit_message()
                \-Pipe::_send()
ceph/src/msg/simple/SimpleMessenger.h:

int send_message(Message *m, const entity_inst_t& dest) override {
    return _send_message(m, dest);
}

int send_message(Message *m, Connection *con) {
    return _send_message(m, con);
}
ceph/src/msg/simple/SimpleMessenger.cc:

int SimpleMessenger::_send_message(Message *m, Connection *con)
{
    //set envelope
    m->get_header().src = get_myname();

    if (!m->get_priority()) m->set_priority(get_default_send_priority());

    submit_message(m, static_cast<PipeConnection*>(con),
                con->get_peer_addr(), con->get_peer_type(), false);
    return 0;
}

void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
                const entity_addr_t& dest_addr, int dest_type,
                bool already_locked)
{
    ...
    if (con) {
        Pipe *pipe = NULL;
        bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe);
        ...
        while (pipe && ok) {
            // we loop in case of a racing reconnect, either from us or them
            pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
            if (pipe->state != Pipe::STATE_CLOSED) {
                pipe->_send(m);
                pipe->pipe_lock.Unlock();
                pipe->put();
                return;
            }
        }
        ...
    }
}
ceph/src/msg/simple/Pipe.h:

void _send(Message *m) {
    assert(pipe_lock.is_locked());
    out_q[m->get_priority()].push_back(m);
    cond.Signal();
}

  ms_pipe_write线程入口函数为Pipe::writer,其调用栈如下:

Pipe::writer()
    |-Pipe::_get_next_outgoing()
    \-Pipe::write_message()
        \-Pipe::do_sendmsg()

void Pipe::writer()
{
    pipe_lock.Lock();
    while (state != STATE_CLOSED) {
        ...
        Message *m = _get_next_outgoing();
        ...
        const ceph_msg_header& header = m->get_header();
        const ceph_msg_footer& footer = m->get_footer();
        bufferlist blist = m->get_payload();
        blist.append(m->get_middle());
        blist.append(m->get_data());
        pipe_lock.Unlock();

        write_message(header, footer, blist);
        ...
    }
}


转载请注明:吴斌的博客 » 【Rados Block Device】六、OSD原理分析-SimpleMessenger模块