#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <fcntl.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include <liburing.h>
// TCP port the server binds to.
#define PORT 8080
// Size in bytes of the per-request data buffer in struct conn_info.
#define BUFFER_SIZE 1024
// Number of entries requested when the io_uring instance is created.
#define QUEUE_DEPTH 256

// Request kinds stored in conn_info.event_type so a completion can be
// dispatched to the right handler.
enum {
    READ_EVENT = 0,
    ACCEPT_EVENT = 1,
    WRITE_EVENT = 2
};
/*
 * Per-request state attached to every submission as its user data and
 * recovered from the matching completion.
 */
struct conn_info {
    int fd;                    // client socket for read/write requests; not read for accept requests
    int event_type;            // one of READ_EVENT, ACCEPT_EVENT, WRITE_EVENT
    char buffer[BUFFER_SIZE];  // data received from the client and echoed back
    size_t length;             // number of bytes in buffer to send
};
// Global io_uring instance used by the submit_* helpers and the worker loop.
// main() initializes it before starting the workers and tears it down on exit.
struct io_uring ring;
/*
 * Queue an accept request for server_fd on the global ring.
 *
 * client_addr / client_len are handed to the accept operation as its peer
 * address buffer and length, so they must stay valid until the request
 * completes. The request carries a heap-allocated conn_info tagged
 * ACCEPT_EVENT as user data; whoever handles the completion frees it.
 *
 * The request is only queued here: the caller still has to call
 * io_uring_submit(). If memory or a submission slot cannot be obtained, the
 * failure is logged to stderr and nothing is queued.
 */
void submit_accept_request(int server_fd, struct sockaddr *client_addr, socklen_t *client_len) {
    // Allocate before taking a submission slot: a slot that has been handed
    // out cannot be given back if the allocation fails afterwards.
    struct conn_info *conn_i = malloc(sizeof(*conn_i));
    if (conn_i == NULL) {
        fprintf(stderr, "submit_accept_request: out of memory\n");
        return;
    }

    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        // Submission queue is full: flush what is pending and retry once.
        io_uring_submit(&ring);
        sqe = io_uring_get_sqe(&ring);
    }
    if (sqe == NULL) {
        fprintf(stderr, "submit_accept_request: submission queue is full\n");
        free(conn_i);
        return;
    }

    conn_i->event_type = ACCEPT_EVENT;
    // Not needed by the accept handler, but avoids leaving fields indeterminate.
    conn_i->fd = server_fd;
    conn_i->length = 0;

    io_uring_prep_accept(sqe, server_fd, client_addr, client_len, 0);
    io_uring_sqe_set_data(sqe, conn_i);
}
/*
 * Queue a receive request for fd on the global ring.
 *
 * A fresh conn_info tagged READ_EVENT is allocated to hold the received data
 * (up to BUFFER_SIZE bytes) and is attached to the request as user data;
 * whoever handles the completion owns it from then on.
 *
 * The request is only queued here: the caller still has to call
 * io_uring_submit(). If memory or a submission slot cannot be obtained, the
 * failure is logged and fd is closed, because no pending request would be
 * left to ever close it.
 */
void submit_read_request(int fd) {
    // Allocate before taking a submission slot: a slot that has been handed
    // out cannot be given back if the allocation fails afterwards.
    struct conn_info *conn_i = malloc(sizeof(*conn_i));
    if (conn_i == NULL) {
        fprintf(stderr, "submit_read_request: out of memory\n");
        close(fd);
        return;
    }

    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        // Submission queue is full: flush what is pending and retry once.
        io_uring_submit(&ring);
        sqe = io_uring_get_sqe(&ring);
    }
    if (sqe == NULL) {
        fprintf(stderr, "submit_read_request: submission queue is full\n");
        free(conn_i);
        close(fd);
        return;
    }

    conn_i->event_type = READ_EVENT;
    conn_i->fd = fd;
    conn_i->length = 0;

    io_uring_prep_recv(sqe, fd, conn_i->buffer, BUFFER_SIZE, 0);
    io_uring_sqe_set_data(sqe, conn_i);
}
/*
 * Queue a send request that writes conn_i->length bytes from conn_i->buffer
 * to conn_i->fd.
 *
 * conn_i is re-tagged as WRITE_EVENT and attached to the request as user
 * data, so ownership passes to whoever handles the completion. The caller
 * must not touch conn_i after this call.
 *
 * The request is only queued here: the caller still has to call
 * io_uring_submit(). If no submission slot can be obtained, the failure is
 * logged and the connection is dropped (fd closed, conn_i freed), because no
 * pending request would be left to release them.
 */
void submit_write_request(struct conn_info *conn_i) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL) {
        // Submission queue is full: flush what is pending and retry once.
        io_uring_submit(&ring);
        sqe = io_uring_get_sqe(&ring);
    }
    if (sqe == NULL) {
        fprintf(stderr, "submit_write_request: submission queue is full\n");
        close(conn_i->fd);
        free(conn_i);
        return;
    }

    conn_i->event_type = WRITE_EVENT;
    io_uring_prep_send(sqe, conn_i->fd, conn_i->buffer, conn_i->length, 0);
    io_uring_sqe_set_data(sqe, conn_i);
}
// Guards the global ring. liburing performs no locking of its own on a ring,
// and every worker submits to and reaps from the same instance, so all ring
// access in the worker loop happens under this mutex.
static pthread_mutex_t ring_lock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Worker event loop for the echo server.
 *
 * arg points to an int holding the listening socket descriptor. The worker
 * arms an accept request and then loops forever, handling one completion per
 * iteration:
 *   - ACCEPT_EVENT: start reading from the new client and re-arm the accept;
 *   - READ_EVENT:   echo the received bytes back, or close on EOF/error;
 *   - WRITE_EVENT:  finish a short send, then go back to reading, or close
 *                   on error.
 *
 * The ring lock is held for the whole iteration, including the blocking
 * wait, so workers take turns rather than running in parallel. Real
 * parallelism would need one ring per thread.
 *
 * The loop never exits; the trailing return only satisfies the signature.
 */
