异步非阻塞模型:io_uring和完成端口

 Linux 内核 5.1 版本引入了 io_uring,这是 Linux 下类似 Windows 完成端口(IOCP)的一种异步 I/O 框架,通过提交队列 SQ 和完成队列 CQ 与内核交互。同步非阻塞模型(如 epoll、select)和异步模型的最大区别是:有数据可用时,是否需要应用程序自己调用 read/recv 去获取数据。异步模型在数据到达时由内核直接把数据填充到应用预先提交的缓冲区中;io_uring 还通过内存映射让用户态和内核态共享 SQ/CQ 两个环形队列,提交请求和获取完成结果时无需拷贝队列项,并减少了系统调用次数。本文将分别给出两个框架的实现,重点介绍 io_uring。

一、完成端口

#ifndef IOCPSERVER_H
#define IOCPSERVER_H
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <winsock2.h>
#include <windows.h>
#include <string>
#include <vector>
#include <process.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <thread>
using namespace std;
#define BUFFER_SIZE 1024
#define BACKLOG 10

typedef struct _ServerParams{
 SOCKET m_sock;
 HANDLE hCompletion;
 int id;
}ServerParams;

typedef struct _PerHandle
{
    SOCKET m_sock;
    sockaddr_in m_addr;
}PerHandle, * PtrPerHandle;

typedef struct _PerIO
{
    OVERLAPPED m_overlapped;
    char buf[BUFFER_SIZE];
    int m_operationType;

}PerIO, * PtrPerIO;

/*
 * Fixed-size header placed in front of every message so the receiver can
 * split the TCP byte stream back into frames.
 */
struct my_proto_header {
    int payload_len;   // number of payload bytes that follow the header
    int total_len;     // length of the whole frame
    int msgtype;       // application-defined message type
};
using MESSAGEHEADER = my_proto_header;
/* Kind of overlapped operation recorded in PerIO::m_operationType. */
enum OPCODE {
    OP_READ = 1,   // receive
    OP_WRITE,      // 2: send
    OP_ACCEPT      // 3: accept
};

class iocpServer{
    class worker{
    private:
        iocpServer * iocpctl;
        int id;
    public:
      worker(iocpServer * _iocpctl, int _id ):iocpctl(_iocpctl),id(_id) { }
      void operator()(){
          this->Worker_Recieve_Thread(id);
         }
    void   Worker_Recieve_Thread( int id)
    {

        DWORD dwTrans;
        PtrPerHandle ptrPerHandle;
        PtrPerIO ptrPerIO;
        printf("worker %d running....\n",id);
        while (true)
        {
            /*阻塞线程直到有IO操作完成,并通过参数返回操作结果。*/
            BOOL ret = ::GetQueuedCompletionStatus(iocpctl->hCompletion, &dwTrans, (PULONG_PTR)&ptrPerHandle, (LPOVERLAPPED*)&ptrPerIO, WSA_INFINITE);

            if (!ret)
            {
                closesocket(ptrPerHandle->m_sock);
                delete(ptrPerHandle);
                delete(ptrPerIO);
                continue;
            }
            /*读或写数据为空*/
            if (dwTrans == 0 && (ptrPerIO->m_operationType == OP_READ || ptrPerIO->m_operationType == OP_WRITE))
            {
                closesocket(ptrPerHandle->m_sock);
                delete(ptrPerHandle);
                delete(ptrPerIO);
                continue;
            }
            switch (ptrPerIO->m_operationType)
            {
            case OP_READ:
            {

                printf("%d bytes recieved!\n",   dwTrans);
                while(1){
                if (dwTrans < sizeof(struct my_proto_header))   break;
                struct my_proto_header *hdr = (struct my_proto_header*) ptrPerIO->buf;
                int payload_len=hdr->payload_len ;
                int total_pkt_len = hdr->total_len;
                if(total_pkt_len==0 || payload_len==0) break;
                if (dwTrans >= total_pkt_len) {
                    char* messageBody  =new char [payload_len ]{0};
                 memcpy(messageBody, ptrPerIO->buf +sizeof(*hdr),payload_len);
                 cout<<"thread id"<< id <<":"<<messageBody<<endl;
                 delete [] messageBody;
                 int  remaining = dwTrans- total_pkt_len;
                 if (remaining > 0){
                   memmove(ptrPerIO->buf,  ptrPerIO->buf + total_pkt_len, remaining );
                   dwTrans = remaining;
                 }
                 else
                     break;
                }

                }
                WSABUF buf;
                buf.buf = ptrPerIO->buf;
                buf.len = BUFFER_SIZE;
                DWORD dwRecv;
                DWORD dwFlag = 0;
                ::WSARecv(ptrPerHandle->m_sock, &buf, 1, &dwRecv, &dwFlag, &ptrPerIO->m_overlapped, NULL);
            }
            break;
            case OP_WRITE:
            case OP_ACCEPT:
                break;
            }
        }

    }
    };
    class boss{
    private:
        iocpServer * iocpctl;
        int id;
    public:
      boss(iocpServer * _iocpctl, int _id):iocpctl(_iocpctl),id(_id){  }
      void operator()(){
          this->Boss_Accept_Thread();
         }
      void Boss_Accept_Thread(){
          printf("boss thread %d is running\n", id);
          while (true)
          {
              struct sockaddr_in clientAddr;
              int clientAddrLen = sizeof(clientAddr);
              SOCKET newClient = accept( iocpctl->myListen, (sockaddr*)&clientAddr, &clientAddrLen);
              /*将套接字与完成端口关联*/
              PtrPerHandle ptrPerHandle = new PerHandle();
              ptrPerHandle->m_sock = newClient;
              ptrPerHandle->m_addr = clientAddr;
              CreateIoCompletionPort((HANDLE)ptrPerHandle->m_sock, iocpctl->hCompletion, (ULONG_PTR)ptrPerHandle, 0);
              /*投放异步重叠IO*/
              PtrPerIO ptrPerIO = new PerIO();
              ptrPerIO->m_operationType = OP_READ;
              WSABUF buf;
              buf.buf = ptrPerIO->buf;
              buf.len = BUFFER_SIZE;
              DWORD dwRecv;
              DWORD dwFlag = 0;
              ::WSARecv(ptrPerHandle->m_sock, &buf, 1, &dwRecv, &dwFlag, &ptrPerIO->m_overlapped, NULL);
          }
      }
    };
public:
    iocpServer(int _bosscount,int _workercount,int _port):bosscount(_bosscount),wordercount(_workercount),port(_port){
    }
    ~iocpServer(){}

