forked from asiniscalchi/eosio_sql_plugin
-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.h
74 lines (59 loc) · 1.31 KB
/
consumer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* @file
* @copyright defined in eos/LICENSE.txt
*/
#pragma once
#include <atomic>
#include <memory>
#include <thread>
#include <vector>
#include <boost/noncopyable.hpp>
#include <fc/log/logger.hpp>
#include "consumer_core.h"
#include "fifo.h"
namespace eosio {
/**
* Background consumer: owns a fifo<T> and a worker thread.
*
* Elements handed to push() are stored in the fifo; the worker thread runs
* run(), which repeatedly takes everything currently queued (pop_all()) and
* passes it to the consumer_core<T> supplied at construction.
*
* The class is non-copyable (boost::noncopyable) and final.
*
* NOTE(review): the single-argument constructor is not marked explicit —
* confirm no caller relies on the implicit conversion before changing it.
*/
template<typename T>
class consumer final : public boost::noncopyable
{
public:
/** Takes ownership of @p core and starts the worker thread. */
consumer(std::unique_ptr<consumer_core<T>> core);
/** Switches the fifo to not_blocking, raises the exit flag and joins the worker thread. */
~consumer();
/**
* Queues @p element by forwarding it to the internal fifo.
* Presumably callable from a thread other than the worker; that depends on
* fifo<T>::push being thread-safe — confirm in fifo.h.
*/
void push(const T& element);
private:
/** Worker-thread body: loops over pop_all()/consume() until m_exit is set. */
void run();
// Queue between push() callers and the worker; constructed in blocking mode.
fifo<T> m_fifo;
// Processing back-end that receives each batch popped from the fifo.
std::unique_ptr<consumer_core<T>> m_core;
// Set to true by the destructor to ask run() to leave its loop.
std::atomic<bool> m_exit;
// Declared last on purpose: members are initialized in declaration order, so
// the fifo, core and exit flag all exist before the thread starts executing run().
std::unique_ptr<std::thread> m_thread;
};
/**
 * Builds the consumer and immediately launches its worker thread.
 *
 * The fifo starts in blocking mode and the exit flag starts cleared.
 *
 * @param core processing back-end; ownership is transferred to this object.
 */
template<typename T>
consumer<T>::consumer(std::unique_ptr<consumer_core<T>> core)
    : m_fifo(fifo<T>::behavior::blocking)
    , m_core(std::move(core))
    , m_exit(false)
    // The thread is created last so that everything run() touches is already initialized.
    , m_thread(std::make_unique<std::thread>([this]() { this->run(); }))
{
}
/**
 * Stops the worker thread and waits for it to finish.
 *
 * Shutdown sequence:
 *  1. raise m_exit so the worker leaves its loop at the next check;
 *  2. switch the fifo to not_blocking so the worker is not left waiting in
 *     pop_all() (assumes set_behavior(not_blocking) releases a pending
 *     pop_all() — see fifo.h);
 *  3. join the thread.
 *
 * Elements still queued when the worker exits are not consumed.
 */
template<typename T>
consumer<T>::~consumer()
{
    // The flag must be set BEFORE the worker is released: otherwise the worker
    // can re-test m_exit while it is still false and spin through extra
    // pop_all()/consume() rounds on a fifo that no longer blocks.
    m_exit = true;
    m_fifo.set_behavior(fifo<T>::behavior::not_blocking);

    // join() throws std::system_error on a non-joinable thread; a destructor
    // must never let that escape.
    if (m_thread && m_thread->joinable())
    {
        m_thread->join();
    }
}
/**
 * Queues one element for the worker thread.
 *
 * @param element value forwarded unchanged to the internal fifo's push().
 */
template<typename T>
void consumer<T>::push(const T& element)
{
    this->m_fifo.push(element);
}
/**
 * Worker-thread body.
 *
 * Logs a start message, then alternates between taking everything currently
 * held by the fifo (pop_all()) and handing that batch to the core's consume().
 * The loop ends once m_exit is observed to be true at the top of an iteration;
 * an end message is logged before returning.
 */
template<typename T>
void consumer<T>::run()
{
    dlog("Consumer thread Start");
    for (;;)
    {
        // The exit flag is only examined between batches.
        if (m_exit.load())
        {
            break;
        }
        auto batch = m_fifo.pop_all();
        m_core->consume(batch);
    }
    dlog("Consumer thread End");
}
} // namespace