Linux 内核 5.1 版本引入了 io_uring,这是 Linux 下类似 Windows 完成端口(IOCP)的一种异步 I/O 框架,通过提交队列(SQ)和完成队列(CQ)与内核交互,由内核完成实际的读写操作。epoll、select 等同步非阻塞模型与异步模型的最大区别在于:有数据可用时,是否需要应用程序自己调用读函数去获取数据。异步模型在数据到达时由内核自动把数据填充到应用预先提交的缓冲区,并通过内存映射让提交队列和完成队列在用户态与内核态之间共享,实现零拷贝。本文将分别给出这两个框架的实现,重点介绍 io_uring。
一、完成端口
#ifndef IOCPSERVER_H
#define IOCPSERVER_H
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <winsock2.h>
#include <windows.h>
#include <string>
#include <vector>
#include <process.h>
#include <iostream>
#include <memory>
#include <mutex>
#include <condition_variable>
#include <thread>
/* NOTE(review): a using-directive at header scope is visible to every file that includes this header. */
using namespace std;
/* Size in bytes of the receive buffer embedded in each PerIO. */
#define BUFFER_SIZE 1024
/* Backlog value passed to listen() for the listening socket. */
#define BACKLOG 10
/* Bundle of a socket, a completion-port handle and a numeric id.
   Not referenced by the rest of this listing. */
struct _ServerParams
{
    SOCKET m_sock;       /* socket handle */
    HANDLE hCompletion;  /* completion port handle */
    int id;              /* numeric identifier */
};
using ServerParams = _ServerParams;
/* Per-connection data: the accepted socket and the peer address returned by accept().
   A pointer to this struct is used as the completion key of the socket. */
struct _PerHandle
{
    SOCKET m_sock;        /* accepted client socket */
    sockaddr_in m_addr;   /* peer address filled in by accept() */
};
using PerHandle = _PerHandle;
using PtrPerHandle = _PerHandle *;
/* Per-operation data for one overlapped request.
   m_overlapped is the first member, so the LPOVERLAPPED handed back by the
   completion port can be converted back to a PerIO pointer. */