void *worker_thread(void *arg) {
    struct io_uring_cqe *cqe;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int server_fd = *(int *)arg;

    // Arm the initial accept request.
    pthread_mutex_lock(&ring_lock);
    submit_accept_request(server_fd, (struct sockaddr *)&client_addr, &client_len);
    io_uring_submit(&ring);
    pthread_mutex_unlock(&ring_lock);

    while (1) {
        pthread_mutex_lock(&ring_lock);

        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            pthread_mutex_unlock(&ring_lock);
            // liburing reports failures as -errno and does not set errno,
            // so perror() would print an unrelated message here.
            fprintf(stderr, "io_uring_wait_cqe: %s\n", strerror(-ret));
            continue;
        }

        struct conn_info *conn_i = io_uring_cqe_get_data(cqe);
        int res = cqe->res;

        if (conn_i == NULL) {
            // Completion without attached state: nothing to dispatch on.
            io_uring_cqe_seen(&ring, cqe);
            pthread_mutex_unlock(&ring_lock);
            continue;
        }

        if (conn_i->event_type == ACCEPT_EVENT) {
            int rearm = 1;
            if (res >= 0) {
                printf("New client connected\n");
                // Start reading from the new client.
                submit_read_request(res);
            } else {
                fprintf(stderr, "Accept failed: %s\n", strerror(-res));
                // Stop accepting only when the listening socket itself is
                // unusable; any other failure (aborted connection, fd or
                // memory pressure) must not end the accept chain.
                if (res == -EBADF || res == -EINVAL || res == -ENOTSOCK) {
                    rearm = 0;
                }
            }
            if (rearm) {
                // Keep accepting new connections.
                submit_accept_request(server_fd, (struct sockaddr *)&client_addr, &client_len);
            }
            free(conn_i);
        } else if (conn_i->event_type == READ_EVENT) {
            if (res <= 0) {
                // Client disconnected or the receive failed.
                close(conn_i->fd);
                free(conn_i);
            } else {
                // Data arrived: echo it back.
                conn_i->length = (size_t)res;
                submit_write_request(conn_i);
            }
        } else if (conn_i->event_type == WRITE_EVENT) {
            if (res <= 0) {
                // Send failed.
                close(conn_i->fd);
                free(conn_i);
            } else if ((size_t)res < conn_i->length) {
                // Short send: move the unsent tail to the front of the
                // buffer and send the rest before reading again.
                size_t sent = (size_t)res;
                memmove(conn_i->buffer, conn_i->buffer + sent, conn_i->length - sent);
                conn_i->length -= sent;
                submit_write_request(conn_i);
            } else {
                // Everything was sent: go back to reading from this client.
                int fd = conn_i->fd;
                free(conn_i);
                submit_read_request(fd);
            }
        }

        io_uring_cqe_seen(&ring, cqe);
        io_uring_submit(&ring);
        pthread_mutex_unlock(&ring_lock);
    }
    return NULL;
}
/*
 * Program entry point: creates the listening TCP socket on PORT, initializes
 * the global io_uring instance, starts the worker threads and waits for them.
 *
 * Exits with EXIT_FAILURE if any setup step fails. Returns 0 only if every
 * worker returns, which the worker loop as written never does.
 */
int main(void) {
    enum { WORKER_COUNT = 4 };

    // Create the listening socket. socket() signals failure with -1;
    // 0 is a valid descriptor.
    int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }

    // Allow the address to be reused.
    int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
        perror("Setsockopt failed");
        exit(EXIT_FAILURE);
    }

    // Bind to every local address on PORT. Zero the structure first so no
    // indeterminate bytes are handed to bind().
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY);
    address.sin_port = htons(PORT);
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        exit(EXIT_FAILURE);
    }

    // Start listening for connections.
    if (listen(server_fd, 10) < 0) {
        perror("Listen failed");
        exit(EXIT_FAILURE);
    }
    printf("Server listening on port %d\n", PORT);

    // Initialize io_uring. Failure is reported as -errno, not through errno.
    int ret = io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    if (ret < 0) {
        fprintf(stderr, "io_uring_queue_init failed: %s\n", strerror(-ret));
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // Start the worker threads. pthread_create returns the error number
    // directly and does not set errno, so perror() cannot be used here.
    pthread_t worker_ids[WORKER_COUNT];
    for (int i = 0; i < WORKER_COUNT; i++) {
        int rc = pthread_create(&worker_ids[i], NULL, worker_thread, &server_fd);
        if (rc != 0) {
            fprintf(stderr, "Worker thread creation failed: %s\n", strerror(rc));
            exit(EXIT_FAILURE);
        }
    }

    // Wait for the workers (not reached in practice: they loop forever).
    for (int i = 0; i < WORKER_COUNT; i++) {
        pthread_join(worker_ids[i], NULL);
    }

    io_uring_queue_exit(&ring);
    close(server_fd);
    return 0;
}