开始
C++版本的线程池是根据C语言版本修改来的,为了熟悉一下C++的相关语法,所以就有了这篇笔记。记录一下修改的过程中遇到的问题。
本线程池是基于对象的,并不是面向对象。
[线程池简单实现(基于C语言)](Linux C线程池简单实现 | Blog (ethereal14.github.io))
文件说明
| 文件名 |
作用 |
| threadpool.cpp |
线程池成员函数具体实现 |
| threadpool.h |
线程池对象的定义、各种枚举类型 |
| main.cpp |
测试用例 |
| CMakeLists.txt |
使用cmake组织的工程 |
线程池对象设计
基本就是把原来的结构体改成现在的class,把变量定义在private里。把工作线程定义为静态成员函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| class threadpool { private: pthread_mutex_t m_lock; pthread_cond_t m_notify; pthread_t *m_threads; threadpool_task *m_queue;
int m_thread_count; int m_queue_size; int m_head; int m_tail; int m_count; int m_shutdown; int m_started;
private: static void *threadpool_thread(void *arg);
public: threadpool(int thread_count, int queue_size, int flags); ~threadpool();
int threadpool_add(threadpool *pool, void (*function)(void *), void *argument, int flag); int threadpoolexit(threadpool *pool, int flag); };
|
任务队列对象
如上,仅仅把struct改为class
1 2 3 4 5 6 7 8
| class threadpool_task { public: void (*function)(void *); void *argument; threadpool_task(); ~threadpool_task(); };
|
各种枚举类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| typedef enum { threadpool_invalid = -1, threadpool_lock_failure = -2, threadpool_queue_full = -3, threadpool_shutdown = -4, threadpool_thread_failure = -5 } threadpool_terror_t;
typedef enum { immediate_shutdown = 1, graceful_shutdown = 2 } threadpool_shutdown_t;
typedef enum { threadpool_graceful = 1 } threadpool_destroy_flags_t;
|
线程池构造函数
- 这里遇到个问题:构造函数是没有返回值的,所以不能把原来的创建线程池函数照搬过来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| threadpool::threadpool(int thread_count = 32, int queue_size = 256, int flags = 0) : m_queue_size(queue_size) { do { if (thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) { std::cout << "argument eeror" << std::endl; break; }
this->m_thread_count = 0; this->m_head = this->m_tail = this->m_count = 0; this->m_shutdown = this->m_started = 0;
this->m_threads = new pthread_t[thread_count]; this->m_queue = new threadpool_task[queue_size];
if ((pthread_mutex_init(&this->m_lock, nullptr) != 0) || (pthread_cond_init(&this->m_notify, NULL) != 0) || (this->m_threads == nullptr) || (this->m_queue == nullptr)) { std::cout << "init eeror" << std::endl; break; }
for (size_t i = 0; i < thread_count; i++) { if (pthread_create(&this->m_threads[i], nullptr, threadpool_thread, this) != 0) { std::cout << "create " << i << " thread error" << std::endl; break; } this->m_thread_count++; this->m_started++; }
} while (0); }
|
线程池析构函数
析构函数把一些资源释放了就行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| threadpool::~threadpool() { if (this == NULL || this->m_started > 0) { exit(-1); } if (this->m_threads) { delete[] this->m_threads; delete[] this->m_queue;
pthread_mutex_lock(&(this->m_lock)); pthread_mutex_destroy(&(this->m_lock)); pthread_cond_destroy(&(this->m_notify)); } }
|
以下函数基本没啥变化
添加任务函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| int threadpool::threadpool_add(threadpool *pool, void (*function)(void *), void *argument, int flag) { int err = 0, next;
if (pool == nullptr || function == nullptr) return threadpool_invalid; if (pthread_mutex_lock(&(pool->m_lock)) != 0) { return threadpool_lock_failure; }
next = pool->m_tail + 1;
next = (next == pool->m_queue_size) ? 0 : next;
do { if (pool->m_count == pool->m_queue_size) { err = threadpool_queue_full; break; } pool->m_queue[pool->m_tail].function = function; pool->m_queue[pool->m_tail].argument = argument;
pool->m_tail = next; pool->m_count++;
if (pthread_cond_signal(&(pool->m_notify)) != 0) { err = threadpool_lock_failure; break; }
} while (0);
if (pthread_mutex_unlock(&(pool->m_lock)) != 0) { err = threadpool_lock_failure; }
return err; }
|
线程退出函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| int threadpool::threadpoolexit(threadpool *pool, int flag) { int i, err = 0;
if (pool == nullptr) return threadpool_invalid;
if (pthread_mutex_lock(&(pool->m_lock)) != 0) return threadpool_lock_failure;
do { if (pool->m_shutdown) { err = threadpool_shutdown; break; } pool->m_shutdown = (flag & threadpool_graceful) ? graceful_shutdown : immediate_shutdown;
if ((pthread_cond_broadcast(&(pool->m_notify)) != 0) || (pthread_mutex_unlock(&(pool->m_lock)) != 0)) { err = threadpool_lock_failure; break; }
for (size_t i = 0; i < pool->m_thread_count; i++) { if (pthread_join(pool->m_threads[i], nullptr) != 0) err = threadpool_thread_failure; }
} while (0);
return err; }
|
工作线程函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| void *threadpool::threadpool_thread(void *arg) { threadpool *pool = static_cast<threadpool *>(arg); threadpool_task task;
for (;;) { pthread_mutex_lock(&(pool->m_lock)); while ((pool->m_count == 0) && (!pool->m_shutdown)) { pthread_cond_wait(&(pool->m_notify), &(pool->m_lock)); } if ((pool->m_shutdown == immediate_shutdown) || ((pool->m_shutdown == graceful_shutdown) && (pool->m_count == 0))) { break; }
task.function = pool->m_queue[pool->m_head].function; task.argument = pool->m_queue[pool->m_head].argument;
pool->m_head++; pool->m_head = (pool->m_head == pool->m_queue_size) ? 0 : pool->m_head; pool->m_count--;
pthread_mutex_unlock(&(pool->m_lock));
task.function(task.argument); }
pool->m_started--; pthread_mutex_unlock(&(pool->m_lock)); pthread_exit(nullptr); return nullptr; }
|
最后的测试用例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| #include "threadpool.h" #include <iostream> #include <pthread.h> #include <assert.h> #include <unistd.h>
#define THREAD 32 #define QUEUE 256
int tasks = 0, done = 0; pthread_mutex_t lock;
void dumy_task(void *arg) { usleep(10000); pthread_mutex_lock(&lock);
done++; pthread_mutex_unlock(&lock); }
int main(int, char **) { threadpool *pool = new threadpool(THREAD, QUEUE, 0);
pthread_mutex_init(&lock, nullptr);
std::cout << "pool started with " << THREAD << " threads and queue size of " << QUEUE << std::endl;
while ((pool->threadpool_add(pool, &dumy_task, nullptr, 0)) == 0) { pthread_mutex_lock(&lock); tasks++; pthread_mutex_unlock(&lock); } std::cout << "add " << tasks << " tasks" << std::endl;
while ((tasks / 2) > done) { usleep(10000); } assert(pool->threadpoolexit(pool, 0) == 0); std::cout << "did " << done << " tasks" << std::endl;
delete[] pool;
return 0; }
|
CMakeLists文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| cmake_minimum_required(VERSION 3.0.0) project(threadpool_cpp VERSION 0.1.0)
include(CTest) enable_testing()
add_executable(threadpool_cpp main.cpp threadpool.cpp threadpool.h)
find_package(Threads) target_link_libraries(threadpool_cpp ${CMAKE_THREAD_LIBS_INIT})
set(CPACK_PROJECT_NAME ${PROJECT_NAME}) set(CPACK_PROJECT_VERSION ${PROJECT_VERSION}) include(CPack)
|
测试效果图
