Logo
ApraPipes 1.0
Loading...
Searching...
No Matches
ThreadSafeQue.h
#pragma once

#include <condition_variable>
#include <mutex>
#include <utility>

#include <boost/container/deque.hpp>
5
6template <class T>
8{
9public:
10
11 typedef boost::container::deque<T> container_type;
12 typedef typename container_type::size_type size_type;
13 typedef typename container_type::value_type value_type;
14
15 explicit threadsafe_que() : m_unread(0), m_wakeExternally(false) {}
16
17 void push(const value_type& item)
18 { // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
19
20 std::unique_lock<std::mutex> lock(m_mutex);
21 m_container.push_front(item);
22 ++m_unread;
23 lock.unlock();
24 m_not_empty.notify_one();
25 }
26
28 std::unique_lock<std::mutex> lock(m_mutex);
29 if (is_not_empty())
30 {
31 --m_unread;
32 value_type ret = m_container.back();
33 m_container.pop_back();
34 lock.unlock();
35 return ret;
36 }
37 else {
38 lock.unlock();
39 return value_type();//empty container
40 }
41 }
42
44 std::unique_lock<std::mutex> lock(m_mutex);
45 m_not_empty.wait(lock, [this]() { return is_not_empty(); });
46 --m_unread;
47 value_type ret = m_container.back();
48 m_container.pop_back();
49 lock.unlock();
50 return ret;
51 }
52
53 void clear() {
54 std::unique_lock<std::mutex> lock(m_mutex);
55 m_container.clear();
56 m_unread = 0;
57 lock.unlock();
58 }
59
60 size_t size()
61 {
62 std::unique_lock<std::mutex> lock(m_mutex);
63 return m_unread;
64 }
65
67 std::unique_lock<std::mutex> lock(m_mutex);
68 m_not_empty.wait(lock, [this]() { return is_not_empty_external(); });
69 if(!is_not_empty()) {
70 lock.unlock();
71 return value_type();//empty container
72 }
73 --m_unread;
74 value_type ret = m_container.back();
75 m_container.pop_back();
76 lock.unlock();
77 return ret;
78 }
79
80 void setWake()
81 {
82 std::unique_lock<std::mutex> lock(m_mutex);
83 m_wakeExternally = true; // forever true - now try_pop_external becomes equivalent to try_pop
84 m_not_empty.notify_one();
85 }
86
87private:
88 threadsafe_que(const threadsafe_que&); // Disabled copy constructor.
89 threadsafe_que& operator = (const threadsafe_que&); // Disabled assign operator.
90
91 bool is_not_empty() const { return m_unread > 0; }
92
94 {
95 return m_unread > 0 || m_wakeExternally;
96 }
97
101 std::mutex m_mutex;
102 std::condition_variable m_not_empty;
103};
Definition ThreadSafeQue.h:8
std::mutex m_mutex
Definition ThreadSafeQue.h:101
value_type try_pop_external()
Definition ThreadSafeQue.h:66
container_type::value_type value_type
Definition ThreadSafeQue.h:13
value_type pop()
Definition ThreadSafeQue.h:43
threadsafe_que()
Definition ThreadSafeQue.h:15
void push(const value_type &item)
Definition ThreadSafeQue.h:17
container_type m_container
Definition ThreadSafeQue.h:100
value_type try_pop()
Definition ThreadSafeQue.h:27
size_type m_unread
Definition ThreadSafeQue.h:99
container_type::size_type size_type
Definition ThreadSafeQue.h:12
bool is_not_empty() const
Definition ThreadSafeQue.h:91
bool is_not_empty_external() const
Definition ThreadSafeQue.h:93
std::condition_variable m_not_empty
Definition ThreadSafeQue.h:102
size_t size()
Definition ThreadSafeQue.h:60
void setWake()
Definition ThreadSafeQue.h:80
void clear()
Definition ThreadSafeQue.h:53
threadsafe_que & operator=(const threadsafe_que &)
threadsafe_que(const threadsafe_que &)
boost::container::deque< T > container_type
Definition ThreadSafeQue.h:11
bool m_wakeExternally
Definition ThreadSafeQue.h:98