0%

线程池实现-C++版本

开始

C++版本的线程池是根据C语言版本修改来的,为了熟悉一下C++的相关语法,所以就有了这篇笔记。记录一下修改的过程中遇到的问题。

本线程池是基于对象的,并不是面向对象

[线程池简单实现(基于C语言)](Linux C线程池简单实现 | Blog (ethereal14.github.io))

  • 也不知道会不会有内存泄露的问题,这些事以后再说

文件说明

文件名 作用
threadpool.cpp 线程池成员函数具体实现
threadpool.h 线程池对象的定义、各种枚举类型
main.cpp 测试用例
CMakeLists.txt 使用cmake组织的工程

线程池对象设计

基本就是把原来的结构体改成现在的class,把变量定义在private里。把工作线程定义为静态成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class threadpool
{
private:
pthread_mutex_t m_lock;
pthread_cond_t m_notify;
pthread_t *m_threads;
threadpool_task *m_queue;

int m_thread_count;
int m_queue_size;
int m_head;
int m_tail;
int m_count;
int m_shutdown;
int m_started;

private:
static void *threadpool_thread(void *arg);

public:
threadpool(int thread_count, int queue_size, int flags);
~threadpool();

int threadpool_add(threadpool *pool, void (*function)(void *), void *argument, int flag);
int threadpoolexit(threadpool *pool, int flag);
};

任务队列对象

如上,仅仅把struct改为class

1
2
3
4
5
6
7
8
class threadpool_task
{
public:
void (*function)(void *);
void *argument;
threadpool_task();
~threadpool_task();
};

各种枚举类型

  • 不知道能不能把这些枚举类型放在class里边,以后有空研究
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef enum
{
threadpool_invalid = -1,
threadpool_lock_failure = -2,
threadpool_queue_full = -3,
threadpool_shutdown = -4,
threadpool_thread_failure = -5
} threadpool_terror_t;

typedef enum
{
immediate_shutdown = 1, //自动关闭
graceful_shutdown = 2 //立即关闭
} threadpool_shutdown_t;

typedef enum
{
threadpool_graceful = 1
} threadpool_destroy_flags_t;

线程池构造函数

  • 这里遇到个问题:构造函数是没有返回值的,所以不能把原来的创建线程池函数照搬过来。
    • 使用do while结构来进行构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
threadpool::threadpool(int thread_count = 32, int queue_size = 256, int flags = 0) : m_queue_size(queue_size)
{
do
{
if (thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE)
{
std::cout << "argument eeror" << std::endl;
break;
}

this->m_thread_count = 0;
//this->m_queue_size = queue_size;
this->m_head = this->m_tail = this->m_count = 0;
this->m_shutdown = this->m_started = 0;

this->m_threads = new pthread_t[thread_count];
this->m_queue = new threadpool_task[queue_size];

if ((pthread_mutex_init(&this->m_lock, nullptr) != 0) ||
(pthread_cond_init(&this->m_notify, NULL) != 0) ||
(this->m_threads == nullptr) || (this->m_queue == nullptr))
{
std::cout << "init eeror" << std::endl;
break;
}

for (size_t i = 0; i < thread_count; i++)
{
if (pthread_create(&this->m_threads[i], nullptr, threadpool_thread, this) != 0)
{
std::cout << "create " << i << " thread error" << std::endl;
break;
}
this->m_thread_count++;
this->m_started++;
}

} while (0);
}

线程池析构函数

析构函数把一些资源释放了就行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
threadpool::~threadpool()
{
if (this == NULL || this->m_started > 0)
{
exit(-1);
}
if (this->m_threads)
{
delete[] this->m_threads;
delete[] this->m_queue;

pthread_mutex_lock(&(this->m_lock));
pthread_mutex_destroy(&(this->m_lock));
pthread_cond_destroy(&(this->m_notify));
}
//delete[] this;
}

以下函数基本没啥变化

