ThreadPool 2

Design a ThreadPool model with advanced features.

Previously, I wrote a post that implements a simplified ThreadPool model: https://chuzcjoe.github.io/2024/03/24/cpp-threading-pool/

Based on that, in this post we will design a more advanced one. The requirements for the new ThreadPool model are:

  1. Each thread should have a unique name.
  2. Each thread should have its own task queue that can hold a certain number of tasks. Additional incoming tasks will be discarded if the queue is full.
  3. A new task will be queued based on its priority. Tasks with higher priority numbers will be executed first.
  4. A new task should have some metainfo and will be sent to the matching thread for execution.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

/// Metadata attached to a task: where it should run, how urgent it is,
/// and a caller-defined type label.
struct TaskInfo {
    /// Name of the worker thread the task is addressed to.
    std::string threadName;
    /// Scheduling priority; the queue comparator in this file orders tasks so
    /// that larger values are dequeued first.
    int priority;
    /// Free-form label for the kind of task. Nothing in this file reads it.
    std::string taskType;

    /// Builds the record. Both strings are sink parameters: they are taken by
    /// value and moved into the members.
    TaskInfo(std::string threadName, int priority, std::string taskType)
        : threadName{std::move(threadName)},
          priority{priority},
          taskType{std::move(taskType)} {}
};

/// A unit of work: a no-argument callable bundled with the TaskInfo that
/// describes it.
class Task {
public:
    /// Takes ownership of the metadata and the callable (both are moved in).
    Task(TaskInfo metadata, std::function<void()> func)
        : metadata(std::move(metadata)),
          func(std::move(func)) {}

    /// Read-only view of the metadata supplied at construction.
    const TaskInfo& getMetadata() const {
        return metadata;
    }

    /// Invokes the stored callable once.
    void execute() {
        func();
    }

private:
    TaskInfo metadata;
    std::function<void()> func;
};

/// Ordering predicate for a std::priority_queue of Task objects.
///
/// Returns true when t1 has a strictly smaller priority value than t2. Used as
/// the Compare argument of std::priority_queue, this puts the task with the
/// largest priority value at the top of the queue.
struct CompareTask {
    // const-qualified: the comparator holds no state, and a const call
    // operator lets it be invoked through a const comparator object.
    bool operator()(const Task& t1, const Task& t2) const {
        return t1.getMetadata().priority < t2.getMetadata().priority;
    }
};

class Worker {
public:
Worker(std::string name, size_t maxTasks)
: name(std::move(name)), maxTasks(maxTasks), shouldStop(false) {}

void start() {
thread = std::thread(&Worker::run, this);
}

void stop() {
{
std::unique_lock<std::mutex> lock(mutex);
shouldStop = true;
}
condition.notify_one();
if (thread.joinable()) {
thread.join();
}
}

bool runTask(const Task& task) {
std::unique_lock<std::mutex> lock(mutex);
if (tasks.size() >= maxTasks) {
std::cout << "Queue is full on thread: " << getName() << '\n';
return false;
}
tasks.push(task);
condition.notify_one();
return true;
}

const std::string& getName() const {
return name;
}

private:
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, [this]() { return shouldStop || !tasks.empty(); });
if (shouldStop && tasks.empty()) {
return;
}
auto task = tasks.top();
tasks.pop();
lock.unlock();
task.execute();
}
}

std::string name;
size_t maxTasks;
std::thread thread;
std::priority_queue<Task, std::vector<Task>, CompareTask> tasks;
std::mutex mutex;
std::condition_variable condition;
bool shouldStop;
};

class ThreadPool {
public:
ThreadPool() {}

~ThreadPool() {
for (auto& [_, worker] : workers) {
worker->stop();
}
}

void allocateThread(const std::string& name, size_t maxTasks) {
if (workers.find(name) != workers.end()) {
std::cout << "Thread with this name already exists\n";
return;
}
auto worker = std::make_unique<Worker>(name, maxTasks);
worker->start();
workers[name] = std::move(worker);
}

bool runTask(const Task& task) {
const std::string& threadName = task.getMetadata().threadName;
auto it = workers.find(threadName);
if (it != workers.end()) {
return it->second->runTask(task);
} else {
// If no matching thread, try to assign to any available worker
for (auto& [_, worker] : workers) {
if (worker->runTask(task)) {
return true;
}
}
return false; // All queues are full
}
}

private:
std::unordered_map<std::string, std::unique_ptr<Worker>> workers;
};

// Example usage
int main() {
    ThreadPool pool;

    // Three named workers, each with its own queue capacity.
    pool.allocateThread("Thread1", 5);
    pool.allocateThread("Thread2", 3);
    pool.allocateThread("Thread3", 4);

    // One task per worker, each with its own priority and type tag.
    pool.runTask(Task(
        TaskInfo("Thread2", 2, "TypeB"),
        [] { std::cout << "Task executed on thread 2 (priority 2)\n"; }));
    pool.runTask(Task(
        TaskInfo("Thread3", 3, "TypeC"),
        [] { std::cout << "Task executed on thread 3 (priority 3)\n"; }));
    pool.runTask(Task(
        TaskInfo("Thread1", 1, "TypeA"),
        [] { std::cout << "Task executed on thread 1 (priority 1)\n"; }));

    // Push priorities 10 down to 6 at Thread2, whose queue holds only three
    // tasks, to show submissions being rejected once the queue is full.
    int priority = 10;
    while (priority > 5) {
        pool.runTask(Task(
            TaskInfo("Thread2", priority, "TypeB"),
            [priority] { std::cout << "Task executed on Thread 2 (priority " << priority << ")\n"; }));
        --priority;
    }

    // Give the workers time to process the queued tasks before the pool is
    // destroyed.
    const std::chrono::seconds settleTime(2);
    std::this_thread::sleep_for(settleTime);

    return 0;
}
1
2
3
4
5
6
7
8
Task executed on thread 3 (priority 3)
Task executed on thread 1 (priority 1)
Queue is full on thread: Thread2
Queue is full on thread: Thread2
Queue is full on thread: Thread2
Task executed on Thread 2 (priority 10)
Task executed on Thread 2 (priority 9)
Task executed on thread 2 (priority 2)
Author

Joe Chu

Posted on

2024-09-14

Updated on

2025-01-09

Licensed under

Comments