Logo
ApraPipes 1.0
Loading...
Searching...
No Matches
ThreadSafeQue.h
1#pragma once
2#include <boost/container/deque.hpp>
3#include <boost/thread/mutex.hpp>
4#include <boost/thread/condition.hpp>
5#include <boost/call_traits.hpp>
6
7template <class T>
9{
10public:
11
12 typedef boost::container::deque<T> container_type;
13 typedef typename container_type::size_type size_type;
14 typedef typename container_type::value_type value_type;
15
16 explicit threadsafe_que() : m_unread(0), m_wakeExternally(false) {}
17
18 void push(typename boost::call_traits<value_type>::param_type item)
19 { // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
20
21 boost::mutex::scoped_lock lock(m_mutex);
22 m_container.push_front(item);
23 ++m_unread;
24 lock.unlock();
25 m_not_empty.notify_one();
26 }
27
29 boost::mutex::scoped_lock lock(m_mutex);
30 if (is_not_empty())
31 {
32 --m_unread;
33 value_type ret = m_container.back();
34 m_container.pop_back();
35 lock.unlock();
36 return ret;
37 }
38 else {
39 lock.unlock();
40 return value_type();//empty container
41 }
42 }
43
45 boost::mutex::scoped_lock lock(m_mutex);
46 m_not_empty.wait(lock, boost::bind(&threadsafe_que<value_type>::is_not_empty, this));
47 --m_unread;
48 value_type ret = m_container.back();
49 m_container.pop_back();
50 lock.unlock();
51 return ret;
52 }
53
54 void clear() {
55 boost::mutex::scoped_lock lock(m_mutex);
56 m_container.clear();
57 m_unread = 0;
58 lock.unlock();
59 }
60
61 size_t size()
62 {
63 boost::mutex::scoped_lock lock(m_mutex);
64 return m_unread;
65 }
66
68 boost::mutex::scoped_lock lock(m_mutex);
70 if(!is_not_empty()) {
71 lock.unlock();
72 return value_type();//empty container
73 }
74 --m_unread;
75 value_type ret = m_container.back();
76 m_container.pop_back();
77 lock.unlock();
78 return ret;
79 }
80
81 void setWake()
82 {
83 boost::mutex::scoped_lock lock(m_mutex);
84 m_wakeExternally = true; // forever true - now try_pop_external becomes equivalent to try_pop
85 m_not_empty.notify_one();
86 }
87
88private:
89 threadsafe_que(const threadsafe_que&); // Disabled copy constructor.
90 threadsafe_que& operator = (const threadsafe_que&); // Disabled assign operator.
91
92 bool is_not_empty() const { return m_unread > 0; }
93
95 {
96 return m_unread > 0 || m_wakeExternally;
97 }
98
102 boost::mutex m_mutex;
103 boost::condition m_not_empty;
104};
Definition ThreadSafeQue.h:9
value_type try_pop_external()
Definition ThreadSafeQue.h:67
container_type::value_type value_type
Definition ThreadSafeQue.h:14
value_type pop()
Definition ThreadSafeQue.h:44
threadsafe_que()
Definition ThreadSafeQue.h:16
container_type m_container
Definition ThreadSafeQue.h:101
value_type try_pop()
Definition ThreadSafeQue.h:28
boost::mutex m_mutex
Definition ThreadSafeQue.h:102
size_type m_unread
Definition ThreadSafeQue.h:100
container_type::size_type size_type
Definition ThreadSafeQue.h:13
bool is_not_empty() const
Definition ThreadSafeQue.h:92
bool is_not_empty_external() const
Definition ThreadSafeQue.h:94
void push(typename boost::call_traits< value_type >::param_type item)
Definition ThreadSafeQue.h:18
boost::condition m_not_empty
Definition ThreadSafeQue.h:103
size_t size()
Definition ThreadSafeQue.h:61
void setWake()
Definition ThreadSafeQue.h:81
void clear()
Definition ThreadSafeQue.h:54
threadsafe_que & operator=(const threadsafe_que &)
threadsafe_que(const threadsafe_que &)
boost::container::deque< T > container_type
Definition ThreadSafeQue.h:12
bool m_wakeExternally
Definition ThreadSafeQue.h:99