struct _PerIO
{
    OVERLAPPED m_overlapped;  /* passed to WSARecv as the overlapped structure */
    char buf[BUFFER_SIZE];    /* receive buffer */
    int m_operationType;      /* one of the OPCODE values */
};
using PerIO = _PerIO;
using PtrPerIO = _PerIO *;
/* Fixed-size header that precedes every message on the wire. */
struct my_proto_header
{
    int payload_len;  /* number of payload bytes following the header */
    int total_len;    /* number of bytes the whole message occupies in the stream */
    int msgtype;      /* message type tag */
};
using MESSAGEHEADER = my_proto_header;
/* Kind of overlapped operation recorded in PerIO::m_operationType. */
enum OPCODE
{
    OP_READ = 1,  /* receive request */
    OP_WRITE,     /* send request (2) */
    OP_ACCEPT     /* accept request (3) */
};
class iocpServer{
class worker{
private:
iocpServer * iocpctl;
int id;
public:
worker(iocpServer * _iocpctl, int _id ):iocpctl(_iocpctl),id(_id) { }
void operator()(){
this->Worker_Recieve_Thread(id);
}
void Worker_Recieve_Thread( int id)
{
DWORD dwTrans;
PtrPerHandle ptrPerHandle;
PtrPerIO ptrPerIO;
printf("worker %d running....\n",id);
while (true)
{
/*阻塞线程直到有IO操作完成,并通过参数返回操作结果。*/
BOOL ret = ::GetQueuedCompletionStatus(iocpctl->hCompletion, &dwTrans, (PULONG_PTR)&ptrPerHandle, (LPOVERLAPPED*)&ptrPerIO, WSA_INFINITE);
if (!ret)
{
closesocket(ptrPerHandle->m_sock);
delete(ptrPerHandle);
delete(ptrPerIO);
continue;
}
/*读或写数据为空*/
if (dwTrans == 0 && (ptrPerIO->m_operationType == OP_READ || ptrPerIO->m_operationType == OP_WRITE))
{
closesocket(ptrPerHandle->m_sock);
delete(ptrPerHandle);
delete(ptrPerIO);
continue;
}
switch (ptrPerIO->m_operationType)
{
case OP_READ:
{
printf("%d bytes recieved!\n", dwTrans);
while(1){
if (dwTrans < sizeof(struct my_proto_header)) break;
struct my_proto_header *hdr = (struct my_proto_header*) ptrPerIO->buf;
int payload_len=hdr->payload_len ;
int total_pkt_len = hdr->total_len;
if(total_pkt_len==0 || payload_len==0) break;
if (dwTrans >= total_pkt_len) {
char* messageBody =new char [payload_len ]{0};
memcpy(messageBody, ptrPerIO->buf +sizeof(*hdr),payload_len);
cout<<"thread id"<< id <<":"<<messageBody<<endl;
delete [] messageBody;
int remaining = dwTrans- total_pkt_len;
if (remaining > 0){
memmove(ptrPerIO->buf, ptrPerIO->buf + total_pkt_len, remaining );
dwTrans = remaining;
}
else
break;
}
}
WSABUF buf;
buf.buf = ptrPerIO->buf;
buf.len = BUFFER_SIZE;
DWORD dwRecv;
DWORD dwFlag = 0;
::WSARecv(ptrPerHandle->m_sock, &buf, 1, &dwRecv, &dwFlag, &ptrPerIO->m_overlapped, NULL);
}
break;
case OP_WRITE:
case OP_ACCEPT:
break;
}
}
}
};
class boss{
private:
iocpServer * iocpctl;
int id;
public:
boss(iocpServer * _iocpctl, int _id):iocpctl(_iocpctl),id(_id){ }
void operator()(){
this->Boss_Accept_Thread();
}
void Boss_Accept_Thread(){
printf("boss thread %d is running\n", id);
while (true)
{
struct sockaddr_in clientAddr;
int clientAddrLen = sizeof(clientAddr);
SOCKET newClient = accept( iocpctl->myListen, (sockaddr*)&clientAddr, &clientAddrLen);
/*将套接字与完成端口关联*/
PtrPerHandle ptrPerHandle = new PerHandle();
ptrPerHandle->m_sock = newClient;
ptrPerHandle->m_addr = clientAddr;
CreateIoCompletionPort((HANDLE)ptrPerHandle->m_sock, iocpctl->hCompletion, (ULONG_PTR)ptrPerHandle, 0);
/*投放异步重叠IO*/
PtrPerIO ptrPerIO = new PerIO();
ptrPerIO->m_operationType = OP_READ;
WSABUF buf;
buf.buf = ptrPerIO->buf;
buf.len = BUFFER_SIZE;
DWORD dwRecv;
DWORD dwFlag = 0;
::WSARecv(ptrPerHandle->m_sock, &buf, 1, &dwRecv, &dwFlag, &ptrPerIO->m_overlapped, NULL);
}
}
};
public:
iocpServer(int _bosscount,int _workercount,int _port):bosscount(_bosscount),wordercount(_workercount),port(_port){
}
~iocpServer(){}
void sync(){
for(auto it=boss_threads_group.begin();it!=boss_threads_group.end();it++)
(*it).join();
}
void startServer(){
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
cout << "failed to load winsock !" << endl;
exit(0);
}
// 创建完成端口对象
hCompletion = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
/*创建服务线程,构建服务线程池*/
for (int i = 0; i < wordercount; i++) {
this->worker_threads_group.push_back(std::thread (worker(this, i)) );
}
/*创建监听套接字,并监听连接*/
myListen= socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in localAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_port = ntohs(port);
localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
bind(myListen, (sockaddr*)&localAddr, sizeof(localAddr));
listen(myListen, BACKLOG);
for (int i = 0; i < bosscount; i++)
this->boss_threads_group.push_back(std::thread (boss(this, i)) );
}
private:
int port;
int bosscount;
int wordercount;
HANDLE hCompletion;
SOCKET myListen;
vector< std::thread> boss_threads_group;
vector< std::thread> worker_threads_group;
};
#endif // IOCPSERVER_H
创建一个完成端口服务端的代码为:
// Arguments are (boss thread count, worker thread count, port).
iocpServer server(2, 4, 9999);
// Create the completion port, the listening socket and the threads.
server.startServer();
// Block on the server's threads.
server.sync();
以上创建一个服务端,2个线程用于接受连接,4个线程用于接收消息。
二、IO_URING
与 epoll 相比,io_uring 具备高吞吐、低占用的特点;但由于是异步框架,在延迟方面可能不如 epoll 等同步模型。
#ifndef IO_URINGSERVER_H
#define IO_URINGSERVER_H
#include <liburing.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <thread>
#include <unordered_map>
#include <vector>
using namespace std;
/* Number of rows in the per-connection receive buffer table. */
constexpr int MAXCONNECTIONS = 50 * 1024;
/* Queue depth requested for each accept ring. */
constexpr int ENTRIES_LENGTH = 4 * 1024;
/* Queue depth requested for each worker (read) ring. */
constexpr int READ_ENTRIES_LENGTH = 4 * 1024;
/* Size in bytes of one connection's receive buffer. */
constexpr int BUFFER_LENGTH = 1 << 10;
/* Upper bound on completion entries fetched per batch. */
constexpr int MAX_QUERY_COUNT = 1 << 10;
/* Kind of request stored alongside the descriptor in an entry's user data. */
enum {
    READ = 0,    /* receive request */
    WRITE = 1,   /* send request */
    ACCEPT = 2   /* accept request */
};
/* Message type tags; numbering starts at 1. */
enum MESSAGETYPE {
    ARRAY = 1,
    CLOSE,   /* 2 */
    STOP     /* 3 */
};
/* Identifies the request a completion belongs to.
   NOTE(review): with a size_t member this struct is typically wider than the
   64-bit user_data field of an io_uring entry, so it cannot be copied into or
   out of user_data as a whole. */