    void  sync(){
            for(auto it=boss_threads_group.begin();it!=boss_threads_group.end();it++)
                    (*it).join();
           }

    void startServer(){

        WSADATA wsaData;
        if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
        {
            cout << "failed to load winsock !" << endl;
            exit(0);
        }

        // 创建完成端口对象
         hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
        /*创建服务线程,构建服务线程池*/
         for (int i = 0; i < wordercount; i++)  {
                this->worker_threads_group.push_back(std::thread (worker(this, i)) );
         }

        /*创建监听套接字,并监听连接*/
         myListen= socket(AF_INET, SOCK_STREAM, 0);
        struct sockaddr_in localAddr;
        localAddr.sin_family = AF_INET;
        localAddr.sin_port = ntohs(port);
        localAddr.sin_addr.s_addr =  htonl(INADDR_ANY);
        bind(myListen, (sockaddr*)&localAddr, sizeof(localAddr));
        listen(myListen, BACKLOG);
        for (int i = 0; i < bosscount; i++)
                this->boss_threads_group.push_back(std::thread (boss(this, i)) );


    }

private:
    int port;
    int bosscount;
    int wordercount;
     HANDLE hCompletion;
     SOCKET myListen;
     vector< std::thread> boss_threads_group;
     vector< std::thread> worker_threads_group;
};
#endif // IOCPSERVER_H

创建一个完成端口服务端代码为:

  iocpServer server(2,4,9999);
    server.startServer();
    server.sync();

以上创建一个服务端,2个线程用于接受连接,4个线程用于接收消息。

二、IO_URING

与 epoll 相比,io_uring 具备高吞吐、低 CPU 占用的特点;但由于是异步框架,在延迟方面可能不如 epoll 等同步模型。

#ifndef IO_URINGSERVER_H
#define IO_URINGSERVER_H
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>

#include <liburing.h>
using namespace  std;
/* Sizing constants of the io_uring server. */
constexpr int MAXCONNECTIONS      = 50 * 1024;  // rows of the static receive-buffer table
constexpr int ENTRIES_LENGTH      = 4 * 1024;   // entries of an accepting thread's ring
constexpr int READ_ENTRIES_LENGTH = 4 * 1024;   // entries of a worker thread's ring
constexpr int BUFFER_LENGTH       = 1 << 10;    // bytes per receive buffer
constexpr int MAX_QUERY_COUNT     = 1 << 10;    // completions fetched per batch


/* Kind of request a submission/completion entry belongs to. */
enum {
    READ   = 0,
    WRITE  = 1,
    ACCEPT = 2
};
/* Message type codes; numbering starts at 1. */
enum MESSAGETYPE {
    ARRAY = 1,
    CLOSE,      // 2
    STOP        // 3
};

/* Bookkeeping record describing one request on a connection. */
struct conninfo {
    int connfd;         // file descriptor the request refers to
    int type;           // request kind: READ, WRITE or ACCEPT
    size_t recv_len ;   // number of received bytes being processed
};
/*
 * Fixed-size header placed in front of every message so the receiver can
 * split the TCP byte stream back into frames.
 */
