线程池常用于高性能服务器中。为了减小线程创建、销毁时的开销,引入了池式结构。比如:内存池、数据库连接池等等。
学习线程池主要是因为最近在学习网络编程相关的内容。
线程池结构

从上图可以看到线程池主要包括任务队列、线程数组、管理者线程。为了简单起见,没有加入管理者线程。
调用顺序
//TODO
线程池数据结构设计
线程池结构定义
主要包括线程池的一些属性:线程池状态信息、任务队列信息,以及用于同步的互斥锁和条件变量。
1 2 3 4 5 6 7 8 9 10 11 12 13
| /*
   * State of one thread pool: the worker threads, a fixed-size circular task
   * queue, and the mutex/condition variable that guard them.
   * NOTE(review): the API uses the name `threadpool_t` without `struct`, so a
   * typedef presumably exists in the header -- confirm.
   */
  struct threadpool_t{
      pthread_mutex_t lock;       /* held around every access to the queue and counters */
      pthread_cond_t notify;      /* workers wait on this; signalled on add, broadcast on destroy */
      pthread_t *threads;         /* array of worker thread ids, one slot per requested thread */
      threadpool_task_t *queue;   /* task array of queue_size entries, used as a ring buffer */
      int thread_count;           /* number of workers successfully created */
      int queue_size;             /* capacity of the task queue */
      int head;                   /* index of the next task a worker will take */
      int tail;                   /* index where the next added task is stored */
      int count;                  /* number of tasks currently waiting in the queue */
      int shutdown;               /* 0 while running, otherwise a threadpool_shutdown_t value */
      int started;                /* workers created and not yet exited */
  };
|
任务队列结构定义
任务队列中可以放置多种不同任务函数的函数指针,任务函数的参数为 `void *` 类型。
1 2 3 4 5
| /* One queued unit of work: a callback plus the single argument passed to it. */
  typedef struct {
      void (*function)(void *);   /* routine a worker thread calls */
      void *argument;             /* value handed to `function` unchanged */
  } threadpool_task_t;
|
其他一些枚举类型
错误码
1 2 3 4 5 6 7 8
| /* Negative error codes returned by the pool API (0 means success). */
  typedef enum {
      threadpool_invalid = -1,          /* NULL pool or NULL task function */
      threadpool_lock_failure = -2,     /* a mutex or condition-variable call failed */
      threadpool_queue_full = -3,       /* the task queue already holds queue_size tasks */
      threadpool_shutdown = -4,         /* the pool is already shutting down */
      threadpool_thread_failure = -5    /* joining a worker thread failed */
  } threadpool_terror_t;
|
线程池关闭方式
1 2 3 4
| /* Values stored in the pool's `shutdown` field once destruction has begun. */
  typedef enum {
      immediate_shutdown = 1,   /* workers exit as soon as they see it, even with tasks queued */
      graceful_shutdown = 2     /* workers exit only once the queue is empty */
  } threadpool_shutdown_t;
|
线程池销毁的枚举
1 2 3 4
| /* Bit flags accepted by threadpool_destroy(). */
  typedef enum {
      threadpool_graceful = 1   /* when set, destroy selects graceful_shutdown instead of immediate_shutdown */
  } threadpool_destroy_flags_t;
|
线程池API设计
线程池对外开放了三个API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
|
/**
 * Create a pool and start its worker threads.
 *
 * @param thread_count number of workers; must be in 1..MAX_THREADS
 * @param queue_size   capacity of the task queue; must be in 1..MAX_QUEUE
 * @param flags        not used by the implementation shown in this file
 * @return the new pool, or NULL on invalid arguments, failed allocation or
 *         initialisation, or failure to create a worker thread
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);
/**
 * Queue a task for execution by one of the workers.
 *
 * @param pool    pool to add the task to
 * @param routine function the worker will call
 * @param arg     argument passed to `routine`
 * @param flags   not used by the implementation shown in this file
 * @return 0 on success, otherwise a negative threadpool_terror_t code
 *         (for example threadpool_queue_full when the queue is full)
 */
int threadpool_add(threadpool_t *pool, void (*routine)(void *),void *arg, int flags);
/**
 * Stop the workers, join them, and free the pool.
 *
 * @param pool  pool to destroy
 * @param flags if the threadpool_graceful bit is set, queued tasks are run
 *              before the workers exit; otherwise workers stop immediately
 * @return 0 on success (the pool has been freed), otherwise a negative
 *         threadpool_terror_t code and the pool is not freed
 */
int threadpool_destroy(threadpool_t *pool, int flags);
|
此外还有一个工作线程未对外开放:
1
| /*
   * Worker thread entry point; `threadpool` is the owning threadpool_t cast to
   * void *. Each worker waits for a queued task or a shutdown request, runs
   * tasks outside the pool lock, and decrements `started` when it exits.
   */
  static void *threadpool_thread(void *threadpool);
|
为了避免内存泄漏,还提供了一个释放线程池所占资源的函数:
1
| /*
   * Release the pool's memory and its mutex/condition variable.
   * Returns -1 if `pool` is NULL or workers are still running (started > 0),
   * and 0 once the pool has been freed.
   */
  int threadpool_free(threadpool_t *pool);
|
创建线程池
创建和运行线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| threadpool_t *threadpool_create(int thread_count, int queue_size, int flags) { if (thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { return NULL; } threadpool_t *pool; if ((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) { goto err; } pool->thread_count = 0; pool->queue_size = queue_size; pool->head = pool->tail = pool->count = 0; pool->shutdown = pool->started = 0; pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count); pool->queue = (threadpool_task_t *)malloc(sizeof(threadpool_task_t) * queue_size); if ((pthread_mutex_init(&pool->lock, NULL) != 0) || (pthread_cond_init(&pool->notify, NULL) != 0) || (pool->threads == NULL) || (pool->queue == NULL)) { goto err; } for (size_t i = 0; i < thread_count; i++) { if (pthread_create(&pool->threads[i], NULL, threadpool_thread, (void *)pool) != 0) { threadpool_destroy(pool, 0); return NULL; } pool->thread_count++; pool->started++; } return pool;
err: if (pool) { threadpool_free(pool); } return NULL; }
|
添加任务到线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| int threadpool_add(threadpool_t *pool, void (*function)(void *), void *argument, int flags) { int err = 0; int next; if (pool == NULL || function == NULL) { return threadpool_invalid; } if (pthread_mutex_lock(&(pool->lock)) != 0) { return threadpool_lock_failure; } next = pool->tail + 1; next = (next == pool->queue_size) ? 0 : next; do { if (pool->count == pool->queue_size) { err = threadpool_queue_full; break; } pool->queue[pool->tail].function = function; pool->queue[pool->tail].argument = argument; pool->tail = next; pool->count += 1; if (pthread_cond_signal(&(pool->notify)) != 0) { err = threadpool_lock_failure; break; } } while (0); if (pthread_mutex_unlock(&(pool->lock)) != 0) { err = threadpool_lock_failure; }
return err; }
|
线程池销毁
线程池的销毁比较简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| int threadpool_destroy(threadpool_t *pool, int flags) { int i, err = 0;
if (pool == NULL) return threadpool_invalid; if (pthread_mutex_lock(&(pool->lock)) != 0) return threadpool_lock_failure; do { if (pool->shutdown) { err = threadpool_shutdown; break; } pool->shutdown = (flags & threadpool_graceful) ? graceful_shutdown : immediate_shutdown; if ((pthread_cond_broadcast(&(pool->notify)) != 0) || (pthread_mutex_unlock(&(pool->lock)) != 0)) { err = threadpool_lock_failure; break; } for (size_t i = 0; i < pool->thread_count; i++) { if (pthread_join(pool->threads[i], NULL) != 0) { err = threadpool_thread_failure; } } } while (0);
if (!err) { threadpool_free(pool); }
return err; }
|
释放互斥锁、条件变量等资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| int threadpool_free(threadpool_t *pool) { if (pool == NULL || pool->started > 0) { return -1; } if (pool->threads) { free(pool->threads); free(pool->queue); pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); } free(pool); return 0; }
|
工作线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| static void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *)threadpool; threadpool_task_t task;
for (;;) { pthread_mutex_lock(&(pool->lock)); while ((pool->count == 0) && (!pool->shutdown)) { pthread_cond_wait(&(pool->notify), &(pool->lock)); } if ((pool->shutdown == immediate_shutdown) || ((pool->shutdown == graceful_shutdown) && (pool->count == 0))) { break; } task.function = pool->queue[pool->head].function; task.argument = pool->queue[pool->head].argument; pool->head += 1; pool->head = (pool->head == pool->queue_size) ? 0 : pool->head; pool->count -= 1; pthread_mutex_unlock(&(pool->lock)); (*(task.function))(task.argument); } pool->started--;
pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); return NULL; }
|
测试用例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| #include "threadpool.h" #include <stdio.h> #include <pthread.h> #include <assert.h> #include <unistd.h>
#define THREAD 32 #define QUEUE 256
int tasks = 0, done = 0; pthread_mutex_t lock;
/*
 * Test task: sleeps for 10 ms to simulate work, then bumps the global
 * `done` counter under the global `lock`. `arg` is ignored.
 */