struct conninfo {
int connfd;       /* descriptor the request was issued on */
int type;         /* READ, WRITE or ACCEPT */
size_t recv_len ; /* number of received bytes being processed */
};
/* Fixed-size header that precedes every message on the wire. */
struct my_proto_header
{
    int payload_len;  /* number of payload bytes following the header */
    int total_len;    /* number of bytes the whole message occupies in the stream */
    int msgtype;      /* message type tag */
};
using MSGHEADER = my_proto_header;
class iouringServer{
class worker{
private:
iouringServer* ioctl;
int id;
bool running=true;
public:
worker(iouringServer* _ioctl, int _id):ioctl(_ioctl),id(_id){}
void operator()(){
this->Worker_Recieve_Thread(id);
}
void Worker_Recieve_Thread( int id){
printf("thread %d start...\n",id);
while(running){
struct io_uring_cqe *cqe;
io_uring_wait_cqe(ioctl->readrings.at(id).get(), &cqe);
struct io_uring_cqe *cqes[MAX_QUERY_COUNT];
int cqecount = io_uring_peek_batch_cqe(ioctl->readrings.at(id).get(), cqes, MAX_QUERY_COUNT);
unsigned count = 0;
vector<int> fds;
for (int i = 0;i < cqecount;i ++) {
cqe = cqes[i];
count ++;
struct conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
if (ci.type == READ) {
int bytes_read = cqe->res;
if (bytes_read == 0) {
close(ci.connfd);
printf("client %d closed\n", ci.connfd);
auto it = std::find(fds.begin(), fds.end(), ci.connfd );
if( it!=fds.end())
fds.erase(it);
} else if (bytes_read < 0) {
close(ci.connfd);
auto it = std::find(fds.begin(), fds.end(), ci.connfd );
if( it!=fds.end())
fds.erase(it);
} else {
ci.recv_len = bytes_read;
while(1){
if (ci.recv_len < sizeof(struct my_proto_header)) break;
char *buffer = iouringServer::iouringBuffer[(ci.connfd-1) % (MAXCONNECTIONS-1)];
struct my_proto_header *hdr = (struct my_proto_header*)buffer;
int payload_len=hdr->payload_len ;
int total_pkt_len = hdr->total_len;
if(total_pkt_len==0 || payload_len==0) break;
if (ci.recv_len >= total_pkt_len) {
char* userbuf =new char [payload_len ];
memcpy(userbuf, buffer +sizeof(*hdr),payload_len);
printf("thread %d receive message:%s \n",id,userbuf);
delete [] userbuf;
int remaining = ci.recv_len- total_pkt_len;
if (remaining > 0){
memmove(buffer, buffer + total_pkt_len, remaining );
ci.recv_len = remaining;
if(std::find(fds.begin(),fds.end(),ci.connfd)==fds.end())
fds.push_back(ci.connfd);
}
else
{
if(std::find(fds.begin(),fds.end(),ci.connfd)==fds.end())
fds.push_back(ci.connfd);
break;
}
}
}
memset(iouringServer::iouringBuffer[(ci.connfd-1) % (MAXCONNECTIONS-1)],0,BUFFER_LENGTH);
}
}
}
for(auto it=fds.begin();it<fds.end();it++)
ioctl->set_read_event(ioctl->readrings.at(id).get(), *it , iouringServer::iouringBuffer[(*it-1) % (MAXCONNECTIONS-1)], BUFFER_LENGTH, 0);
fds.erase(fds.begin(), fds.end());
io_uring_cq_advance(ioctl->readrings.at(id).get(), count);
io_uring_submit( ioctl->readrings.at(id).get());
}
}
};
class boss{
private:
int listenfd;
bool running=true;
iouringServer* ioctl;
public:
boss(iouringServer* _ioctl,int _listenfd):ioctl(_ioctl),listenfd(_listenfd){}
void operator()(){
this->Boss_Accept_Thread(listenfd);
}
void Boss_Accept_Thread(int listenfd){
struct io_uring ring;
struct io_uring_params params;
memset(¶ms, 0, sizeof(params));
memset(&ring, 0, sizeof(ring));
/*初始化params 和 ring*/
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
struct sockaddr_in clientaddr;
socklen_t client= sizeof(clientaddr);
ioctl->set_accept_event(&ring, listenfd, (struct sockaddr*)&clientaddr, &client, 0);
while(running){
struct io_uring_cqe *cqe;
io_uring_submit(&ring);
io_uring_wait_cqe(&ring, &cqe);
struct io_uring_cqe *cqes[MAX_QUERY_COUNT];
int cqecount = io_uring_peek_batch_cqe(&ring, cqes, MAX_QUERY_COUNT);
unsigned count = 0;
for (int i = 0;i < cqecount;i ++) {
cqe = cqes[i];
count ++;
struct conninfo ci;
memcpy(&ci, &cqe->user_data, sizeof(ci));
if (ci.type == ACCEPT) {
int connfd = cqe->res;
char *buffer = iouringServer::iouringBuffer[(connfd-1) % (MAXCONNECTIONS-1)];
int index=ioctl->randomint(ioctl->workercount);
ioctl->set_read_event( ioctl->readrings.at(index).get(), connfd, buffer, BUFFER_LENGTH, 0);
io_uring_submit( ioctl->readrings.at(index).get());
struct sockaddr_in clientaddr;
socklen_t client=sizeof (clientaddr);
ioctl->set_accept_event(&ring, listenfd, (struct sockaddr*)&clientaddr, &client, 0);
} else if (ci.type == READ) {
}
}
io_uring_cq_advance(&ring, count);
}
}
};
public:
iouringServer(int _bosscount,int _wordercount, int serverport): booscount(_bosscount) ,workercount(_wordercount),port(serverport){
}
~iouringServer(){}
void startIOUringServer(){
int listenfd=initServer();
if(listenfd>0){
for(int i=0;i< workercount;i++){
std::shared_ptr<struct io_uring > ring=std::make_shared<struct io_uring >();
io_uring_params param ;
memset( ¶m, 0, sizeof(param));
memset( ring.get(), 0, sizeof(struct io_uring ));
io_uring_queue_init_params(READ_ENTRIES_LENGTH, ring.get(), ¶m );
readrings.push_back(ring);
this->worker_threads_group.push_back(std::thread (worker(this, i)) );
}
for(int i=0;i< booscount;i++)
this->boss_threads_group.push_back(std::thread (boss(this,listenfd) ) );
}
}
void sync(){
for(auto it=boss_threads_group.begin();it!=boss_threads_group.end();it++)
(*it).join();
}
void set_read_event(struct io_uring *ring, int fd, void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recv(sqe, fd, buf, len, flags);
struct conninfo ci = {
.connfd = fd,
.type = READ,
.recv_len=0
};
memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
return ;
}
void set_write_event(struct io_uring *ring, int fd, const void *buf, size_t len, int flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_send(sqe, fd, buf, len, flags);
struct conninfo ci = {
.connfd = fd,
.type = WRITE,
.recv_len=0
};
memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
return ;
}
void set_accept_event(struct io_uring *ring, int fd,
struct sockaddr *cliaddr, socklen_t *clilen, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, fd, cliaddr, clilen, flags);
struct conninfo ci = {
.connfd = fd,
.type = ACCEPT ,
.recv_len=0
};
memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
return ;
}
private:
static char iouringBuffer[MAXCONNECTIONS][BUFFER_LENGTH] ;
int workercount;
int booscount;
vector< std::thread> boss_threads_group;
vector< std::thread> worker_threads_group;
int port;
vector< std::shared_ptr<struct io_uring > >readrings;
int randomint(int UPPER){
std::srand(std::time(0));
int random_num = std::rand() % UPPER;
return random_num;
}
int initServer(){
int listenfd= socket(AF_INET, SOCK_STREAM, 0); //
if (listenfd == -1) return -1;
struct sockaddr_in servaddr ;
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(port);
if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)))
return -1;
listen(listenfd, 10);
return listenfd;
}
};
/* Storage for the receive buffer table: one BUFFER_LENGTH-byte row per slot, zero-initialised.
   NOTE(review): this is a non-inline definition inside a header; including the header
   from more than one translation unit would define the array more than once. */
char iouringServer::iouringBuffer[MAXCONNECTIONS][BUFFER_LENGTH]={0};
#endif // IO_URINGSERVER_H
由于 TCP 是面向字节流的协议,接收端会出现粘包(以及半包)现象,因此需要定义数据头,告诉服务端每个数据包的大小: