OSD进程通过网络对Client提供服务,因此网络层是OSD中的基础层。本篇博文将讨论ceph中传统的SimpleMessenger实现原理。
SimpleMessenger模块中对象、流程概览
如果将OSD进程的网络服务模式配置成SimpleMessenger,那么它采用的是POSIX标准网络接口来实现网络功能。也就是说,此时我们的OSD服务从代码实现流程上来说就是通过socket()->bind()->listen()->accept()进行连接建立,随后再通过每个连接进行网络消息的收发。虽然SimpleMessenger在实现过程中融入一些设计模式的抽象,但是抓住以上POSIX网络编程核心流程后将便于大家理解其实现机理。
我们先整体来看一下SimpleMessenger的对象和流程:
我们知道一个ceph集群中通常会有两个网络平面:public和cluster,那么每个OSD进程中就会对应产生两个messenger对象ms_public和ms_cluster。对于每个messenger对象(这里其实为SimpleMessenger对象),会产生一个名为ms_accepter的线程,messenger对象通过accepter成员变量指向该线程。ms_accepter线程是在完成socket()->bind()->listen()动作之后产生,它的主要作用就是一直监听Client端的网络连接请求。
当某一个Client发起连接请求后,ms_accepter将调用accept()接受请求,并为该连接产生一个pipe对象。pipe对象是对TCP socket连接的封装,可以实现故障重连等可靠性增强特性。每个pipe对象会产生两个线程:一个叫ms_pipe_read线程,它一直在监听socket连接中的消息,如果发现有消息它将取出消息并将消息放入in_q中进行分发处理;另一个叫ms_pipe_write线程,它一直在等待out_q中被放入发送消息,如果发现out_q中有消息,它将把消息取出并通过socket连接将其发送出去。
在消息接收处理的过程中,in_q队列指向的其实是SimpleMessenger对象中的DispatchQueue,也就是说对于同一个messenger中的多个pipe,它们接收的消息将被放入到同一个队列中等待处理。DispatchQueue分发消息时如果发现该消息可以被快速处理(fast dispatch)时,会将该消息直接分发给SimpleMessenger,由SimpleMessenger将消息分发给其内部的多个Dispatcher对象进行最终的消息处理。这里OSD模块中的OSD对象就是属于SimpleMessenger的一个Dispatcher,OSD对消息的处理属于OSD模块的内容,我们将在后续博文中介绍。DispatchQueue如果发现该消息无法被快速处理,则会将该消息交给DispatchQueue对应的DispatchThread处理。DispatchThread线程取出消息后会传递给messenger通过ms_deliver_dispatch进行普通处理,其实最终也会交给Dispatcher(如OSD对象)处理,只不过是在DispatchThread线程中被处理(fast dispatch是在ms_pipe_read线程中被处理,请大家注意对比)。
SimpleMessenger模块中类的概览
在了解了SimpleMessenger的实现流程后,我们再来看看它的类定义,从而理解它是如何实现抽象,以支持未来更灵活的扩展:
图中标红的类(Messenger、Dispatcher、DispatchQueue、Connection)属于Messenger模块抽象类(不属于某一种网络实现模式),不同的网络实现模式可以继承它们实现各自特有的功能。
- Messenger类是SimpleMessenger、AsyncMessenger、XioMessenger的父类,它描绘了messenger对象的功能框架。每个messenger对象包含两个Dispatcher链表,dispatchers代表普通处理对象,fast_dispatchers代表快速处理对象,以便实现针对不同消息类型采用不同处理方式的功能。Dispatcher对象是在初始流程中,通过add_dispatcher_header方法被加入到messenger对应的链表。create、bind、start、ready方法实现messenger对象的创建和初始化;wait等待messenger生命周期结束;shutdown关闭messenger对象。send_message实现了通过messenger对象发送一个消息的功能。ms_fast_dispatch实现消息的快速分发;ms_deliver_dispatch实现消息的普通分发;最终都将分发给messenger中的dispatchers进行处理。
- Dispatcher类是接收消息的处理者。不同模块可以实现各自不同的Dispatcher,以实现对消息的不同处理逻辑。只要在初始化时通过messenger对象的add_dispatcher_*方法被加入到messenger中,便可保证该Dispatcher可以接收到消息并进行处理。
- DispatchQueue类实现了一个分发队列。用户通过enqueue操作将消息放入队列,该队列可以按优先级对消息进行排序(PrioritizeQueue)。队列会产生一个专门的DispatchThread线程,由该线程负责从PrioritizedQueue中取出消息进行分发处理。此外,DispatchQueue也可通过can_fast_dispatch判断消息是否可以被快速处理,如果可以被快速处理,则直接调用fast_dispatch进行分发处理,否则调用enqueue进队列交由DispatchThread处理。
- Connection类是对网络连接的抽象。子类通过实现send_message方法提供不同的网络发送方案。
- SimpleMessenger类继承Messenger类实现基于POSIX网络接口的网络信使功能。它将实现Messenger类中的bind、start、ready方法,以完成网络socket的初始化。独有的Accepter对象将产生一个独立的线程监听网络连接请求。每当接受一个新连接时,都会生成一个新的Pipe对象,并通过add_accept_pipe方法将其加入到pipes列表中。SimpleMessenger包含了一个DispatchQueue来实现消息的普通分发和快速分发。
- Pipe类代表一个连接会话,每个连接会产生一个reader_thread线程(入口函数为reader()方法)和一个writer_thread线程(入口函数为writer())。reader方法负责从网络层接收消息放入in_q并进行顶层分发处理;writer方法负责将out_q中消息通过网络发送。accept方法在接受连接时调用,connect方法在发起连接请求时调用。
- Thread类是Common模块中的公共类,用来生成线程。create方法用来创建线程对象;entry方法为新线程的入口函数;join方法在等待子线程结束时调用。
代码详解
有了对SimpleMessenger模块中对象、流程和类的整体认识之后,我们再结合代码来深入理解其实现细节。
1. 初始化过程
SimpleMessenger模块的初始化在OSD进程的main函数中完成,整体调用栈如下所示:
ceph_osd.cc:main()
|-Messenger::create()
| \-SimpleMessenger::SimpleMessenger()
|-Messenger::bind() -> SimpleMessenger::bind()
| \-Accepter::bind()
| |-::socket()
| |-::bind()
| \-::listen()
|-OSD::OSD()
|-Messenger::start() -> SimpleMessenger::start()
|-OSD::init()
| \-Messenger::add_dispatcher_head()
| \-Messenger::ready() -> SimpleMessenger::ready()
| \-Accepter::start()
\-Messenger::wait() -> SimpleMessenger::wait()
我们先来看看ceph_osd.cc中main函数里和Messenger初始化相关的部分代码:
ceph/src/ceph_osd.cc:
int main(int argc, const char **argv)
{
...
/*这里我们假设public_msgr_type和cluster_msgr_type都为Simple*/
std::string public_msgr_type = g_conf->ms_public_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_public_type;
std::string cluster_msgr_type = g_conf->ms_cluster_type.empty() ? g_conf->get_val<std::string>("ms_type") : g_conf->ms_cluster_type;
/*根据不同类型创建messenger对象,这里将创建两个SimpleMessenger对象*/
Messenger *ms_public = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::OSD(whoami), "client",
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(whoami), "cluster",
getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
...
/*为messenger对象绑定IP地址,最终会调用Accepter::bind函数,由它调用系统的socket()、bind()和listen()函数*/
r = ms_public->bind(g_conf->public_addr);
...
r = ms_cluster->bind(g_conf->cluster_addr);
...
/*将创建的ms_public和ms_cluster传递给OSD构造函数,建立一个新的OSD对象*/
osd = new OSD(g_ceph_context,
store,
whoami,
ms_cluster,
ms_public,
ms_hb_front_client,
ms_hb_back_client,
ms_hb_front_server,
ms_hb_back_server,
ms_objecter,
&mc,
g_conf->osd_data,
g_conf->osd_journal);
/*启动messenger对象,针对SimpleMessenger,其内部细节我们暂不用关心*/
ms_public->start();
...
ms_cluster->start();
/*OSD执行初始化,下文将展开*/
osd->init();
...
/*整个初始化动作完成,OSD主线程进入等待状态*/
ms_public->wait();
ms_cluster->wait();
...
}
ceph/src/msg/Messenger.cc:
Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
uint64_t nonce, uint64_t cflags)
{
int r = -1;
if (type == "random") {
static std::random_device seed;
static std::default_random_engine random_engine(seed());
static Spinlock random_lock;
std::lock_guard<Spinlock> lock(random_lock);
std::uniform_int_distribution<> dis(0, 1);
r = dis(random_engine);
}
if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, std::move(lname), nonce);
else if (r == 1 || type.find("async") != std::string::npos)
return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
return new XioMessenger(cct, name, std::move(lname), nonce, cflags);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return nullptr;
}
我们再深入看看OSD初始化过程中与Messenger相关的部分代码:
ceph/src/osd/OSD.cc:
int OSD::init()
{
...
/*将OSD对象自身加到public messenger和cluster messenger的dispatchers中*/
client_messenger->add_dispatcher_head(this);
cluster_messenger->add_dispatcher_head(this);
...
}
ceph/src/msg/Messenger.h:
void add_dispatcher_head(Dispatcher *d) {
bool first = dispatchers.empty(); /*是否为添加到dispatchers链表中的第一个元素?*/
dispatchers.push_front(d); /*加入到dispatchers*/
if (d->ms_can_fast_dispatch_any()) /*如果可以进行fast dispatch则加入到fast_dispatchers中*/
fast_dispatchers.push_front(d);
if (first)
ready(); /*如果是首个加入到dispatchers中的对象,则调用messenger对象的ready()*/
}
ceph/src/msg/simple/SimpleMessenger.cc:
void SimpleMessenger::ready()
{
ldout(cct,10) << "ready " << get_myaddr() << dendl;
dispatch_queue.start(); /*拉起dispatch_queue对应的dispatch_thread*/
lock.Lock();
if (did_bind)
accepter.start(); /*拉起ms_accepter线程*/
lock.Unlock();
}
ceph/src/msg/simple/Accept.cc:
int Accepter::start()
{
ldout(msgr->cct,1) << __func__ << dendl;
// start thread
create("ms_accepter");
return 0;
}
2. 连接建立过程
SimpleMessenger对象初始化完成后,将拉起一个ms_accepter线程处理Client端的连接请求,该线程入口函数为Accepter::entry。
ceph/src/msg/simple/Accepter.cc:
void *Accepter::entry()
{
int errors = 0;
int ch;
struct pollfd pfd[2];
memset(pfd, 0, sizeof(pfd));
pfd[0].fd = listen_sd;
pfd[0].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
pfd[1].fd = shutdown_rd_fd;
pfd[1].events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done) {
int r = poll(pfd, 2, -1); /*通过poll系统调用等待Client连接请求*/
...
if (done) break;
// accept
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); /*收到请求后,accept该连接*/
if (sd >= 0) {
...
msgr->add_accept_pipe(sd); /*向SimpleMessenger对象中添加pipe*/
} else {
...
}
}
}
ceph/src/msg/simple/SimpleMessenger.cc:
Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
lock.Lock();
Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
p->sd = sd;
p->pipe_lock.Lock();
p->start_reader(); /*拉起pipe对象的ms_pipe_read线程*/
p->pipe_lock.Unlock();
pipes.insert(p);
accepting_pipes.insert(p);
lock.Unlock();
return p;
}
ceph/src/msg/simple/Pipe.cc:
void Pipe::start_reader()
{
if (reader_needs_join) {
reader_thread.join();
reader_needs_join = false;
}
reader_running = true;
reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
}
3. 消息接收与分发过程
每个pipe对象的ms_pipe_read线程被拉起后,会进行会话的协商过程(可以回顾下前期Client端RBD的messenger分析博文);完成后将处于等待接收消息的状态:
ceph/src/msg/simple/Pipe.cc:
void Pipe::reader()
{
pipe_lock.Lock();
if (state == STATE_ACCEPTING) {
/*执行会话协商过程,协商成功后会拉起ms_pipe_write线程*/
accept();
}
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
// sleep if (re)connecting
if (state == STATE_STANDBY) {
/*如果pipe状态为STANDBY,说明底层连接故障且暂无消息处理,则进入睡眠*/
cond.Wait(pipe_lock);
continue;
}
pipe_lock.Unlock();
char tag = -1;
/*先从网络连接中读取一个字节的tag*/
if (tcp_read((char*)&tag, 1) < 0) {
pipe_lock.Lock();
fault(true);
continue;
}
/*根据tag值进行不同的处理动作*/
if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
...
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2) {
...
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
...
continue;
}
if (tag == CEPH_MSGR_TAG_ACK) {
...
continue;
}
else if (tag == CEPH_MSGR_TAG_MSG) {
/*针对消息,先从网络中读取消息内容到m中*/
Message *m = 0;
int r = read_message(&m, auth_handler.get());
pipe_lock.Lock();
if (m->get_seq() <= in_seq) {
m->put();
continue;
}
m->set_connection(connection_state.get());
...
/*对消息进行预处理*/
in_q->fast_preprocess(m);
if (delay_thread) {
...
} else {
/*如果消息可以被快速处理,则走快速处理流程;否则就enqueue到in_q中交给dispatch_thread处理*/
if (in_q->can_fast_dispatch(m)) {
reader_dispatching = true;
pipe_lock.Unlock();
in_q->fast_dispatch(m);
pipe_lock.Lock();
reader_dispatching = false;
...
} else {
in_q->enqueue(m, m->get_priority(), conn_id);
}
}
}
...
}
...
}
消息的快速处理流程可以回头参考前文的对象、流程图。
4. 消息发送过程
任何模块想通过messenger发送消息时,都可以调用Messenger::send_mesage来完成。对于SimpleMessenger,其实现体位于SimpleMessenger.h,发送是一个异步过程,发送者只会将消息放入Pipe对象的out_q中,随后由ms_pipe_write线程完成向网络协议栈的发送。
发送者的调用栈如下:
sender[any thread]
\-SimpleMessenger::send_message()
\-SimpleMessenger::_send_message()
\-SimpleMessenger::submit_message()
\-Pipe::_send()
ceph/src/msg/simple/SimpleMessenger.h:
int send_message(Message *m, const entity_inst_t& dest) override {
return _send_message(m, dest);
}
int send_message(Message *m, Connection *con) {
return _send_message(m, con);
}
ceph/src/msg/simple/SimpleMessenger.cc:
int SimpleMessenger::_send_message(Message *m, Connection *con)
{
//set envelope
m->get_header().src = get_myname();
if (!m->get_priority()) m->set_priority(get_default_send_priority());
submit_message(m, static_cast<PipeConnection*>(con),
con->get_peer_addr(), con->get_peer_type(), false);
return 0;
}
void SimpleMessenger::submit_message(Message *m, PipeConnection *con,
const entity_addr_t& dest_addr, int dest_type,
bool already_locked)
{
...
if (con) {
Pipe *pipe = NULL;
bool ok = static_cast<PipeConnection*>(con)->try_get_pipe(&pipe);
...
while (pipe && ok) {
// we loop in case of a racing reconnect, either from us or them
pipe->pipe_lock.Lock(); // can't use a Locker because of the Pipe ref
if (pipe->state != Pipe::STATE_CLOSED) {
pipe->_send(m);
pipe->pipe_lock.Unlock();
pipe->put();
return;
}
}
...
}
}
ceph/src/msg/simple/Pipe.h:
void _send(Message *m) {
assert(pipe_lock.is_locked());
out_q[m->get_priority()].push_back(m);
cond.Signal();
}
ms_pipe_write线程入口函数为Pipe::writer,其调用栈如下:
Pipe::writer()
|-Pipe::_get_next_outgoing()
\-Pipe::write_message()
\-Pipe::do_sendmsg()
void Pipe::writer()
{
pipe_lock.Lock();
while (state != STATE_CLOSED) {
...
Message *m = _get_next_outgoing();
...
const ceph_msg_header& header = m->get_header();
const ceph_msg_footer& footer = m->get_footer();
bufferlist blist = m->get_payload();
blist.append(m->get_middle());
blist.append(m->get_data());
pipe_lock.Unlock();
write_message(header, footer, blist);
...
}
}
转载请注明:吴斌的博客 » 【Rados Block Device】六、OSD原理分析-SimpleMessenger模块