struct my_proto_header {
    int payload_len;   // number of payload bytes that follow the header
    int total_len;     // length of the whole frame
    int msgtype;       // message type code
};
using MSGHEADER = my_proto_header;

class iouringServer{
    class worker{
    private:
          iouringServer* ioctl;
          int id;
           bool running=true;
    public:
       worker(iouringServer* _ioctl, int _id):ioctl(_ioctl),id(_id){}
    void operator()(){
     this->Worker_Recieve_Thread(id);
    }
    void Worker_Recieve_Thread( int id){
       printf("thread %d start...\n",id);
      while(running){
      struct io_uring_cqe *cqe;
      io_uring_wait_cqe(ioctl->readrings.at(id).get(), &cqe);
      struct io_uring_cqe *cqes[MAX_QUERY_COUNT];
      int cqecount = io_uring_peek_batch_cqe(ioctl->readrings.at(id).get(), cqes, MAX_QUERY_COUNT);
      unsigned count = 0;
      vector<int> fds;
      for (int i = 0;i < cqecount;i ++) {
               cqe = cqes[i];
               count ++;
               struct conninfo ci;
               memcpy(&ci, &cqe->user_data, sizeof(ci));
               if (ci.type == READ) {
                   int bytes_read = cqe->res;
                   if (bytes_read == 0) {
                       close(ci.connfd);
                       printf("client %d closed\n", ci.connfd);
                       auto it = std::find(fds.begin(), fds.end(), ci.connfd );
                       if(  it!=fds.end())
                         fds.erase(it);

                   } else if (bytes_read < 0) {
                       close(ci.connfd);
                       auto it = std::find(fds.begin(), fds.end(), ci.connfd );
                       if(  it!=fds.end())
                              fds.erase(it);
                   } else {
                    ci.recv_len  = bytes_read;

                    while(1){
                     if (ci.recv_len < sizeof(struct my_proto_header))  break;
                      char *buffer = iouringServer::iouringBuffer[(ci.connfd-1) % (MAXCONNECTIONS-1)];
                      struct my_proto_header *hdr = (struct my_proto_header*)buffer;
                      int payload_len=hdr->payload_len ;
                      int total_pkt_len = hdr->total_len;
                      if(total_pkt_len==0 || payload_len==0) break;
                      if (ci.recv_len >= total_pkt_len) {

                     char* userbuf  =new char [payload_len ];
                     memcpy(userbuf, buffer +sizeof(*hdr),payload_len);
                      printf("thread %d receive message:%s \n",id,userbuf);
                     delete [] userbuf;
                     int  remaining = ci.recv_len- total_pkt_len;
                     if (remaining > 0){
                         memmove(buffer,  buffer + total_pkt_len, remaining );
                         ci.recv_len = remaining;
                      if(std::find(fds.begin(),fds.end(),ci.connfd)==fds.end())
                          fds.push_back(ci.connfd);
                     }
                     else
                     {
                      if(std::find(fds.begin(),fds.end(),ci.connfd)==fds.end())
                             fds.push_back(ci.connfd);
                         break;
                     }
                     }
                     }
                     memset(iouringServer::iouringBuffer[(ci.connfd-1) % (MAXCONNECTIONS-1)],0,BUFFER_LENGTH);
                   }

               }
           }
       for(auto it=fds.begin();it<fds.end();it++)
           ioctl->set_read_event(ioctl->readrings.at(id).get(), *it , iouringServer::iouringBuffer[(*it-1) % (MAXCONNECTIONS-1)], BUFFER_LENGTH, 0);
        fds.erase(fds.begin(), fds.end());
        io_uring_cq_advance(ioctl->readrings.at(id).get(), count);
        io_uring_submit( ioctl->readrings.at(id).get());
       }

    }
    };
    class boss{
    private:
        int listenfd;
        bool running=true;
        iouringServer* ioctl;
    public:
       boss(iouringServer* _ioctl,int _listenfd):ioctl(_ioctl),listenfd(_listenfd){}
       void operator()(){
           this->Boss_Accept_Thread(listenfd);
      }
       void Boss_Accept_Thread(int listenfd){
           struct io_uring ring;
           struct io_uring_params params;
           memset(&params, 0, sizeof(params));
           memset(&ring, 0, sizeof(ring));
           /*初始化params 和 ring*/
           io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
           struct sockaddr_in  clientaddr;
           socklen_t  client= sizeof(clientaddr);
           ioctl->set_accept_event(&ring, listenfd, (struct sockaddr*)&clientaddr, &client, 0);

           while(running){
               struct io_uring_cqe *cqe;
               io_uring_submit(&ring);
               io_uring_wait_cqe(&ring, &cqe);
               struct io_uring_cqe *cqes[MAX_QUERY_COUNT];
               int cqecount = io_uring_peek_batch_cqe(&ring, cqes, MAX_QUERY_COUNT);
               unsigned count = 0;
               for (int i = 0;i < cqecount;i ++) {
                   cqe = cqes[i];
                   count ++;
                   struct conninfo ci;
                   memcpy(&ci, &cqe->user_data, sizeof(ci));
                   if (ci.type == ACCEPT) {
                       int connfd = cqe->res;
                       char *buffer = iouringServer::iouringBuffer[(connfd-1) % (MAXCONNECTIONS-1)];
                       int index=ioctl->randomint(ioctl->workercount);
                       ioctl->set_read_event( ioctl->readrings.at(index).get(), connfd, buffer, BUFFER_LENGTH, 0);
                       io_uring_submit( ioctl->readrings.at(index).get());
                       struct sockaddr_in  clientaddr;
                       socklen_t client=sizeof (clientaddr);
                       ioctl->set_accept_event(&ring, listenfd, (struct sockaddr*)&clientaddr, &client, 0);
                   } else if (ci.type == READ) {

                   }
               }
               io_uring_cq_advance(&ring, count);
           }

       }
    };
public:
    iouringServer(int _bosscount,int _wordercount, int serverport):  booscount(_bosscount) ,workercount(_wordercount),port(serverport){
    }
    ~iouringServer(){}
    void  startIOUringServer(){
        int listenfd=initServer();
        if(listenfd>0){
            for(int i=0;i< workercount;i++){
              std::shared_ptr<struct io_uring >   ring=std::make_shared<struct io_uring >();
              io_uring_params  param  ;
              memset( &param, 0, sizeof(param));
              memset( ring.get(), 0, sizeof(struct io_uring ));
              io_uring_queue_init_params(READ_ENTRIES_LENGTH,  ring.get(),  &param  );
              readrings.push_back(ring);
              this->worker_threads_group.push_back(std::thread (worker(this, i)) );
            }

            for(int i=0;i< booscount;i++)
                   this->boss_threads_group.push_back(std::thread (boss(this,listenfd) ) );

        }

    }
   void  sync(){
         for(auto it=boss_threads_group.begin();it!=boss_threads_group.end();it++)
                 (*it).join();
        }
    void set_read_event(struct io_uring *ring, int fd, void *buf, size_t len, int flags) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        io_uring_prep_recv(sqe, fd, buf, len, flags);
        struct conninfo ci = {
            .connfd = fd,
            .type = READ,
            .recv_len=0
        };
        memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
        return ;
    }
    void set_write_event(struct io_uring *ring, int fd, const void *buf, size_t len, int flags) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        io_uring_prep_send(sqe, fd, buf, len, flags);
        struct conninfo ci = {
            .connfd = fd,
            .type = WRITE,
            .recv_len=0
        };
        memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
        return ;
    }
    void set_accept_event(struct io_uring *ring, int fd,
        struct sockaddr *cliaddr, socklen_t *clilen, unsigned flags) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        io_uring_prep_accept(sqe, fd, cliaddr, clilen, flags);
        struct conninfo ci = {
            .connfd = fd,
            .type = ACCEPT ,
            .recv_len=0
        };
        memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
        return ;
    }
 
