#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#define PORT 8080
#define BUFFER_SIZE 1024
#define MAX_EVENTS 64
// 设置socket为非阻塞模式
void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
void *worker_thread(void *arg) {
int epoll_fd = *(int *)arg;
struct epoll_event events[MAX_EVENTS];
char buffer[BUFFER_SIZE];
while (1) {
// epoll_wait是阻塞调用,等待事件发生
int n_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n_events; i++) {
int fd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
int bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read > 0) {
// 有数据,回显
write(fd, buffer, bytes_read);
}
else if (bytes_read == 0 || (bytes_read < 0 && errno != EAGAIN)) {
// 客户端断开或错误
printf("Client disconnected\n");
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
}
}
}
return NULL;
}
int main() {
int server_fd;
struct sockaddr_in address;
int addrlen = sizeof(address);
// 创建socket
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("Socket creation failed");
exit(EXIT_FAILURE);
}
// 设置socket选项
int opt = 1;
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
perror("Setsockopt failed");
exit(EXIT_FAILURE);
}
// 设置为非阻塞
set_nonblocking(server_fd);
// 绑定地址和端口
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("Bind failed");
exit(EXIT_FAILURE);
}
// 监听连接
if (listen(server_fd, 10) < 0) {
perror("Listen failed");
exit(EXIT_FAILURE);
}
printf("Server listening on port %d\n", PORT);
// 创建epoll实例
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
perror("Epoll creation failed");
exit(EXIT_FAILURE);
}
// 添加服务器socket到epoll
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = server_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &ev) == -1) {
perror("Failed to add server socket to epoll");
exit(EXIT_FAILURE);
}
// 创建多个工作线程
pthread_t worker_ids[4]; // 例如4个线程
for (int i = 0; i < 4; i++) {
if (pthread_create(&worker_ids[i], NULL, worker_thread, &epoll_fd) != 0) {
perror("Worker thread creation failed");
exit(EXIT_FAILURE);
}
}
// 主线程负责接受新连接
struct epoll_event events[MAX_EVENTS];
while (1) {
int n_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
for (int i = 0; i < n_events; i++) {
if (events[i].data.fd == server_fd) {
// 新连接到达
int client_fd;
while ((client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) > 0) {
set_nonblocking(client_fd);
// 添加到epoll
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 边缘触发
ev.data.fd = client_fd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &ev) == -1) {
perror("Failed to add client to epoll");
close(client_fd);
} else {
printf("New client connected\n");
}
}
if (client_fd == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
perror("Accept failed");
}
}
}
}
return 0;
}