void dummy_task(void *arg)
{
    (void)arg;

    usleep(10000);

    pthread_mutex_lock(&lock);
    done += 1;
    pthread_mutex_unlock(&lock);
}
/*
 * Smoke test: create a pool, add dummy tasks until threadpool_add() reports
 * an error (normally a full queue), wait until at least half of them have
 * run, then destroy the pool gracefully and report how many tasks ran.
 */
int main(int argc, char **argv)
{
    threadpool_t *pool;
    int rc;

    (void)argc;
    (void)argv;

    pthread_mutex_init(&lock, NULL);

    /* Keep the call outside assert(): with NDEBUG defined the whole assert
     * expression disappears, and the pool would never be created. */
    pool = threadpool_create(THREAD, QUEUE, 0);
    assert(pool != NULL);
    fprintf(stderr, "Pool started with %d threads and queue size of %d\n", THREAD, QUEUE);

    while (threadpool_add(pool, &dummy_task, NULL, 0) == 0) {
        pthread_mutex_lock(&lock);
        tasks++;
        pthread_mutex_unlock(&lock);
    }

    fprintf(stderr, "added %d tasks\n", tasks);

    /* `done` is written by worker threads, so read it under the same lock
     * they use instead of polling it unsynchronised. */
    for (;;) {
        int finished;

        pthread_mutex_lock(&lock);
        finished = done;
        pthread_mutex_unlock(&lock);

        if ((tasks / 2) <= finished) {
            break;
        }
        usleep(10000);
    }

    /* 1 is the graceful flag: queued tasks are run before workers exit. */
    rc = threadpool_destroy(pool, 1);
    assert(rc == 0);
    (void)rc;

    /* All workers have been joined, so `done` is stable here. */
    fprintf(stderr, "did %d tasks\n", done);

    return 0;
}
|
pthread库一些函数
创建线程
1 2 3 4 5 6 7 8 9 10 11
|
/* Starts a new thread that runs start_routine(arg) and stores its id in
 * *thread. The pool code passes NULL for attr and treats any non-zero
 * return value as failure. */
int pthread_create(pthread_t *thread,const pthread_attr_t *attr, void *(*start_routine)(void *),void *arg);
|
初始化互斥锁
1
| /* Initialises *__mutex. The pool code passes NULL for __mutexattr and
   * treats any non-zero return value as failure. */
  int pthread_mutex_init(pthread_mutex_t *__mutex, const pthread_mutexattr_t *__mutexattr)
|
初始化条件变量
1
| /* Initialises *__cond. The pool code passes NULL for __cond_attr and
   * treats any non-zero return value as failure. */
  int pthread_cond_init(pthread_cond_t *__restrict__ __cond, const pthread_condattr_t *__restrict__ __cond_attr)
|
//TODO