private:
    static     char  iouringBuffer[MAXCONNECTIONS][BUFFER_LENGTH] ;
    int workercount;
    int booscount;
    vector< std::thread> boss_threads_group;
    vector< std::thread> worker_threads_group;
    int port;
    vector< std::shared_ptr<struct io_uring > >readrings;
    int randomint(int UPPER){
        std::srand(std::time(0));
        int random_num = std::rand() % UPPER;
        return random_num;

    }
    int   initServer(){
       int listenfd= socket(AF_INET, SOCK_STREAM, 0);  //
       if (listenfd == -1) return  -1;
       struct sockaddr_in servaddr ;
       servaddr.sin_family = AF_INET;
       servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
       servaddr.sin_port = htons(port);
       if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)))
           return -1;
       listen(listenfd, 10);
       return listenfd;

   }

};
// Out-of-class definition of the static receive-buffer table declared in
// iouringServer: MAXCONNECTIONS rows of BUFFER_LENGTH bytes, zero-initialised.
char iouringServer::iouringBuffer[MAXCONNECTIONS][BUFFER_LENGTH]={0};
#endif // IO_URINGSERVER_H

由于 TCP 是面向字节流的协议,消息之间没有边界,接收端会出现粘包和半包现象。因此需要定义数据头,告诉服务端每个数据包的大小:

-免费试读结束-
登录|注册后打赏作者吧! 0.8元