4. Analysis of the messenger module in libceph.ko
The messenger module is a relatively self-contained part of libceph. Its purpose is to provide a stable, reliable, and ordered network service to the clients layered above it (such as mon_client and osd_client). messenger is built on top of TCP. Although TCP is itself a connection-oriented, reliable protocol, a TCP connection can still break: short-lived connections can simply drop, and even long-lived connections suffer from the problem that, after an underlying network fault is repaired, the upper-layer application may not notice promptly. To work around this unreliability, messenger either reconnects at intervals (back off, used when there are messages waiting to be sent) or waits passively (stand by, used when there is nothing to send). In addition, every outgoing message carries a unique sequence number (seq), which the peer uses to detect duplicate messages.
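To make the seq mechanism concrete, here is a minimal user-space sketch (not kernel code; struct conn_model and accept_message are invented names, and the in_seq bookkeeping is merely modeled on the fields of struct ceph_connection) of how a receiver can use per-session sequence numbers to drop duplicates resent after a reconnect:

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* User-space model of ceph_connection's in_seq / in_seq_acked bookkeeping. */
struct conn_model {
    uint64_t in_seq;       /* highest message seq received so far */
    uint64_t in_seq_acked; /* highest seq we have acknowledged */
};

/* Return true if the message should be processed, false if it is a
 * duplicate resent by the peer after a reconnect. */
static bool accept_message(struct conn_model *c, uint64_t seq)
{
    if (seq <= c->in_seq) {
        printf("seq %llu already seen, dropping duplicate\n",
               (unsigned long long)seq);
        return false;
    }
    c->in_seq = seq;
    return true;
}

int main(void)
{
    struct conn_model c = { 0, 0 };
    uint64_t incoming[] = { 1, 2, 3, 2, 3, 4 }; /* 2 and 3 resent */

    for (size_t i = 0; i < sizeof(incoming) / sizeof(incoming[0]); i++)
        if (accept_message(&c, incoming[i]))
            printf("processing message seq %llu\n",
                   (unsigned long long)incoming[i]);
    return 0;
}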
The internal architecture of the messenger module is shown in the figure below:
- On the send path (solid lines in the figure), an application (for example, the rbd command) calls ceph_con_send to send a message; that function wakes up the work task bound to the connection; in the context of that work task, try_write calls kernel_sendmsg to reach the underlying network stack, which ultimately makes the NIC transmit the packet.
- On the receive path (dashed lines in the figure), after the NIC receives a packet, the interrupt and protocol-stack callbacks wake up the work task; the work task uses try_read to call kernel_recvmsg and pull the message content out of the network stack; finally it invokes the message callback registered when the connection was initialized. A user-space model of this wakeup-driven loop follows the list.
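The pattern is the same in both directions: callbacks only queue the connection's work item, and all actual socket I/O happens in the worker. Below is a minimal user-space model of that design (plain pthreads standing in for the kernel workqueue; the try_read/try_write step is reduced to a printf) to show how a queue_con-style wakeup drives a single worker loop:

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

/* User-space stand-in for a ceph_connection plus its work item. */
struct conn_model {
    pthread_mutex_t lock;
    pthread_cond_t  kick;
    bool            queued; /* has someone called queue_con()? */
    bool            stop;
};

/* Analogue of queue_con(): called from "callback" context. */
static void queue_con(struct conn_model *c)
{
    pthread_mutex_lock(&c->lock);
    c->queued = true;
    pthread_cond_signal(&c->kick);
    pthread_mutex_unlock(&c->lock);
}

/* Analogue of con_work(): all socket I/O would happen here. */
static void *con_work(void *arg)
{
    struct conn_model *c = arg;

    pthread_mutex_lock(&c->lock);
    while (!c->stop) {
        while (!c->queued && !c->stop)
            pthread_cond_wait(&c->kick, &c->lock);
        c->queued = false;
        pthread_mutex_unlock(&c->lock);

        printf("worker: try_read(), then try_write()\n");

        pthread_mutex_lock(&c->lock);
    }
    pthread_mutex_unlock(&c->lock);
    return NULL;
}

int main(void)
{
    struct conn_model c = { PTHREAD_MUTEX_INITIALIZER,
                            PTHREAD_COND_INITIALIZER, false, false };
    pthread_t worker;

    pthread_create(&worker, NULL, con_work, &c);
    queue_con(&c); /* e.g. ceph_con_send() queued a message */
    queue_con(&c); /* e.g. sk_data_ready fired */
    sleep(1);

    pthread_mutex_lock(&c.lock);
    c.stop = true;
    pthread_cond_signal(&c.kick);
    pthread_mutex_unlock(&c.lock);
    pthread_join(worker, NULL);
    return 0;
}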
4.1. Handling normal network sends and receives
4.1.1 The socket connection phase
Whenever the upper layer creates a ceph_connection object via ceph_con_init (a wrapper around the underlying TCP connection, carrying its own state machine) and then calls ceph_con_open to open it (moving the connection state to PREOPEN), a new work item is added to a kernel workqueue and scheduled once. Its entry point is con_work, whose skeleton looks like this:
linux/net/ceph/messenger.c:
static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
                                               work.work);
    ...
    while (true) {
        ...
        ret = try_read(con);  /* does no work during the socket connection phase */
        ...
        ret = try_write(con); /* establishes the socket connection and installs
                               * the socket callbacks */
        ...
        break;
    }
    ...
}
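Over the rest of this section the connection walks through a small state machine. The declarations below gather the CON_STATE_* values that appear in the code being discussed (a summary for orientation only; the real enum in net/ceph/messenger.c may contain more states):

/* Connection states seen in this walkthrough, summarized from the code
 * quoted in this section; not necessarily the complete set. */
enum con_state_summary {
    CON_STATE_PREOPEN,     /* ceph_con_open() called, no socket yet */
    CON_STATE_CONNECTING,  /* TCP connect issued, banners in flight */
    CON_STATE_NEGOTIATING, /* banners verified, connect info exchanged */
    CON_STATE_OPEN,        /* negotiation done, normal send/receive */
    CON_STATE_STANDBY,     /* faulted with nothing to send; wait */
    CON_STATE_CLOSED,      /* connection closed */
};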
When the connection worker is scheduled for the first time, it enters the socket connection phase. Its main job there is to establish the TCP socket connection and move the ceph_connection state to CONNECTING:
linux/net/ceph/messenger.c:
static int try_write(struct ceph_connection *con)
{
    ...
    if (con->state == CON_STATE_PREOPEN) { /* on the first run the state is PREOPEN */
        BUG_ON(con->sock);
        con->state = CON_STATE_CONNECTING; /* becomes CONNECTING once the steps
                                            * below have run */

        con_out_kvec_reset(con);    /* reset the connection's send buffer
                                     * (con->out_kvec_*) */
        prepare_write_banner(con);  /* place the client's banner into the send buffer */
        prepare_read_banner(con);   /* prepare to receive the server's banner */

        BUG_ON(con->in_msg);
        con->in_tag = CEPH_MSGR_TAG_READY; /* reset receive state to READY: any
                                            * message class may arrive next; the
                                            * classes are distinguished by
                                            * CEPH_MSGR_TAG_* */
        ret = ceph_tcp_connect(con); /* create and connect the socket, and install
                                      * the protocol-stack callbacks */
        if (ret < 0) {
            con->error_msg = "connect error";
            goto out;
        }
    }
more_kvec:
    ...
    if (con->out_kvec_left) {
        /* hand the contents of the connection's send buffer to kernel_sendmsg */
        ret = write_partial_kvec(con);
        if (ret <= 0)
            goto out;
    }
    ...
}
static int ceph_tcp_connect(struct ceph_connection *con)
{
    struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
    struct socket *sock;
    int ret;

    BUG_ON(con->sock);
    ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
                           IPPROTO_TCP, &sock);
    ...
    sock->sk->sk_allocation = GFP_NOFS;

    set_sock_callbacks(sock, con);
    con_sock_state_connecting(con);
    ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                             O_NONBLOCK);
    ...
    con->sock = sock;
    return 0;
}
static void set_sock_callbacks(struct socket *sock,
                               struct ceph_connection *con)
{
    struct sock *sk = sock->sk;

    sk->sk_user_data = con;
    sk->sk_data_ready = ceph_sock_data_ready;     /* called by the stack when
                                                   * data arrives */
    sk->sk_write_space = ceph_sock_write_space;
    sk->sk_state_change = ceph_sock_state_change; /* called when the stack changes
                                                   * the socket's state */
}
static void ceph_sock_data_ready(struct sock *sk, int count_unused)
{
    struct ceph_connection *con = sk->sk_user_data;
    ...
    queue_con(con); /* wake the work task again */
    ...
}
4.1.2 The negotiation phase
At the end of the socket connection phase, the client's banner was sent to the server and a read of the server's banner was set up. When the server's banner comes back, the connection worker is woken again; this time its main job is the information negotiation between client and server:
linux/net/ceph/messenger.c:
static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
                                               work.work);
    ...
    while (true) {
        ...
        ret = try_read(con);  /* read the server's banner and enter the
                               * negotiation phase */
        ...
        ret = try_write(con); /* send our negotiation info to the server */
        ...
        break;
    }
    ...
}
static int try_read(struct ceph_connection *con)
{
    ...
    if (con->state == CON_STATE_CONNECTING) {
        ret = read_partial_banner(con); /* read the server's banner */
        if (ret <= 0)
            goto out;
        ret = process_banner(con);      /* verify the banner */
        if (ret < 0)
            goto out;

        con->state = CON_STATE_NEGOTIATING; /* verification passed: enter the
                                             * negotiation phase */
        /*
         * Received banner is good, exchange connection info.
         * Do not reset out_kvec, as sending our banner raced
         * with receiving peer banner after connect completed.
         */
        ret = prepare_write_connect(con); /* prepare negotiation info such as the
                                           * global connect sequence number */
        if (ret < 0)
            goto out;
        prepare_read_connect(con);        /* prepare to receive the server's
                                           * negotiation reply */

        /* Send connection info before awaiting response */
        goto out;
    }
    ...
}
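For reference, the banner itself is just a short fixed protocol string (CEPH_BANNER, "ceph v027" in msgr.h of this kernel era) followed by the sender's address; process_banner's first step is checking that the peer presented the identical string. A user-space sketch of that check (check_banner is an invented helper name):

#include <stdio.h>
#include <string.h>

#define CEPH_BANNER "ceph v027" /* fixed protocol banner from msgr.h */

/* Model of process_banner()'s first check: both sides must present the
 * exact same banner string before negotiation proceeds. */
static int check_banner(const char *peer)
{
    if (memcmp(peer, CEPH_BANNER, strlen(CEPH_BANNER)) != 0) {
        fprintf(stderr, "bad banner from peer\n");
        return -1;
    }
    return 0;
}

int main(void)
{
    printf("good banner -> %d\n", check_banner(CEPH_BANNER));
    printf("bad banner  -> %d\n", check_banner("ceph v026"));
    return 0;
}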
4.1.3 The normal open phase
After the server's negotiation reply is received, the connection worker is woken once more; it processes the reply and enters the normal open phase, in which messages can be sent and received:
linux/net/ceph/messenger.c:
static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
                                               work.work);
    ...
    while (true) {
        ...
        ret = try_read(con);  /* process the server's negotiation reply and enter
                               * the open phase; messages can now be received */
        ...
        ret = try_write(con); /* messages can now be sent */
        ...
        break;
    }
    ...
}
static int try_read(struct ceph_connection *con)
{
    ...
    if (con->state == CON_STATE_NEGOTIATING) {
        ret = read_partial_connect(con); /* read the server's negotiation reply
                                          * into in_reply */
        if (ret <= 0)
            goto out;
        ret = process_connect(con);      /* process the reply and enter the
                                          * open phase */
        if (ret < 0)
            goto out;
        goto more;
    }
    ...
}
static int process_connect(struct ceph_connection *con)
{
    ...
    switch (con->in_reply.tag) { /* in_reply holds the server's negotiation reply */
    ...
    case CEPH_MSGR_TAG_SEQ:
    case CEPH_MSGR_TAG_READY:
        ...
        /* record the server's negotiation reply and mark the connection OPEN */
        con->state = CON_STATE_OPEN;
        con->auth_retry = 0; /* we authenticated; clear flag */
        con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
        con->connect_seq++;
        con->peer_features = server_feat;
        ...
        con->delay = 0; /* reset backoff memory */

        if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
            prepare_write_seq(con);
            prepare_read_seq(con);
        } else {
            prepare_read_tag(con);
        }
        break;
    }
}
4.1.4 The message send and receive path
When client and server exchange messages normally, a one-byte tag is always sent on the connection first, followed by the actual content. The tag lets both sides know exactly what kind of item follows and how to parse it. The data on the wire looks roughly like the figure below:
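As an illustration of that framing, here is a user-space sketch of a receiver dispatching on the tag byte (the tag values are symbolic stand-ins, not the real CEPH_MSGR_TAG_* numbers from msgr.h, and body parsing is elided):

#include <stdint.h>
#include <stdio.h>

/* Symbolic stand-ins for CEPH_MSGR_TAG_*; the real values live in msgr.h. */
enum { TAG_MSG = 1, TAG_ACK = 2, TAG_CLOSE = 3 };

/* One byte of tag, then a tag-specific body: that is the entire framing. */
static void handle_tag(uint8_t tag)
{
    switch (tag) {
    case TAG_MSG:   /* a full message follows: header, payload, footer */
        printf("tag=MSG: parse ceph_msg_header, payload, footer\n");
        break;
    case TAG_ACK:   /* a sequence number the peer has received follows */
        printf("tag=ACK: parse the acked seq, release acked messages\n");
        break;
    case TAG_CLOSE: /* no body: the peer is closing the session */
        printf("tag=CLOSE: close the socket\n");
        break;
    default:
        printf("bad tag %u\n", tag);
        break;
    }
}

int main(void)
{
    const uint8_t stream[] = { TAG_ACK, TAG_MSG, TAG_CLOSE };

    /* a receiver consumes one tag, then its body, then the next tag */
    for (size_t i = 0; i < sizeof(stream); i++)
        handle_tag(stream[i]);
    return 0;
}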
Let us first look at message reception:
linux/net/ceph/messenger.c:
static int try_read(struct ceph_connection *con)
{
    ...
    if (con->in_tag == CEPH_MSGR_TAG_READY) { /* READY: ready to receive the
                                               * next tag */
        /*
         * what's next?
         */
        ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); /* read the one-byte
                                                             * tag from the wire */
        if (ret <= 0)
            goto out;

        /* prepare to receive what follows, based on the tag */
        switch (con->in_tag) {
        case CEPH_MSGR_TAG_MSG:
            prepare_read_message(con);
            break;
        case CEPH_MSGR_TAG_ACK:
            prepare_read_ack(con);
            break;
        case CEPH_MSGR_TAG_CLOSE:
            con_close_socket(con);
            con->state = CON_STATE_CLOSED;
            goto out;
        default:
            goto bad_tag;
        }
    }

    /* a MSG tag means an actual message follows: receive its content and
     * invoke the callbacks */
    if (con->in_tag == CEPH_MSGR_TAG_MSG) {
        ret = read_partial_message(con); /* note: via the alloc_msg callback this
                                          * finds the r_reply already allocated in
                                          * the ceph_osd_request */
        ...
        if (con->in_tag == CEPH_MSGR_TAG_READY)
            goto more;
        process_message(con);
        if (con->state == CON_STATE_OPEN)
            prepare_read_tag(con);
        goto more;
    }

    /* an ACK or SEQ tag means a confirmation number from the server follows;
     * the acknowledged outgoing messages can then be released */
    if (con->in_tag == CEPH_MSGR_TAG_ACK ||
        con->in_tag == CEPH_MSGR_TAG_SEQ) {
        /*
         * the final handshake seq exchange is semantically
         * equivalent to an ACK
         */
        ret = read_partial_ack(con);
        ...
        process_ack(con);
        goto more;
    }
    ...
}
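The process_ack step deserves a small illustration: the sender keeps already-sent messages queued until they are acknowledged, and an incoming ack releases every message whose seq it covers. A user-space model of that bookkeeping (a plain singly linked list standing in for the kernel's out_sent list and ceph_msg_put):

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* User-space stand-in for a sent-but-unacked ceph_msg on out_sent. */
struct msg_model {
    uint64_t seq;
    struct msg_model *next;
};

/* Model of process_ack(): drop everything with seq <= acked. The kernel
 * version walks con->out_sent and releases each message's reference. */
static void process_ack(struct msg_model **out_sent, uint64_t acked)
{
    while (*out_sent && (*out_sent)->seq <= acked) {
        struct msg_model *m = *out_sent;

        printf("seq %llu acked, releasing\n", (unsigned long long)m->seq);
        *out_sent = m->next;
        free(m);
    }
}

int main(void)
{
    struct msg_model *out_sent = NULL, **tail = &out_sent;

    for (uint64_t seq = 1; seq <= 4; seq++) { /* four unacked messages */
        struct msg_model *m = malloc(sizeof(*m));
        m->seq = seq;
        m->next = NULL;
        *tail = m;
        tail = &m->next;
    }

    process_ack(&out_sent, 3); /* peer acknowledged up to seq 3 */
    if (out_sent)
        printf("head of out_sent is now seq %llu\n",
               (unsigned long long)out_sent->seq);
    process_ack(&out_sent, 4);
    return 0;
}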
Next, let us look at message sending:
linux/net/ceph/messenger.c:
static int try_write(struct ceph_connection *con)
{
more:
    ...
more_kvec:
    if (con->out_kvec_left) { /* push the contents of the send buffer onto
                               * the wire */
        ret = write_partial_kvec(con);
        if (ret <= 0)
            goto out;
    }

    /* msg pages? */
    if (con->out_msg) { /* a current outgoing message (out_msg) has been chosen
                         * and is not fully sent yet: send the data it carries */
        if (con->out_msg_done) {
            ceph_msg_put(con->out_msg);
            con->out_msg = NULL; /* we're done with this one */
            goto do_next;
        }

        ret = write_partial_message_data(con);
        if (ret == 1)
            goto more_kvec; /* we need to send the footer, too! */
        if (ret == 0)
            goto out;
        if (ret < 0) {
            dout("try_write write_partial_message_data err %d\n", ret);
            goto out;
        }
    }

do_next:
    if (con->state == CON_STATE_OPEN) {
        /* is anything else pending? */
        if (!list_empty(&con->out_queue)) {
            /* the send queue out_queue has pending messages: take one, make it
             * out_msg, and send its header (the data it carries is sent by the
             * code above) */
            prepare_write_message(con);
            goto more;
        }
        if (con->in_seq > con->in_seq_acked) {
            /* the seq of the last received message is above what we have
             * acknowledged so far: prepare to send a new ack */
            prepare_write_ack(con);
            goto more;
        }
        ...
    }
    ...
}
Finally, here is the definition of ceph_msg. The sender fills the request content into a ceph_msg for the network layer to send; on reception, the network layer likewise fills a ceph_msg before handing it to the upper-layer application to process the response:
linux/include/linux/ceph/messenger.h:
struct ceph_msg {
    struct ceph_msg_header hdr;    /* header */
    struct ceph_msg_footer footer; /* footer */
    struct kvec front;             /* unaligned blobs of message */
    struct ceph_buffer *middle;

    size_t data_length;
    struct list_head data;
    struct ceph_msg_data_cursor cursor;

    struct ceph_connection *con;
    struct list_head list_head;    /* links for connection lists */

    struct kref kref;
    bool front_is_vmalloc;
    bool more_to_follow;
    bool needs_out_seq;
    int front_alloc_len;
    unsigned long ack_stamp;       /* tx: when we were acked */

    struct ceph_msgpool *pool;
};
linux/include/linux/ceph/msgr.h:
struct ceph_msg_header {
    __le64 seq;        /* message seq# for this session */
    __le64 tid;        /* transaction id */
    __le16 type;       /* message type */
    __le16 priority;   /* priority. higher value == higher priority */
    __le16 version;    /* version of message encoding */

    __le32 front_len;  /* bytes in main payload */
    __le32 middle_len; /* bytes in middle payload */
    __le32 data_len;   /* bytes of data payload */
    __le16 data_off;   /* sender: include full offset;
                          receiver: mask against ~PAGE_MASK */

    struct ceph_entity_name src;
    __le32 reserved;
    __le32 crc;        /* header crc32c */
} __attribute__ ((packed));
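Because ceph_msg_header is a packed little-endian struct, it is easy to model in user space. The sketch below assumes a little-endian host, flattens the src field (struct ceph_entity_name) to raw bytes, and leaves the crc32c computation out entirely; it simply fills a header the way a sender would before front_len bytes of payload follow it on the wire. The CEPH_MSG_OSD_OP value 42 comes from the message-type list below:

#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* User-space mirror of ceph_msg_header; __le fields become plain integers
 * here, so a little-endian host is assumed. src is flattened to raw bytes
 * (assumed 9 bytes for a packed ceph_entity_name) for simplicity. */
struct msg_header_model {
    uint64_t seq;       /* message seq# for this session */
    uint64_t tid;       /* transaction id */
    uint16_t type;      /* message type, e.g. CEPH_MSG_OSD_OP (42) */
    uint16_t priority;
    uint16_t version;
    uint32_t front_len; /* bytes in main payload */
    uint32_t middle_len;
    uint32_t data_len;
    uint16_t data_off;
    uint8_t  src[9];    /* flattened ceph_entity_name */
    uint32_t reserved;
    uint32_t crc;       /* header crc32c; computation omitted here */
} __attribute__((packed));

int main(void)
{
    struct msg_header_model hdr;
    const char front[] = "payload";

    memset(&hdr, 0, sizeof(hdr));
    hdr.seq = 1;       /* first message on this session */
    hdr.tid = 100;     /* matches the request to its reply */
    hdr.type = 42;     /* CEPH_MSG_OSD_OP */
    hdr.front_len = sizeof(front);
    /* hdr.crc would be crc32c over the header with crc itself zeroed */

    printf("header is %zu bytes, then %u front bytes follow\n",
           sizeof(hdr), hdr.front_len);
    return 0;
}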
linux/include/linux/ceph/ceph_fs.h:
/*
* message types
*/
/* misc */
#define CEPH_MSG_SHUTDOWN 1
#define CEPH_MSG_PING 2
/* client <-> monitor */
#define CEPH_MSG_MON_MAP 4
#define CEPH_MSG_MON_GET_MAP 5
#define CEPH_MSG_STATFS 13
#define CEPH_MSG_STATFS_REPLY 14
#define CEPH_MSG_MON_SUBSCRIBE 15
#define CEPH_MSG_MON_SUBSCRIBE_ACK 16
#define CEPH_MSG_AUTH 17
#define CEPH_MSG_AUTH_REPLY 18
#define CEPH_MSG_MON_GET_VERSION 19
#define CEPH_MSG_MON_GET_VERSION_REPLY 20
/* client <-> mds */
#define CEPH_MSG_MDS_MAP 21
#define CEPH_MSG_CLIENT_SESSION 22
#define CEPH_MSG_CLIENT_RECONNECT 23
#define CEPH_MSG_CLIENT_REQUEST 24
#define CEPH_MSG_CLIENT_REQUEST_FORWARD 25
#define CEPH_MSG_CLIENT_REPLY 26
#define CEPH_MSG_CLIENT_CAPS 0x310
#define CEPH_MSG_CLIENT_LEASE 0x311
#define CEPH_MSG_CLIENT_SNAP 0x312
#define CEPH_MSG_CLIENT_CAPRELEASE 0x313
/* pool ops */
#define CEPH_MSG_POOLOP_REPLY 48
#define CEPH_MSG_POOLOP 49
/* osd */
#define CEPH_MSG_OSD_MAP 41
#define CEPH_MSG_OSD_OP 42
#define CEPH_MSG_OSD_OPREPLY 43
#define CEPH_MSG_WATCH_NOTIFY 44
4.2. Handling TCP connection faults
The messenger module learns of a TCP connection fault in two ways: either the underlying network stack notifies it of a socket state change, or a send/receive call on the socket returns an error.
4.2.1 Faults reported by the network stack through a callback
In the first case, once the network stack detects the TCP connection fault, it calls the ceph_sock_state_change callback to tell messenger that the underlying socket is closing. After updating the socket state, ceph_sock_state_change reschedules the connection work:
linux/net/ceph/messenger.c:
static void ceph_sock_state_change(struct sock *sk)
{
    struct ceph_connection *con = sk->sk_user_data;

    switch (sk->sk_state) {
    case TCP_CLOSE: /* the underlying socket has closed */
        dout("%s TCP_CLOSE\n", __func__);
        /* fall through */
    case TCP_CLOSE_WAIT:
        dout("%s TCP_CLOSE_WAIT\n", __func__);
        con_sock_state_closing(con);
        con_flag_set(con, CON_FLAG_SOCK_CLOSED); /* flag the socket as closed */
        queue_con(con);                          /* wake the work task again */
        break;
    case TCP_ESTABLISHED:
        ...
        break;
    default: /* Everything else is uninteresting */
        break;
    }
}
When the work task is next scheduled and finds the connection flagged as closed, it enters the fault-handling path:
linux/net/ceph/messenger.c:
static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
                                               work.work);
    ...
    while (true) {
        if ((fault = con_sock_closed(con))) {
            /* first check whether the socket has been flagged closed; if so,
             * leave the loop and handle the fault */
            dout("%s: con %p SOCK_CLOSED\n", __func__, con);
            break;
        }
        ...
        ret = try_read(con);
        ...
        ret = try_write(con);
        ...
        break;
    }

    /* fault handling below */
    if (fault)
        con_fault(con);
    ...
}
4.2.2 Faults detected from the return values of send/receive calls
While sending and receiving, each connection's work task can also learn of a connection fault when a lower-level function returns an error:
linux/net/ceph/messenger.c:
static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
                                               work.work);
    ...
    while (true) {
        ...
        ret = try_read(con);
        if (ret < 0) { /* the lower layer returned an error while receiving */
            if (ret == -EAGAIN)
                continue;
            con->error_msg = "socket error on read";
            fault = true;
            break;
        }

        ret = try_write(con);
        if (ret < 0) { /* the lower layer returned an error while sending */
            if (ret == -EAGAIN)
                continue;
            con->error_msg = "socket error on write";
            fault = true;
        }
        break;
    }

    /* fault handling below */
    if (fault)
        con_fault(con);
    ...
}
4.2.3 The socket fault-handling logic
A connection fault is handled in one of two ways: back off, i.e. reconnect after a delay; or stand by, i.e. wait until there is a new message to send and only then retry:
linux/net/ceph/messenger.c:
static void con_fault(struct ceph_connection *con)
{
    ...
    con_close_socket(con); /* close the underlying socket */
    ...
    if (con->in_msg) {
        /* a message was in the middle of being received: release it */
        BUG_ON(con->in_msg->con != con);
        con->in_msg->con = NULL;
        ceph_msg_put(con->in_msg);
        con->in_msg = NULL;
        con->ops->put(con);
    }

    /* messages that were sent but not yet acknowledged by the peer must be
     * resent once the connection is reestablished */
    /* Requeue anything that hasn't been acked */
    list_splice_init(&con->out_sent, &con->out_queue);

    /* If there are no messages queued or keepalive pending, place
     * the connection in a STANDBY state */
    if (list_empty(&con->out_queue) &&
        !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {
        /* nothing to send and no keepalive due: go to stand by */
        dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
        con_flag_clear(con, CON_FLAG_WRITE_PENDING);
        con->state = CON_STATE_STANDBY;
    } else {
        /* there are still messages waiting to be sent: retry after a delay
         * that doubles on every consecutive fault */
        /* retry after a delay. */
        con->state = CON_STATE_PREOPEN;
        if (con->delay == 0)
            con->delay = BASE_DELAY_INTERVAL;
        else if (con->delay < MAX_DELAY_INTERVAL)
            con->delay *= 2;
        con_flag_set(con, CON_FLAG_BACKOFF);
        queue_con(con);
    }
}
When a connection is in the stand by state, sending a new message through ceph_con_send internally clears stand by and resets the state to PREOPEN so that a reconnect is attempted.
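The back off schedule itself is simple to model. In this sketch the BASE_DELAY_INTERVAL and MAX_DELAY_INTERVAL values are assumptions expressed in milliseconds (the kernel defines them as jiffies-based constants in net/ceph/messenger.c); the doubling-with-cap logic mirrors con_fault above:

#include <stdio.h>

/* Assumed millisecond values; the kernel's constants are in jiffies. */
#define BASE_DELAY_INTERVAL 500
#define MAX_DELAY_INTERVAL  (5 * 60 * 1000)

/* Mirrors the delay update in con_fault(): start at the base interval,
 * then double on every consecutive fault until the cap is reached. */
static unsigned long next_delay(unsigned long delay)
{
    if (delay == 0)
        return BASE_DELAY_INTERVAL;
    if (delay < MAX_DELAY_INTERVAL)
        return delay * 2;
    return delay;
}

int main(void)
{
    unsigned long delay = 0;

    for (int fault = 1; fault <= 12; fault++) {
        delay = next_delay(delay);
        printf("fault %2d: retry in %lu ms\n", fault, delay);
    }
    /* a successful negotiation resets the delay to 0, as seen in
     * process_connect ("reset backoff memory") */
    return 0;
}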