-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtimer.hpp
executable file
·327 lines (275 loc) · 11.6 KB
/
timer.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#pragma once
#include <atomic>
#include <cassert>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <ctime>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
namespace bzh {
using namespace std::chrono_literals;
//智能指针形式 std::vector 无内存泄露
class TimerTask {
public:
explicit TimerTask(std::function<void()> function) : taskFun(std::move(function)){};
~TimerTask() = default;
/**
* 取消任务
*/
bool cancel() {
std::unique_lock<std::mutex> lock(mtx); //互斥锁
bool result = (state == State::SCHEDULED);
state = State::CANCELLED;
return result;
}
/**
* 获取这个任务计划执行的时间
*/
long scheduledExecutionTime() {
std::unique_lock<std::mutex> lock(mtx); //互斥锁
return period < 0 ? nextExecutionTime + period : nextExecutionTime - period;
}
private:
friend class Timer;
enum class State {
VIRGIN = 0, //这个任务尚未安排
SCHEDULED = 1, //此任务计划执行。如果它是非重复任务,它尚未执行
EXECUTED = 2, //此非重复任务已执行(或当前正在执行)且尚未取消
CANCELLED = 3 //此任务已取消
};
State state = State::VIRGIN; //任务的状态
long long nextExecutionTime = 0; //下一次任务的执行时间
std::mutex mtx;
long long period = 0; //重复任务的时间段(以毫秒为单位)
std::function<void()> taskFun; //要执行得任务实现
};
class Timer final {
public:
static Timer& getInstanse() {
static Timer timer;
return timer;
}
~Timer() {
std::unique_lock<std::mutex> lock(mainMtx);
isWaitChildThread = true;
mainCv.wait(lock);
mainCv.wait(lock, [this]() { return taskQueue.isEmpty(); });
};
template <typename _Rep_Delay, typename _Period_Delay, typename _Rep_Period = long long, typename _Period_Period = std::milli,
typename Task>
void schedule(Task&& task, const std::chrono::duration<_Rep_Delay, _Period_Delay>& delay,
const std::chrono::duration<_Rep_Period, _Period_Period>& period = std::chrono::milliseconds{0}) {
static_assert(std::disjunction_v<std::is_invocable<std::decay_t<Task>>, std::is_same<std::decay_t<Task>, TimerTask*>>,
"参数错误");
std::chrono::milliseconds delay_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
std::chrono::milliseconds period_ms = std::chrono::duration_cast<std::chrono::milliseconds>(period);
if (delay_ms.count() < 0) {
std::cerr << "Negative delay." << std::endl;
return;
}
if (period_ms.count() < 0) {
std::cerr << "Non-positive period." << std::endl;
return;
}
if constexpr (std::is_same_v<std::decay_t<Task>, TimerTask*>) {
sched(std::shared_ptr<TimerTask>(task), currentTimeMillis() + delay_ms.count(), -period_ms.count());
} else {
sched(std::make_shared<TimerTask>(task), currentTimeMillis() + delay_ms.count(), -period_ms.count());
}
}
void cancel() {
std::unique_lock<std::mutex> queueLock(mtx);
newTasksMayBeScheduled = false;
taskQueue.clear();
cv.notify_all();
}
int purge() {
int result = 0;
std::unique_lock<std::mutex> queueLock(mtx);
for (int i = taskQueue.getSize(); i > 0; i--) {
if (taskQueue.get(i)->state == TimerTask::State::CANCELLED) {
taskQueue.quickRemove(i);
result++;
}
}
if (result != 0)
taskQueue.heapify();
return result;
}
//指定时间
static long long getTime(int year, int month, int day, int hour = 0, int minute = 0, int second = 0) {
std::tm time{};
time.tm_year = year - 1900;
time.tm_mon = month - 1;
time.tm_mday = day;
time.tm_hour = hour;
time.tm_min = minute;
time.tm_sec = second;
time.tm_isdst = -1;
std::time_t tt = std::mktime(&time);
return tt * 1000 - currentTimeMillis();
}
private:
Timer() {
std::thread thread([&] { mainLoop(); });
thread.detach();
};
Timer(const Timer&) = delete;
Timer& operator=(const Timer&) = delete;
class TaskQueue {
public:
TaskQueue() = default;
int getSize() { return queue.size(); }
void add(std::shared_ptr<TimerTask> timerTask) {
queue.push_back(timerTask);
fixUp();
}
std::shared_ptr<TimerTask> getMin() { return queue[0]; }
std::shared_ptr<TimerTask> get(int i) { return queue[i]; }
void removeMin() {
quickRemove(0);
fixDown(0);
}
/**
* 快速删除
*/
void quickRemove(int i) {
if (queue.size() != 1) {
std::swap(queue[i], queue[queue.size() - 1]);
}
queue.erase(std::cend(queue) - 1);
}
/**
* 重新排列
*/
void rescheduleMin(long long newTime) {
queue[0]->nextExecutionTime = newTime;
fixDown(0);
}
bool isEmpty() { return queue.empty(); }
void clear() { queue.clear(); }
void heapify() {
for (auto i = queue.size() / 2; i >= 1; i--)
fixDown(i);
}
private:
std::vector<std::shared_ptr<TimerTask>> queue; //任务容器
//把时间最小的放在第一位
void fixUp() {
std::size_t k = queue.size() - 1;
while (k > 0) {
std::size_t j = k / 2;
if (queue[j]->nextExecutionTime <= queue[k]->nextExecutionTime)
break;
std::swap(queue[j], queue[k]);
k = j;
}
}
//把时间最小的放在第一位
void fixDown(int k) {
if (queue.empty() || queue.size() == 1) {
return;
}
int j = 0;
while ((j = k * 2) <= queue.size() - 1) {
if (j <= static_cast<int>(queue.size() - 2) && queue[j]->nextExecutionTime > queue[j + 1]->nextExecutionTime) {
j++; // j indexes smallest kid
}
if (queue[k]->nextExecutionTime <= queue[j]->nextExecutionTime) {
break;
}
std::swap(queue[j], queue[k]);
k = j;
}
}
};
void mainLoop() {
while (true) {
std::unique_lock<std::mutex> mainLock(mainMtx);
std::shared_ptr<TimerTask> task;
bool taskFired; //任务被解除 false等待 true执行任务
std::unique_lock<std::mutex> queueLock(mtx);
if (!newTasksMayBeScheduled) { //取消任务
mainCv.notify_all();
break;
}
if (taskQueue.isEmpty() && isWaitChildThread) {
mainCv.notify_all();
break;
}
if (taskQueue.isEmpty()) {
mainCv.notify_all();
continue;
}
long long currentTime = 0;
long long executionTime = 0;
task = taskQueue.getMin();
std::unique_lock<std::mutex> taskLock(task->mtx);
if (task->state == TimerTask::State::CANCELLED) {
taskQueue.removeMin();
continue; // No action required, poll queue again
}
currentTime = currentTimeMillis();
executionTime = task->nextExecutionTime;
taskFired = (executionTime <= currentTime);
if (taskFired) {
if (task->period == 0) { // Non-repeating, remove 不是循环任务 移除掉
taskQueue.removeMin();
task->state = TimerTask::State::EXECUTED;
} else { // Repeating task, reschedule
taskQueue.rescheduleMin(task->period < 0 ? (currentTime - task->period) : (executionTime + task->period));
}
}
taskLock.unlock();
if (!taskFired) // Task hasn't yet fired; wait
cv.wait_for(queueLock, std::chrono::milliseconds(executionTime - currentTime));
queueLock.unlock();
if (taskFired) { // Task fired; run it, holding no locks
task->taskFun();
}
mainCv.notify_all();
} //结束子线程
}
//获取当前系统时间
static long long currentTimeMillis() {
auto time = std::chrono::system_clock::now();
return std::chrono::duration_cast<std::chrono::milliseconds>(time.time_since_epoch()).count();
}
void sched(std::shared_ptr<TimerTask> task, long long time, long long period) {
if (time < 0)
std::cerr << "Illegal execution time." << std::endl;
// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
//绝对值
if (std::llabs(period) > (std::numeric_limits<long long>::max() / 2))
period /= 2;
std::unique_lock<std::mutex> queueLock(mtx);
if (!newTasksMayBeScheduled)
std::cerr << "Timer already cancelled." << std::endl;
std::unique_lock<std::mutex> taskLock(task->mtx);
if (task->state != TimerTask::State::VIRGIN)
std::cerr << "Task already scheduled or cancelled" << std::endl;
task->nextExecutionTime = time;
task->period = period;
task->state = TimerTask::State::SCHEDULED;
taskLock.unlock();
taskQueue.add(task);
if (taskQueue.getMin().get() == task.get())
cv.notify_one();
}
TaskQueue taskQueue; //任务队列
std::mutex mtx;
std::mutex mainMtx; //阻塞主线程
std::condition_variable mainCv; //阻塞主线程
std::condition_variable cv;
bool newTasksMayBeScheduled = true; //可以安排新任务
std::atomic<bool> isWaitChildThread = false; // main线程是否需要等待子线程
};
}; // namespace bzh