Logo
ApraPipes 1.0
Loading...
Searching...
No Matches
BoundBuffer.h
1#pragma once
2#include <boost/container/deque.hpp>
3#include <boost/thread/mutex.hpp>
4#include <boost/thread/condition.hpp>
5#include <boost/thread/thread.hpp>
6#include <boost/call_traits.hpp>
7#include <boost/bind/bind.hpp>
8#include "Logger.h"
9
10using namespace boost::placeholders;
11
12template <class T>
14{
15public:
16
17 typedef boost::container::deque<T> container_type;
18 typedef typename container_type::size_type size_type;
19 typedef typename container_type::value_type value_type;
20 typedef typename boost::call_traits<value_type>::param_type param_type;
21
22 explicit bounded_buffer(size_type capacity) : m_unread(0), m_capacity(capacity), m_accept(true) {}
23
24 void push(typename boost::call_traits<value_type>::param_type item)
25 { // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
26
27 boost::mutex::scoped_lock lock(m_mutex);
28 m_not_full.wait(lock, boost::bind(&bounded_buffer<value_type>::is_ready_to_accept, this));
29 if (is_not_full() && m_accept)
30 {
31 m_container.push_front(item);
32 ++m_unread;
33 lock.unlock();
34 m_not_empty.notify_one();
35 }
36 else
37 {
38 // check and remove if explicit unlock is required
39 lock.unlock();
40 }
41 }
42
43 void push_back(typename boost::call_traits<value_type>::param_type item)
44 { // `param_type` represents the "best" way to pass a parameter of type `value_type` to a method.
45
46 boost::mutex::scoped_lock lock(m_mutex);
47 bool isCommandQueueNotFull = m_unread < m_capacity * 2;
48 if (m_accept && isCommandQueueNotFull)
49 {
50 LOG_TRACE << "command pushed" << std::endl;
51 m_container.push_back(item);
52 ++m_unread;
53 lock.unlock();
54 m_not_empty.notify_one();
55 }
56 else
57 {
58 // check and remove if explicit unlock is required
59 lock.unlock();
60 }
61 }
62
63 void push_drop_oldest(typename boost::call_traits<value_type>::param_type item)
64 {
65 boost::mutex::scoped_lock lock(m_mutex);
66 if(!m_accept) return; // we are not accepting yet so drop what came in
67
68 if(is_not_full())
69 {
70 ++m_unread;
71 }
72 else // we are full
73 {
74 //note: m_unread does not change in this case.
75 m_container.pop_back();//to drop the oldest
76 }
77 m_container.push_front(item);
78 lock.unlock();
79 m_not_empty.notify_one(); //let them know we have a new sample
80 }
81
82
83 bool try_push(typename boost::call_traits<value_type>::param_type item)
84 {
85 boost::mutex::scoped_lock lock(m_mutex);
86 if (is_not_full() && m_accept)
87 {
88 m_container.push_front(item);
89 ++m_unread;
90 lock.unlock();
91 m_not_empty.notify_one();
92 return true;
93 }
94 else {
95 lock.unlock();
96 return false;
97 }
98 }
99
100 bool isFull() {
101 bool iret = false;
102 boost::mutex::scoped_lock lock(m_mutex);
103 iret = !is_not_full();
104 lock.unlock();
105 return iret;
106 }
108 boost::mutex::scoped_lock lock(m_mutex);
109 m_not_empty.wait(lock, boost::bind(&bounded_buffer<value_type>::is_not_empty, this));
110 --m_unread;
111 value_type ret = m_container.back();
112 m_container.pop_back();
113 lock.unlock();
114 m_not_full.notify_one();
115 return ret;
116 }
117
119 boost::mutex::scoped_lock lock(m_mutex);
120 if (is_not_empty())
121 {
122 value_type ret = m_container.back();
123 return ret;
124 }
125 }
126
128 boost::mutex::scoped_lock lock(m_mutex);
129 if (is_not_empty())
130 {
131 --m_unread;
132 value_type ret = m_container.back();
133 m_container.pop_back();
134 lock.unlock();
135 m_not_full.notify_one();
136 return ret;
137 }
138 else {
139 lock.unlock();
140 return value_type();//empty container
141 }
142 }
143 void clear() {
144 boost::mutex::scoped_lock lock(m_mutex);
145 m_container.clear();
146 m_unread = 0;
147 m_accept = false;
148 m_not_full.notify_one();
149
150 lock.unlock();
151 }
152
153 void flush() {
154 boost::mutex::scoped_lock lock(m_mutex);
155 m_container.clear();
156 m_unread = 0;
157 m_not_full.notify_one();
158 }
159
160 void accept() {
161 boost::mutex::scoped_lock lock(m_mutex);
162 m_accept = true;
163 lock.unlock();
164 }
165
166 size_t size()
167 {
168 boost::mutex::scoped_lock lock(m_mutex);
169 return m_container.size();
170 }
171
172
173private:
174 bounded_buffer(const bounded_buffer&); // Disabled copy constructor.
175 bounded_buffer& operator = (const bounded_buffer&); // Disabled assign operator.
176
177 bool is_not_empty() const { return m_unread > 0; }
178 bool is_not_full() const { return m_unread < m_capacity; }
180 {
181 return ((m_unread < m_capacity) || !m_accept);
182 }
183
188 boost::mutex m_mutex;
189 boost::condition m_not_empty;
190 boost::condition m_not_full;
191
193
195 {
196 m_mutex.lock();
197 }
198
200 {
201 m_mutex.unlock();
202 }
203
204 // to be used by QuePushStrategy Only
205 void pushUnsafeForQuePushStrategy(typename boost::call_traits<value_type>::param_type item)
206 {
207 m_container.push_front(item);
208 ++m_unread;
209 m_mutex.unlock();
210 m_not_empty.notify_one();
211 }
212};
Definition QuePushStrategy.h:46
Definition BoundBuffer.h:14
container_type::size_type size_type
Definition BoundBuffer.h:18
boost::mutex m_mutex
Definition BoundBuffer.h:188
bounded_buffer(const bounded_buffer &)
size_type m_unread
Definition BoundBuffer.h:185
boost::condition m_not_empty
Definition BoundBuffer.h:189
bool is_ready_to_accept() const
Definition BoundBuffer.h:179
bool isFull()
Definition BoundBuffer.h:100
size_type m_capacity
Definition BoundBuffer.h:186
bool is_not_empty() const
Definition BoundBuffer.h:177
boost::call_traits< value_type >::param_type param_type
Definition BoundBuffer.h:20
value_type peek()
Definition BoundBuffer.h:118
boost::condition m_not_full
Definition BoundBuffer.h:190
bool is_not_full() const
Definition BoundBuffer.h:178
void clear()
Definition BoundBuffer.h:143
void acquireLock()
Definition BoundBuffer.h:194
bool try_push(typename boost::call_traits< value_type >::param_type item)
Definition BoundBuffer.h:83
value_type try_pop()
Definition BoundBuffer.h:127
void releaseLock()
Definition BoundBuffer.h:199
size_t size()
Definition BoundBuffer.h:166
container_type m_container
Definition BoundBuffer.h:187
void push_back(typename boost::call_traits< value_type >::param_type item)
Definition BoundBuffer.h:43
bounded_buffer(size_type capacity)
Definition BoundBuffer.h:22
container_type::value_type value_type
Definition BoundBuffer.h:19
bool m_accept
Definition BoundBuffer.h:184
void push_drop_oldest(typename boost::call_traits< value_type >::param_type item)
Definition BoundBuffer.h:63
value_type pop()
Definition BoundBuffer.h:107
void push(typename boost::call_traits< value_type >::param_type item)
Definition BoundBuffer.h:24
boost::container::deque< T > container_type
Definition BoundBuffer.h:17
bounded_buffer & operator=(const bounded_buffer &)
void flush()
Definition BoundBuffer.h:153
void pushUnsafeForQuePushStrategy(typename boost::call_traits< value_type >::param_type item)
Definition BoundBuffer.h:205
void accept()
Definition BoundBuffer.h:160