Linux C 语言实现 TCP 服务器线程池

以下是一个简单的基于 Linux 的 C 语言 TCP 服务器线程池实现,实现了一个简单的多线程 echo 服务器。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <signal.h>

#define THREAD_POOL_SIZE 10 // 线程池大小
#define BUF_SIZE 1024 // 缓冲区大小

typedef struct {
    int fd; // 客户端 socket 文件描述符
    struct sockaddr_in addr; // 客户端地址
} client_t;

// 线程池任务
typedef struct {
    void (*func)(void *arg); // 任务函数指针
    void *arg; // 任务参数
} task_t;

// 线程池
typedef struct {
    pthread_t *threads; // 线程数组
    int thread_count; // 线程数量
    task_t *tasks; // 任务队列
    int task_count; // 任务数量
    int head; // 任务队列头
    int tail; // 任务队列尾
    int shutdown; // 是否关闭线程池
    int started; // 是否已经启动线程
    pthread_mutex_t lock; // 互斥锁
    pthread_cond_t not_empty; // 非空条件变量
    pthread_cond_t not_full; // 非满条件变量
} thread_pool_t;

// 线程池任务执行函数
void *thread_func(void *arg) {
    thread_pool_t *pool = (thread_pool_t *)arg;

    while (1) {
        pthread_mutex_lock(&pool->lock);

        // 等待任务队列非空或线程池关闭
        while (pool->task_count == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->not_empty, &pool->lock);
        }

        // 如果线程池关闭,则退出
        if (pool->shutdown) {
            pthread_mutex_unlock(&pool->lock);
            pthread_exit(NULL);
        }

        // 取出一个任务
        task_t task = pool->tasks[pool->head];
        pool->head = (pool->head + 1) % pool->thread_count;
        pool->task_count--;

        // 通知线程池非满
        pthread_cond_signal(&pool->not_full);

        pthread_mutex_unlock(&pool->lock);

        // 执行任务
        task.func(task.arg);
    }

    return NULL;
}

// 初始化线程池
void thread_pool_init(thread_pool_t *pool, int thread_count) {
    pool->thread_count = thread_count;
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
    pool->tasks = (task_t *)malloc(sizeof(task_t) * thread_count);
    pool->task_count = 0;
    pool->head = 0;
    pool->tail = 0;
    pool->shutdown = 0;
    pool->started = 0;
    pthread_mutex_init(&pool->lock, NULL);
    pthread_cond_init(&pool->not_empty, NULL);
    pthread_cond_init(&pool->not_full, NULL);

    // 创建线程
    for (int i = 0; i < thread_count; i++) {
        pthread_create(&pool->threads[i], NULL, thread_func, pool);
    }
}

// 销毁线程池
void thread_pool_destroy(thread_pool_t *pool) {
    if (pool->shutdown) {
        return;
    }

    pool->shutdown = 1;

    // 唤醒所有等待任务的线程
    pthread_cond_broadcast(&pool->not_empty);

    // 等待所有线程退出
    for (int i = 0; i < pool->thread_count; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    free(pool->threads);
    free(pool->tasks);

    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->not_empty);
    pthread_cond_destroy(&pool->not_full);
}

// 添加任务到线程池
void thread_pool_add_task(thread_pool_t *pool, void (*func)(void *), void *arg) {
    pthread_mutex_lock(&pool->lock);

    // 等待任务队列非满
    while (pool->task_count == pool->thread_count && !pool->shutdown) {
        pthread_cond_wait(&pool->not_full, &pool->lock);
    }

    // 如果线程池关闭,则不再添加任务
    if (pool->shutdown) {
        pthread_mutex_unlock(&pool->lock);
        return;
    }

    // 添加任务到队列尾
    pool->tasks[pool->tail].func = func;
    pool->tasks[pool->tail].arg = arg;
    pool->tail = (pool->tail + 1) % pool->thread_count;
    pool->task_count++;

    // 通知线程池非空
    pthread_cond_signal(&pool->not_empty);

    pthread_mutex_unlock(&pool->lock);
}

// 处理客户端请求
void handle_client_request(void *arg) {
    client_t *client = (client_t *)arg;
    int fd = client->fd;

    char buf[BUF_SIZE];
    int n;

    while ((n = read(fd, buf, BUF_SIZE)) > 0) {
        // 处理客户端请求,这里简单的将客户端发送的消息原样返回
        write(fd, buf, n);
    }

    close(fd);
    free(client);
}

// 信号处理函数,用于关闭服务器
void signal_handler(int signum) {
    printf('Received signal %d, shutting down server...
', signum);
    exit(0);
}

int main(int argc, char *argv[]) {
    signal(SIGINT, signal_handler); // 注册信号处理函数

    int listen_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(8888);

    bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr));

    listen(listen_fd, 10);

    thread_pool_t pool;
    thread_pool_init(&pool, THREAD_POOL_SIZE);

    while (1) {
        struct sockaddr_in client_addr;
        socklen_t client_addr_len = sizeof(client_addr);
        int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);

        client_t *client = (client_t *)malloc(sizeof(client_t));
        client->fd = client_fd;
        client->addr = client_addr;

        thread_pool_add_task(&pool, handle_client_request, client);
    }

    thread_pool_destroy(&pool);

    return 0;
}

该服务器使用了一个线程池来处理客户端请求,服务器启动后会等待客户端连接,一旦有连接就会将处理客户端请求的函数添加到线程池中,由线程池中的线程来执行这个函数。该服务器还实现了一个信号处理函数,用于在接收到 SIGINT 信号时关闭服务器。

Linux C 语言实现 TCP 服务器线程池

原文地址: https://www.cveoy.top/t/topic/oOUV 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录