添加任务函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int threadpool::threadpool_add(threadpool *pool, void (*function)(void *), void *argument, int flag)
{
int err = 0, next;

if (pool == nullptr || function == nullptr)
return threadpool_invalid;
if (pthread_mutex_lock(&(pool->m_lock)) != 0)
{
return threadpool_lock_failure;
}

next = pool->m_tail + 1;

next = (next == pool->m_queue_size) ? 0 : next;

do
{
if (pool->m_count == pool->m_queue_size)
{
err = threadpool_queue_full;
break;
}
pool->m_queue[pool->m_tail].function = function;
pool->m_queue[pool->m_tail].argument = argument;

pool->m_tail = next;
pool->m_count++;

if (pthread_cond_signal(&(pool->m_notify)) != 0)
{
err = threadpool_lock_failure;
break;
}

} while (0);

if (pthread_mutex_unlock(&(pool->m_lock)) != 0)
{
err = threadpool_lock_failure;
}

return err;
}

线程退出函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
int threadpool::threadpoolexit(threadpool *pool, int flag)
{
int i, err = 0;

if (pool == nullptr)
return threadpool_invalid;

if (pthread_mutex_lock(&(pool->m_lock)) != 0)
return threadpool_lock_failure;

do
{
if (pool->m_shutdown)
{
err = threadpool_shutdown;
break;
}
pool->m_shutdown = (flag & threadpool_graceful) ? graceful_shutdown : immediate_shutdown;

if ((pthread_cond_broadcast(&(pool->m_notify)) != 0) || (pthread_mutex_unlock(&(pool->m_lock)) != 0))
{
err = threadpool_lock_failure;
break;
}

for (size_t i = 0; i < pool->m_thread_count; i++)
{
if (pthread_join(pool->m_threads[i], nullptr) != 0)
err = threadpool_thread_failure;
}

} while (0);

return err;
}

工作线程函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void *threadpool::threadpool_thread(void *arg)
{
threadpool *pool = static_cast<threadpool *>(arg);
threadpool_task task;

for (;;)
{
pthread_mutex_lock(&(pool->m_lock));
while ((pool->m_count == 0) && (!pool->m_shutdown))
{
pthread_cond_wait(&(pool->m_notify), &(pool->m_lock));
}
if ((pool->m_shutdown == immediate_shutdown) || ((pool->m_shutdown == graceful_shutdown) && (pool->m_count == 0)))
{
break;
}

task.function = pool->m_queue[pool->m_head].function;
task.argument = pool->m_queue[pool->m_head].argument;

pool->m_head++;
pool->m_head = (pool->m_head == pool->m_queue_size) ? 0 : pool->m_head;
pool->m_count--;

pthread_mutex_unlock(&(pool->m_lock));

// (*(task.function))(task.argument);

task.function(task.argument);
}

pool->m_started--;
pthread_mutex_unlock(&(pool->m_lock));
pthread_exit(nullptr);
return nullptr;
}

最后的测试用例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#include "threadpool.h"
#include <iostream>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>

#define THREAD 32
#define QUEUE 256

int tasks = 0, done = 0;
pthread_mutex_t lock;

void dumy_task(void *arg)
{
usleep(10000);
pthread_mutex_lock(&lock);

done++;
pthread_mutex_unlock(&lock);
}

int main(int, char **)
{
threadpool *pool = new threadpool(THREAD, QUEUE, 0);

pthread_mutex_init(&lock, nullptr);

std::cout << "pool started with " << THREAD << " threads and queue size of " << QUEUE << std::endl;

while ((pool->threadpool_add(pool, &dumy_task, nullptr, 0)) == 0)
{
pthread_mutex_lock(&lock);
tasks++;
pthread_mutex_unlock(&lock);
}
std::cout << "add " << tasks << " tasks" << std::endl;

while ((tasks / 2) > done)
{
usleep(10000);
}
//pool->threadpoolexit(pool, 0) == 0;
assert(pool->threadpoolexit(pool, 0) == 0);
std::cout << "did " << done << " tasks" << std::endl;

delete[] pool;

return 0;
}

CMakeLists文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
cmake_minimum_required(VERSION 3.0.0)
project(threadpool_cpp VERSION 0.1.0)

include(CTest)
enable_testing()

add_executable(threadpool_cpp main.cpp threadpool.cpp threadpool.h)

find_package(Threads)
target_link_libraries(threadpool_cpp ${CMAKE_THREAD_LIBS_INIT})

set(CPACK_PROJECT_NAME ${PROJECT_NAME})
set(CPACK_PROJECT_VERSION ${PROJECT_VERSION})
include(CPack)

测试效果图

image-20210607203945887

-------------THE END-------------

欢迎关注我的其它发布渠道