DropsBrowse Pastes
Login with GitHub

非阻塞

March 28th, 2025Views: 15(0 unique)C
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_CLIENTS 100

typedef struct {
    int client_fds[MAX_CLIENTS];
    int count;
} client_pool_t;

client_pool_t client_pool = {.count = 0};
pthread_mutex_t pool_mutex = PTHREAD_MUTEX_INITIALIZER;

// 设置socket为非阻塞模式
void set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

void *worker_thread(void *arg) {
    char buffer[BUFFER_SIZE];
    
    while (1) {
        pthread_mutex_lock(&pool_mutex);
        
        // 遍历所有客户端,尝试非阻塞读取
        for (int i = 0; i < client_pool.count; i++) {
            int fd = client_pool.client_fds[i];
            
            int bytes_read = read(fd, buffer, BUFFER_SIZE);
            
            if (bytes_read > 0) {
                // 有数据,回显
                write(fd, buffer, bytes_read);
            } 
            else if (bytes_read == 0 || (bytes_read < 0 && errno != EAGAIN && errno != EWOULDBLOCK)) {
                // 客户端断开或错误
                close(fd);
                // 从池中移除
                client_pool.client_fds[i] = client_pool.client_fds[client_pool.count - 1];
                client_pool.count--;
                i--; // 调整索引以免跳过移动的客户端
            }
            // 对于EAGAIN/EWOULDBLOCK,表示暂无数据,继续下一个客户端
        }
        
        pthread_mutex_unlock(&pool_mutex);
        usleep(1000); // 短暂休眠以避免CPU占用过高
    }
    
    return NULL;
}

int main() {
    int server_fd;
    struct sockaddr_in address;
    int addrlen = sizeof(address);
    
    // 创建socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("Socket creation failed");
        exit(EXIT_FAILURE);
    }
    
    // 设置socket选项
    int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
        perror("Setsockopt failed");
        exit(EXIT_FAILURE);
    }
    
    // 设置为非阻塞
    set_nonblocking(server_fd);
    
    // 绑定地址和端口
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        exit(EXIT_FAILURE);
    }
    
    // 监听连接
    if (listen(server_fd, 10) < 0) {
        perror("Listen failed");
        exit(EXIT_FAILURE);
    }
    
    printf("Server listening on port %d\n", PORT);
    
    // 创建工作线程
    pthread_t worker_id;
    if (pthread_create(&worker_id, NULL, worker_thread, NULL) != 0) {
        perror("Worker thread creation failed");
        exit(EXIT_FAILURE);
    }
    
    // 主线程负责接受新连接
    while (1) {
        int client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
        
        if (client_fd < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // 没有新连接,继续尝试
                usleep(1000);
                continue;
            } else {
                perror("Accept failed");
                continue;
            }
        }
        
        // 设置客户端socket为非阻塞
        set_nonblocking(client_fd);
        
        // 添加到客户端池
        pthread_mutex_lock(&pool_mutex);
        if (client_pool.count < MAX_CLIENTS) {
            client_pool.client_fds[client_pool.count++] = client_fd;
            printf("New client connected. Total: %d\n", client_pool.count);
        } else {
            printf("Too many clients. Connection rejected.\n");
            close(client_fd);
        }
        pthread_mutex_unlock(&pool_mutex);
    }
    
    return 0;
}