DropsBrowse Pastes
Login with GitHub

异步

March 28th, 2025Views: 17(1 unique)C
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>

#define PORT 8080
#define BUFFER_SIZE 1024
#define QUEUE_DEPTH 256
#define READ_EVENT 0
#define ACCEPT_EVENT 1
#define WRITE_EVENT 2

struct conn_info {
    int fd;
    int event_type;
    char buffer[BUFFER_SIZE];
    size_t length;
};

struct io_uring ring;

// 提交一个接受连接的请求
void submit_accept_request(int server_fd, struct sockaddr *client_addr, socklen_t *client_len) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct conn_info *conn_i = malloc(sizeof(*conn_i));
    
    conn_i->event_type = ACCEPT_EVENT;
    
    io_uring_prep_accept(sqe, server_fd, client_addr, client_len, 0);
    io_uring_sqe_set_data(sqe, conn_i);
}

// 提交一个读取数据的请求
void submit_read_request(int fd) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    struct conn_info *conn_i = malloc(sizeof(*conn_i));
    
    conn_i->event_type = READ_EVENT;
    conn_i->fd = fd;
    
    io_uring_prep_recv(sqe, fd, conn_i->buffer, BUFFER_SIZE, 0);
    io_uring_sqe_set_data(sqe, conn_i);
}

// 提交一个写入数据的请求
void submit_write_request(struct conn_info *conn_i) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
    
    conn_i->event_type = WRITE_EVENT;
    
    io_uring_prep_send(sqe, conn_i->fd, conn_i->buffer, conn_i->length, 0);
    io_uring_sqe_set_data(sqe, conn_i);
}

void *worker_thread(void *arg) {
    struct io_uring_cqe *cqe;
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int server_fd = *(int *)arg;
    
    // 提交初始accept请求
    submit_accept_request(server_fd, (struct sockaddr *)&client_addr, &client_len);
    io_uring_submit(&ring);
    
    while (1) {
        int ret = io_uring_wait_cqe(&ring, &cqe);
        if (ret < 0) {
            perror("io_uring_wait_cqe");
            continue;
        }
        
        struct conn_info *conn_i = io_uring_cqe_get_data(cqe);
        
        if (conn_i->event_type == ACCEPT_EVENT) {
            int client_fd = cqe->res;
            
            if (client_fd >= 0) {
                printf("New client connected\n");
                
                // 为新客户端提交读取请求
                submit_read_request(client_fd);
                
                // 继续接受新连接
                submit_accept_request(server_fd, (struct sockaddr *)&client_addr, &client_len);
            } else {
                fprintf(stderr, "Accept failed: %s\n", strerror(-client_fd));
            }
            
            free(conn_i);
        } 
        else if (conn_i->event_type == READ_EVENT) {
            int bytes_read = cqe->res;
            
            if (bytes_read <= 0) {
                // 客户端断开或错误
                close(conn_i->fd);
                free(conn_i);
            } else {
                // 有数据,提交写入请求(回显)
                conn_i->length = bytes_read;
                submit_write_request(conn_i);
            }
        } 
        else if (conn_i->event_type == WRITE_EVENT) {
            int bytes_written = cqe->res;
            
            if (bytes_written <= 0) {
                // 写入失败
                close(conn_i->fd);
                free(conn_i);
            } else {
                // 写入成功,继续读取
                int fd = conn_i->fd;
                free(conn_i);
                submit_read_request(fd);
            }
        }
        
        io_uring_cqe_seen(&ring, cqe);
        io_uring_submit(&ring);
    }
    
    return NULL;
}

int main() {
    int server_fd;
    struct sockaddr_in address;
    
    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }
    
    // 设置socket选项
    int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        perror("Setsockopt failed");
        exit(EXIT_FAILURE);
    }
    
    // 绑定地址和端口
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 监听连接
    if (listen(server_fd, 10) < 0) {
        perror("Listen failed");
        exit(EXIT_FAILURE);
    }
    
    printf("Server listening on port %d\n", PORT);
    
    // 初始化io_uring
    io_uring_queue_init(QUEUE_DEPTH, &ring, 0);
    
    // 创建工作线程
    pthread_t worker_ids[4]; // 例如4个线程
    for (int i = 0; i < 4; i++) {
        if (pthread_create(&worker_ids[i], NULL, worker_thread, &server_fd) != 0) {
            perror("Worker thread creation failed");
            exit(EXIT_FAILURE);
        }
    }
    
    // 等待工作线程(实际上不会到达这里)
    for (int i = 0; i < 4; i++) {
        pthread_join(worker_ids[i], NULL);
    }
    
    io_uring_queue_exit(&ring);
    close(server_fd);
    
    return 0;
}