【Rados Block Device】Part 5: Client Kernel RBD Driver Analysis - the Network Messenger

4. Analysis of the messenger module in libceph.ko

  The messenger module is a relatively self-contained part of libceph whose purpose is to provide reliable, ordered network service to the network clients layered above it (such as mon_client and osd_client). messenger is built on top of TCP. Although TCP itself is connection-oriented and reliable, a TCP connection may still break: this happens naturally with non-persistent connections, and even persistent connections suffer from the upper application being slow to notice once an underlying network fault has been cleared. To cope with this unreliability, messenger either reconnects at increasing intervals (back off, when there are messages to send) or simply waits (stand by, when there are none). In addition, every outgoing message carries a unique sequence number (seq) so that retransmitted duplicates can be told apart.
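
  As an example of the seq mechanism, the receive side treats any message whose header seq is not strictly newer than the last in-order seq as a retransmitted duplicate. A hedged sketch of the check performed inside read_partial_message (simplified; the real code also consumes and discards the stale message body, and the log format here is illustrative):

    /* verify seq#: anything not newer than con->in_seq is a
     * retransmission from a reconnect and is skipped */
    u64 seq = le64_to_cpu(con->in_hdr.seq);
    if ((s64)seq - (s64)con->in_seq < 1) {
        pr_info("skipping old message: seq %llu, expected %llu or newer\n",
                seq, con->in_seq + 1);
        con->in_tag = CEPH_MSGR_TAG_READY; /* discard, wait for the next tag */
    }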

  The internal architecture of the messenger module is shown in the figure below:

  • On the send path (solid lines in the figure), an application (such as the rbd command) calls ceph_con_send to send a message; this wakes up the work task associated with the sending connection; in the context of that work task, try_write calls kernel_sendmsg to access the underlying network protocol stack, which ultimately drives the NIC to transmit;
  • On the receive path (dashed lines in the figure), after the NIC receives packets, interrupts and protocol-stack callbacks wake the work task; the work task calls kernel_recvmsg via try_read to read the message content out of the protocol stack, and finally invokes the message callbacks registered at connection initialization to process the message (a sketch of this wake-up follows below).
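
  Both paths converge on "waking the work task", which is nothing more than queueing the connection's delayed work item on the messenger workqueue. A hedged sketch of queue_con (simplified; the real function also takes a reference on the connection before queueing):

static void queue_con(struct ceph_connection *con)
{
    /* schedule con_work to run as soon as possible (zero delay) */
    if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0))
        dout("%s %p - already queued\n", __func__, con);
}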

4.1. Handling normal network send and receive

4.1.1 Socket connection phase

  Whenever the upper layer creates a ceph_connection object via ceph_con_init (a wrapper around the underlying TCP connection that tracks its own state machine) and opens it with ceph_con_open (moving the connection state to PREOPEN), a new work item is added to the kernel workqueue and scheduled once. Its entry function is con_work, whose skeleton is shown below:

linux/net/ceph/messenger.c:

static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
        work.work);

    ...
    while (true) {
        ...
        ret = try_read(con); /* does nothing during the socket connection setup phase */
        ...
        ret = try_write(con); /* establishes the socket connection and installs the socket callbacks */
        ...
        break;
    }
    ...
}
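
  For context, this is roughly how an upper-layer client such as mon_client creates and opens a connection (a hedged sketch modeled on the mon_client code of this kernel generation; exact signatures vary across kernel versions):

/* illustrative only: connect to monitor #0 */
ceph_con_init(&monc->con, monc, &mon_con_ops, &monc->client->msgr);
ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, 0,
              &monc->monmap->mon_inst[0].addr);
/* ceph_con_open() sets CON_STATE_PREOPEN and queues the work item,
 * triggering the first run of con_work */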

  When the connection worker is first scheduled, it enters the socket connection phase. Its main job there is to establish the TCP socket connection and set the ceph_connection state to CONNECTING:

linux/net/ceph/messenger.c:

static int try_write(struct ceph_connection *con)
{
    ...

    if (con->state == CON_STATE_PREOPEN) { /* on the first run the connection state is PREOPEN */
        BUG_ON(con->sock);
        con->state = CON_STATE_CONNECTING; /* after the steps below complete, the state is CONNECTING */

        con_out_kvec_reset(con); /* reset the connection's send buffer, i.e. con->out_kvec_* */
        prepare_write_banner(con); /* place the client's banner into the send buffer */
        prepare_read_banner(con); /* get ready to receive the server's banner */

        BUG_ON(con->in_msg);
        con->in_tag = CEPH_MSGR_TAG_READY; /* reset the receive state to READY, meaning any message category may arrive; categories are distinguished by CEPH_MSGR_TAG_* */
        ret = ceph_tcp_connect(con); /* create and connect the socket, and install the protocol-stack callbacks */
        if (ret < 0) {
            con->error_msg = "connect error";
            goto out;
        }
    }

more_kvec:
    ...
    if (con->out_kvec_left) { 
        /* call kernel_sendmsg underneath to transmit the contents of the connection's send buffer */
        ret = write_partial_kvec(con);
        if (ret <= 0)
            goto out;
    }
    ...
}

static int ceph_tcp_connect(struct ceph_connection *con)
{
    struct sockaddr_storage *paddr = &con->peer_addr.in_addr;
    struct socket *sock;
    int ret;

    BUG_ON(con->sock);
    ret = sock_create_kern(con->peer_addr.in_addr.ss_family, SOCK_STREAM,
        IPPROTO_TCP, &sock);
    ...
    sock->sk->sk_allocation = GFP_NOFS;

    set_sock_callbacks(sock, con);

    con_sock_state_connecting(con);
    ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
        O_NONBLOCK);
    ...
    con->sock = sock;
    return 0;
}

static void set_sock_callbacks(struct socket *sock,
    struct ceph_connection *con)
{
    struct sock *sk = sock->sk;
    sk->sk_user_data = con;
    sk->sk_data_ready = ceph_sock_data_ready; /* the protocol stack calls this back when data arrives */
    sk->sk_write_space = ceph_sock_write_space;
    sk->sk_state_change = ceph_sock_state_change; /* the protocol stack calls this back when the socket state changes */
}

static void ceph_sock_data_ready(struct sock *sk, int count_unused)
{
    struct ceph_connection *con = sk->sk_user_data;

    ...
    queue_con(con); /* wake the work task again */
    ...
}
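
  The banner itself is a short magic string followed by the sender's encoded address. A hedged sketch of prepare_write_banner (simplified from messenger.c of this kernel generation; treat field names as approximate):

#define CEPH_BANNER "ceph v027"

static void prepare_write_banner(struct ceph_connection *con)
{
    /* queue the banner string and our encoded entity address for sending */
    con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
    con_out_kvec_add(con, sizeof(con->msgr->my_enc_addr),
                     &con->msgr->my_enc_addr);
    con_flag_set(con, CON_FLAG_WRITE_PENDING); /* tell try_write there is data */
}
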
4.1.2 Negotiation phase

  At the end of the socket connection phase, the client's banner has been sent to the server and the client waits for the server's banner in reply. When that reply arrives, the connection worker is woken again; this pass mainly carries out the information negotiation between client and server:

linux/net/ceph/messenger.c:

static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
        work.work);

    ...
    while (true) {
        ...
        ret = try_read(con); /* read the banner returned by the server and enter the negotiation phase */
        ...
        ret = try_write(con); /* send our negotiation info to the server */
        ...
        break;
    }
    ...
}

static int try_read(struct ceph_connection *con)
{
    ...
    if (con->state == CON_STATE_CONNECTING) {
        ret = read_partial_banner(con); /* read the server's banner */
        if (ret <= 0)
            goto out;
        ret = process_banner(con); /* validate the banner */
        if (ret < 0)
            goto out;

        con->state = CON_STATE_NEGOTIATING; /* validation passed, enter the negotiation phase */

        /*
         * Received banner is good, exchange connection info.
         * Do not reset out_kvec, as sending our banner raced
         * with receiving peer banner after connect completed.
         */
        ret = prepare_write_connect(con); /* prepare negotiation info such as the global connection seq */
        if (ret < 0)
            goto out;
        prepare_read_connect(con); /* get ready to receive the server's negotiation reply */

        /* Send connection info before awaiting response */
        goto out;
    }
    ...
}
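
  The negotiation payload queued by prepare_write_connect is the ceph_msg_connect structure, reproduced here from linux/include/linux/ceph/msgr.h for reference (field comments are approximate):

struct ceph_msg_connect {
    __le64 features;     /* supported feature bits */
    __le32 host_type;    /* CEPH_ENTITY_TYPE_* */
    __le32 global_seq;   /* count connections initiated by this host */
    __le32 connect_seq;  /* count connections initiated in this session */
    __le32 protocol_version;
    __le32 authorizer_protocol;
    __le32 authorizer_len;
    __u8  flags;         /* CEPH_MSG_CONNECT_* */
} __attribute__ ((packed));
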
4.1.3 Normal open phase

  After the server's negotiation reply arrives, the connection worker is woken once more; it processes the reply and enters the normal open phase, in which messages can be sent and received:

linux/net/ceph/messenger.c:

static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
        work.work);

    ...
    while (true) {
        ...
        ret = try_read(con); /* read the server's negotiation reply and enter the open phase; messages can now be received */
        ...
        ret = try_write(con); /* messages can now be sent normally */
        ...
        break;
    }
    ...
}

static int try_read(struct ceph_connection *con)
{
    ...
    if (con->state == CON_STATE_NEGOTIATING) {
        ret = read_partial_connect(con); /* read the server's negotiation reply into in_reply */
        if (ret <= 0)
            goto out;
        ret = process_connect(con); /* process the negotiation reply and enter the open phase */
        if (ret < 0)
            goto out;
        goto more;
    }
    ...
}

static int process_connect(struct ceph_connection *con)
{
    ...
    switch (con->in_reply.tag) { /* in_reply holds the negotiation reply returned by the server */
        ...
    case CEPH_MSGR_TAG_SEQ:
    case CEPH_MSGR_TAG_READY:
        ...

        /* record the server's negotiation reply and set the connection state to OPEN */
    
        con->state = CON_STATE_OPEN;
        con->auth_retry = 0;    /* we authenticated; clear flag */
        con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
        con->connect_seq++;
        con->peer_features = server_feat;
        ...
        con->delay = 0;      /* reset backoff memory */

        if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
            prepare_write_seq(con);
            prepare_read_seq(con);
        } else {
            prepare_read_tag(con);
        }
        break;
    }
}
4.1.4 Sending and receiving messages

  During normal message exchange, client and server always send a one-byte tag on the connection before the actual payload, because the tag tells the receiver exactly what category of data follows and how to parse it. The byte stream on the wire looks roughly as shown in the figure below:
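
  For reference, the tag values come from linux/include/linux/ceph/msgr.h (MSG, ACK, SEQ and CLOSE are the ones handled in the receive path below):

#define CEPH_MSGR_TAG_READY         1  /* server->client: ready for messages */
#define CEPH_MSGR_TAG_RESETSESSION  2  /* server->client: reset, try again */
#define CEPH_MSGR_TAG_WAIT          3  /* server->client: wait for racing incoming connection */
#define CEPH_MSGR_TAG_RETRY_SESSION 4  /* server->client + cseq: try again with higher cseq */
#define CEPH_MSGR_TAG_RETRY_GLOBAL  5  /* server->client + gseq: try again with higher gseq */
#define CEPH_MSGR_TAG_CLOSE         6  /* closing pipe */
#define CEPH_MSGR_TAG_MSG           7  /* message */
#define CEPH_MSGR_TAG_ACK           8  /* message ack */
#define CEPH_MSGR_TAG_KEEPALIVE     9  /* just a keepalive byte! */
#define CEPH_MSGR_TAG_BADPROTOVER  10  /* bad protocol version */
#define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
#define CEPH_MSGR_TAG_FEATURES     12  /* insufficient features */
#define CEPH_MSGR_TAG_SEQ          13  /* 64-bit int follows with seen seq number */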

  Let's look at message reception first:

linux/net/ceph/messenger.c:

static int try_read(struct ceph_connection *con)
{
    ...

    if (con->in_tag == CEPH_MSGR_TAG_READY) { /* READY means we are ready to receive the next tag byte */
        /*
         * what's next?
         */
        ret = ceph_tcp_recvmsg(con->sock, &con->in_tag, 1); /* receive the one-byte tag from the network */
        if (ret <= 0)
            goto out;

        /* prepare to receive what follows, based on the tag */
        switch (con->in_tag) {
        case CEPH_MSGR_TAG_MSG:
            prepare_read_message(con);
            break;
        case CEPH_MSGR_TAG_ACK:
            prepare_read_ack(con);
            break;
        case CEPH_MSGR_TAG_CLOSE:
            con_close_socket(con);
            con->state = CON_STATE_CLOSED;
            goto out;
        default:
            goto bad_tag;
        }
    }

    /* a MSG tag means an actual message follows: receive its content and invoke the message callback */
    if (con->in_tag == CEPH_MSGR_TAG_MSG) {
        ret = read_partial_message(con); /* note: the alloc_msg callback invoked here locates the r_reply already allocated in the ceph_osd_request */
        ...
        if (con->in_tag == CEPH_MSGR_TAG_READY)
            goto more;
        process_message(con);
        if (con->state == CON_STATE_OPEN)
            prepare_read_tag(con);
        goto more;
    }

    /* an ACK or SEQ tag means an acknowledged sequence number from the server follows; sent messages it confirms can be released */
    if (con->in_tag == CEPH_MSGR_TAG_ACK ||
        con->in_tag == CEPH_MSGR_TAG_SEQ) {
        /*
         * the final handshake seq exchange is semantically
         * equivalent to an ACK
         */
        ret = read_partial_ack(con);
        ...
        process_ack(con);
        goto more;
    }
    ...
}
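
  Once a complete message sits in con->in_msg, process_message hands it up to the owning client. A hedged sketch of its core (simplified; locking and reference counting omitted):

static void process_message(struct ceph_connection *con)
{
    struct ceph_msg *msg = con->in_msg;

    con->in_msg = NULL;

    /* the first message identifies the peer */
    if (con->peer_name.type == 0)
        con->peer_name = msg->hdr.src;

    con->in_seq++; /* advance the in-order receive seq (acked back later) */

    /* deliver via the dispatch callback registered at ceph_con_init time */
    con->ops->dispatch(con, msg);
}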

  Next, let's look at message sending:

linux/net/ceph/messenger.c:

static int try_write(struct ceph_connection *con)
{
more:
    ...
more_kvec:    
    if (con->out_kvec_left) { /* transmit the contents of the send buffer over the network */
        ret = write_partial_kvec(con);
        if (ret <= 0)
            goto out;
    }

    /* msg pages? */
    if (con->out_msg) { /* if a current outgoing message out_msg is selected and not yet fully sent, send its data payload */
        if (con->out_msg_done) {
            ceph_msg_put(con->out_msg);
            con->out_msg = NULL;   /* we're done with this one */
            goto do_next;
        }

        ret = write_partial_message_data(con);
        if (ret == 1)
            goto more_kvec;  /* we need to send the footer, too! */
        if (ret == 0)
            goto out;
        if (ret < 0) {
            dout("try_write write_partial_message_data err %d\n", ret);
            goto out;
        }
    }

do_next:
    if (con->state == CON_STATE_OPEN) {
        /* is anything else pending? */
        if (!list_empty(&con->out_queue)) {
            /* if out_queue has messages waiting, pick one as out_msg and send its header (its data payload is sent by the code above) */
            prepare_write_message(con);
            goto more;
        }
        if (con->in_seq > con->in_seq_acked) {
            /* if the newest received seq exceeds the last acked seq, prepare to send a new ack */
            prepare_write_ack(con);
            goto more;
        }
        ...
    }
    ...
}
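
  From a sender's perspective, all of this machinery reduces to allocating a ceph_msg and queueing it on the connection. A hedged usage sketch (the message type and the front_len of 128 are arbitrary illustrative choices):

struct ceph_msg *m;

m = ceph_msg_new(CEPH_MSG_MON_GET_VERSION, 128, GFP_NOFS, true);
if (m) {
    /* ... encode the request payload into m->front.iov_base ... */
    ceph_con_send(con, m); /* appends m to con->out_queue and wakes the worker */
}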

  Finally, let's look at the definition of ceph_msg. A sender fills the request content into a ceph_msg for the network layer to transmit; on the receive side, the network layer likewise fills in a ceph_msg before handing the response to the upper layer for processing:

linux/include/linux/ceph/messenger.h:

struct ceph_msg {
    struct ceph_msg_header hdr;	/* header */
    struct ceph_msg_footer footer;	/* footer */
    struct kvec front;              /* unaligned blobs of message */
    struct ceph_buffer *middle;

    size_t				data_length;
    struct list_head		data;
    struct ceph_msg_data_cursor	cursor;

    struct ceph_connection *con;
    struct list_head list_head;	/* links for connection lists */

    struct kref kref;
    bool front_is_vmalloc;
    bool more_to_follow;
    bool needs_out_seq;
    int front_alloc_len;
    unsigned long ack_stamp;        /* tx: when we were acked */

    struct ceph_msgpool *pool;
};

linux/include/linux/ceph/msgr.h:

struct ceph_msg_header {
    __le64 seq;       /* message seq# for this session */
    __le64 tid;       /* transaction id */
    __le16 type;      /* message type */
    __le16 priority;  /* priority.  higher value == higher priority */
    __le16 version;   /* version of message encoding */

    __le32 front_len; /* bytes in main payload */
    __le32 middle_len;/* bytes in middle payload */
    __le32 data_len;  /* bytes of data payload */
    __le16 data_off;  /* sender: include full offset;
                        receiver: mask against ~PAGE_MASK */

    struct ceph_entity_name src;
    __le32 reserved;
    __le32 crc;       /* header crc32c */
} __attribute__ ((packed));

linux/include/linux/ceph/ceph_fs.h:

/*
 * message types
 */

/* misc */
#define CEPH_MSG_SHUTDOWN               1
#define CEPH_MSG_PING                   2

/* client <-> monitor */
#define CEPH_MSG_MON_MAP                4
#define CEPH_MSG_MON_GET_MAP            5
#define CEPH_MSG_STATFS                 13
#define CEPH_MSG_STATFS_REPLY           14
#define CEPH_MSG_MON_SUBSCRIBE          15
#define CEPH_MSG_MON_SUBSCRIBE_ACK      16
#define CEPH_MSG_AUTH			17
#define CEPH_MSG_AUTH_REPLY		18
#define CEPH_MSG_MON_GET_VERSION        19
#define CEPH_MSG_MON_GET_VERSION_REPLY  20

/* client <-> mds */
#define CEPH_MSG_MDS_MAP                21

#define CEPH_MSG_CLIENT_SESSION         22
#define CEPH_MSG_CLIENT_RECONNECT       23

#define CEPH_MSG_CLIENT_REQUEST         24
#define CEPH_MSG_CLIENT_REQUEST_FORWARD 25
#define CEPH_MSG_CLIENT_REPLY           26
#define CEPH_MSG_CLIENT_CAPS            0x310
#define CEPH_MSG_CLIENT_LEASE           0x311
#define CEPH_MSG_CLIENT_SNAP            0x312
#define CEPH_MSG_CLIENT_CAPRELEASE      0x313

/* pool ops */
#define CEPH_MSG_POOLOP_REPLY           48
#define CEPH_MSG_POOLOP                 49


/* osd */
#define CEPH_MSG_OSD_MAP                41
#define CEPH_MSG_OSD_OP                 42
#define CEPH_MSG_OSD_OPREPLY            43
#define CEPH_MSG_WATCH_NOTIFY           44

4.2. Handling TCP connection faults

  The messenger module learns of a TCP connection fault in two ways: either the underlying network stack signals a socket state change, or a send/receive call on the socket returns an error.

4.2.1 Faults reported by the network stack via callback

  In the first case, when the network stack detects the TCP connection fault, it notifies messenger that the underlying socket is closing by invoking the ceph_sock_state_change callback. After updating the socket state, ceph_sock_state_change reschedules the connection worker:

linux/net/ceph/messenger.c:

static void ceph_sock_state_change(struct sock *sk)
{
    struct ceph_connection *con = sk->sk_user_data;

    switch (sk->sk_state) {
    case TCP_CLOSE: /* the underlying socket has closed; note the fall-through to TCP_CLOSE_WAIT */
        dout("%s TCP_CLOSE\n", __func__);
    case TCP_CLOSE_WAIT:
        dout("%s TCP_CLOSE_WAIT\n", __func__);
        con_sock_state_closing(con);
        con_flag_set(con, CON_FLAG_SOCK_CLOSED); /* mark the connection's socket as closed */
        queue_con(con); /* wake the work task again */
        break;
    case TCP_ESTABLISHED:
        ...
        break;
    default:	/* Everything else is uninteresting */
        break;
    }
}

  When the work task runs again, it finds that the connection has been marked closed and enters the fault-handling flow:

linux/net/ceph/messenger.c:

static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
        work.work);

    ...
    while (true) {
        if ((fault = con_sock_closed(con))) {
            /* on each run the work task first checks whether the socket was closed; if so, break out of the loop for fault handling */
            dout("%s: con %p SOCK_CLOSED\n", __func__, con);
            break;
        }
        ...
        ret = try_read(con); 
        ...
        ret = try_write(con); 
        ...
        break;
    }

    /* fault handling happens below */
    if (fault)
        con_fault(con);
    ...
}
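
  The con_sock_closed check used above simply consumes the flag set by the callback. A hedged sketch (simplified; the real function also sanity-checks the socket state):

static bool con_sock_closed(struct ceph_connection *con)
{
    /* consume the flag set by ceph_sock_state_change */
    if (!con_flag_test_and_clear(con, CON_FLAG_SOCK_CLOSED))
        return false;

    return true;
}
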
4.2.2 Detecting connection faults from send/receive return values

  A connection's work task can also learn of a network fault during sending or receiving, when an underlying call returns an error:

linux/net/ceph/messenger.c:

static void con_work(struct work_struct *work)
{
    struct ceph_connection *con = container_of(work, struct ceph_connection,
        work.work);

    ...
    while (true) {
        ...
        ret = try_read(con);
        if (ret < 0) { /* the lower layer returned an error while receiving */
            if (ret == -EAGAIN)
                continue;
            con->error_msg = "socket error on read";
            fault = true;
            break;
        }
        
        ret = try_write(con); 
        if (ret < 0) { /* the lower layer returned an error while sending */
            if (ret == -EAGAIN)
                continue;
            con->error_msg = "socket error on write";
            fault = true;
        }

        break;
    }

    /* fault handling happens below */
    if (fault)
        con_fault(con);
    ...
}
4.2.3 Socket fault handling logic

  A connection fault is handled in one of two ways: back off, i.e. reconnect after a delay; or stand by, i.e. wait until a new message needs to be sent before retrying:

linux/net/ceph/messenger.c:

static void con_fault(struct ceph_connection *con)
{
    ...
    con_close_socket(con); /* close the underlying socket */

    ...
    if (con->in_msg) {
        /* if a message was in the middle of being received, release it */
        BUG_ON(con->in_msg->con != con);
        con->in_msg->con = NULL;
        ceph_msg_put(con->in_msg);
        con->in_msg = NULL;
        con->ops->put(con);
    }

    /* messages already sent but not yet acked by the peer must be resent after the connection is re-established */
    /* Requeue anything that hasn't been acked */
    list_splice_init(&con->out_sent, &con->out_queue);

    /* If there are no messages queued or keepalive pending, place
     * the connection in a STANDBY state */
    if (list_empty(&con->out_queue) &&
            !con_flag_test(con, CON_FLAG_KEEPALIVE_PENDING)) {

        /* if the send queue is empty and no keepalive is pending, put the connection into stand by */

        dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
        con_flag_clear(con, CON_FLAG_WRITE_PENDING);
        con->state = CON_STATE_STANDBY;
    } else {
        
        /* if there are still messages to send, retry after a delay that doubles on each attempt */

        /* retry after a delay. */
        con->state = CON_STATE_PREOPEN;
        if (con->delay == 0)
            con->delay = BASE_DELAY_INTERVAL;
        else if (con->delay < MAX_DELAY_INTERVAL)
            con->delay *= 2;
        con_flag_set(con, CON_FLAG_BACKOFF);
        queue_con(con);
    }
}
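
  The BACKOFF flag set above is consumed at the top of the next con_work pass, which requeues the work item with the accumulated delay instead of running immediately. A hedged sketch of that helper (simplified; the real function also handles a failed requeue):

static bool con_backoff(struct ceph_connection *con)
{
    if (!con_flag_test_and_clear(con, CON_FLAG_BACKOFF))
        return false;

    /* run con_work again after con->delay jiffies rather than right away */
    queue_con_delay(con, round_jiffies_relative(con->delay));
    return true;
}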

  When a connection is in the stand by state and a new message is submitted via ceph_con_send, that function internally clears the stand by state and resets the connection to PREOPEN so that a reconnect is attempted.
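
  That wake-up path runs through clear_standby, which ceph_con_send calls with the connection mutex held. A hedged sketch (simplified):

static void clear_standby(struct ceph_connection *con)
{
    /* coming back from STANDBY? */
    if (con->state == CON_STATE_STANDBY) {
        dout("clear_standby %p and ++connect_seq\n", con);
        con->state = CON_STATE_PREOPEN; /* reconnect from scratch */
        con->connect_seq++;             /* a new attempt within this session */
    }
}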

