-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadExecutor.h
72 lines (58 loc) · 1.45 KB
/
ThreadExecutor.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#pragma once
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>

#include "Executor.h"
/*
 * Simple single-thread executor. Owns one worker thread and executes all scheduled
 * tasks on that thread in FIFO order.
 *
 * - Tasks are invoked with the queue mutex released, so a task may call add() on its
 *   own executor and other threads are never blocked behind a running task.
 * - join() (also invoked by the destructor) stops accepting new tasks, lets the worker
 *   drain everything already queued, and then waits for the worker to exit.
 * - Tasks must not let exceptions escape: an exception leaving a task leaves the
 *   thread function and terminates the program.
 * - join() and the destructor must not be called from the executor thread itself
 *   (a thread cannot join itself).
 */
class ThreadExecutor : public Executor {
public:
    /* Starts the worker thread and records its id for isInExecutor(). */
    ThreadExecutor() {
        thread_ = std::thread([this] { run(); });
        threadId_ = thread_.get_id();
    }

    // The worker thread captures `this`, so the object must never be copied or moved.
    ThreadExecutor(const ThreadExecutor&) = delete;
    ThreadExecutor& operator=(const ThreadExecutor&) = delete;

    /*
     * Stops and joins the worker if the owner did not do so explicitly; destroying a
     * still-joinable std::thread would otherwise call std::terminate.
     */
    ~ThreadExecutor() {
        join();
    }

    /*
     * Schedules `func` to run on the executor thread after all previously added tasks.
     * Throws std::logic_error if join() has already been requested.
     */
    virtual void add(std::function<void()> func) {
        std::lock_guard<std::mutex> lg(queueMutex_);
        if (stop_) {
            throw std::logic_error("Adding a task after thread was stopped");
        }
        queue_.push(std::move(func));
        // Notify while still holding the lock so the condition variable cannot be
        // destroyed (by a concurrent shutdown) between the push and the notify.
        queueCV_.notify_one();
    }

    /* Returns true when called from the executor's worker thread. */
    virtual bool isInExecutor() {
        return threadId_ == std::this_thread::get_id();
    }

    /*
     * Requests shutdown and blocks until the worker has run every queued task and
     * exited. Safe to call more than once; later calls return immediately.
     */
    void join() {
        {
            std::lock_guard<std::mutex> lg(queueMutex_);
            stop_ = true;
            queueCV_.notify_all();
        }
        // Guard against a repeated join(): joining a non-joinable thread throws.
        if (thread_.joinable()) {
            thread_.join();
        }
    }

private:
    /* Worker loop: pops one task at a time and runs it with the mutex released. */
    void run() {
        std::unique_lock<std::mutex> lock(queueMutex_);
        for (;;) {
            queueCV_.wait(lock, [this] { return stop_ || !queue_.empty(); });
            if (queue_.empty()) {
                // Only reachable when stop_ is set and all pending work is done.
                return;
            }
            std::function<void()> task = std::move(queue_.front());
            queue_.pop();

            lock.unlock();
            task();
            // Release the task's captured state before re-acquiring the lock so that
            // arbitrary destructors never run under the queue mutex.
            task = nullptr;
            lock.lock();
        }
    }

    bool stop_{ false };
    std::mutex queueMutex_;
    std::condition_variable queueCV_;
    std::queue<std::function<void()>> queue_;
    std::thread::id threadId_;
    // Declared last so every member the worker touches is constructed before it starts.
    std::thread thread_;
};