Logo
ApraPipes 1.0
Loading...
Searching...
No Matches
BoundBuffer.h
1#pragma once
2#include <boost/container/deque.hpp>
3#include <mutex>
4#include <condition_variable>
5#include <thread>
6#include <functional>
7#include "Logger.h"
8
9template <class T>
11{
12public:
13
14 typedef boost::container::deque<T> container_type;
15 typedef typename container_type::size_type size_type;
16 typedef typename container_type::value_type value_type;
17
18 explicit bounded_buffer(size_type capacity) : m_unread(0), m_capacity(capacity), m_accept(true) {}
19
20 void push(const value_type& item)
21 { // C++11: Using const reference for parameter passing
22
23 std::unique_lock<std::mutex> lock(m_mutex);
24 m_not_full.wait(lock, [this]() { return is_ready_to_accept(); });
25 if (is_not_full() && m_accept)
26 {
27 m_container.push_front(item);
28 ++m_unread;
29 lock.unlock();
30 m_not_empty.notify_one();
31 }
32 else
33 {
34 // check and remove if explicit unlock is required
35 lock.unlock();
36 }
37 }
38
39 void push_back(const value_type& item)
40 { // C++11: Using const reference for parameter passing
41
42 std::unique_lock<std::mutex> lock(m_mutex);
43 bool isCommandQueueNotFull = m_unread < m_capacity * 2;
44 if (m_accept && isCommandQueueNotFull)
45 {
46 LOG_TRACE << "command pushed" << std::endl;
47 m_container.push_back(item);
48 ++m_unread;
49 lock.unlock();
50 m_not_empty.notify_one();
51 }
52 else
53 {
54 // check and remove if explicit unlock is required
55 lock.unlock();
56 }
57 }
58
59 void push_drop_oldest(const value_type& item)
60 {
61 std::unique_lock<std::mutex> lock(m_mutex);
62 if(!m_accept) return; // we are not accepting yet so drop what came in
63
64 if(is_not_full())
65 {
66 ++m_unread;
67 }
68 else // we are full
69 {
70 //note: m_unread does not change in this case.
71 m_container.pop_back();//to drop the oldest
72 }
73 m_container.push_front(item);
74 lock.unlock();
75 m_not_empty.notify_one(); //let them know we have a new sample
76 }
77
78
79 bool try_push(const value_type& item)
80 {
81 std::unique_lock<std::mutex> lock(m_mutex);
82 if (is_not_full() && m_accept)
83 {
84 m_container.push_front(item);
85 ++m_unread;
86 lock.unlock();
87 m_not_empty.notify_one();
88 return true;
89 }
90 else {
91 lock.unlock();
92 return false;
93 }
94 }
95
96 bool isFull() {
97 bool iret = false;
98 std::unique_lock<std::mutex> lock(m_mutex);
99 iret = !is_not_full();
100 lock.unlock();
101 return iret;
102 }
104 std::unique_lock<std::mutex> lock(m_mutex);
105 m_not_empty.wait(lock, [this]() { return is_not_empty(); });
106 --m_unread;
107 value_type ret = m_container.back();
108 m_container.pop_back();
109 lock.unlock();
110 m_not_full.notify_one();
111 return ret;
112 }
113
115 std::unique_lock<std::mutex> lock(m_mutex);
116 if (is_not_empty())
117 {
118 value_type ret = m_container.back();
119 return ret;
120 }
121 }
122
124 std::unique_lock<std::mutex> lock(m_mutex);
125 if (is_not_empty())
126 {
127 --m_unread;
128 value_type ret = m_container.back();
129 m_container.pop_back();
130 lock.unlock();
131 m_not_full.notify_one();
132 return ret;
133 }
134 else {
135 lock.unlock();
136 return value_type();//empty container
137 }
138 }
139 void clear() {
140 std::unique_lock<std::mutex> lock(m_mutex);
141 m_container.clear();
142 m_unread = 0;
143 m_accept = false;
144 m_not_full.notify_one();
145
146 lock.unlock();
147 }
148
149 void flush() {
150 std::unique_lock<std::mutex> lock(m_mutex);
151 m_container.clear();
152 m_unread = 0;
153 m_not_full.notify_one();
154 }
155
156 void accept() {
157 std::unique_lock<std::mutex> lock(m_mutex);
158 m_accept = true;
159 lock.unlock();
160 }
161
162 size_t size()
163 {
164 std::unique_lock<std::mutex> lock(m_mutex);
165 return m_container.size();
166 }
167
168
169private:
170 bounded_buffer(const bounded_buffer&); // Disabled copy constructor.
171 bounded_buffer& operator = (const bounded_buffer&); // Disabled assign operator.
172
173 bool is_not_empty() const { return m_unread > 0; }
174 bool is_not_full() const { return m_unread < m_capacity; }
176 {
177 return ((m_unread < m_capacity) || !m_accept);
178 }
179
184 std::mutex m_mutex;
185 std::condition_variable m_not_empty;
186 std::condition_variable m_not_full;
187
189
191 {
192 m_mutex.lock();
193 }
194
196 {
197 m_mutex.unlock();
198 }
199
200 // to be used by QuePushStrategy Only
202 {
203 m_container.push_front(item);
204 ++m_unread;
205 m_mutex.unlock();
206 m_not_empty.notify_one();
207 }
208};
Definition QuePushStrategy.h:46
Definition BoundBuffer.h:11
container_type::size_type size_type
Definition BoundBuffer.h:15
bool try_push(const value_type &item)
Definition BoundBuffer.h:79
bounded_buffer(const bounded_buffer &)
size_type m_unread
Definition BoundBuffer.h:181
bool is_ready_to_accept() const
Definition BoundBuffer.h:175
void push_drop_oldest(const value_type &item)
Definition BoundBuffer.h:59
std::mutex m_mutex
Definition BoundBuffer.h:184
bool isFull()
Definition BoundBuffer.h:96
size_type m_capacity
Definition BoundBuffer.h:182
std::condition_variable m_not_full
Definition BoundBuffer.h:186
bool is_not_empty() const
Definition BoundBuffer.h:173
void push_back(const value_type &item)
Definition BoundBuffer.h:39
value_type peek()
Definition BoundBuffer.h:114
void push(const value_type &item)
Definition BoundBuffer.h:20
bool is_not_full() const
Definition BoundBuffer.h:174
void clear()
Definition BoundBuffer.h:139
void acquireLock()
Definition BoundBuffer.h:190
value_type try_pop()
Definition BoundBuffer.h:123
void releaseLock()
Definition BoundBuffer.h:195
size_t size()
Definition BoundBuffer.h:162
container_type m_container
Definition BoundBuffer.h:183
std::condition_variable m_not_empty
Definition BoundBuffer.h:185
bounded_buffer(size_type capacity)
Definition BoundBuffer.h:18
container_type::value_type value_type
Definition BoundBuffer.h:16
bool m_accept
Definition BoundBuffer.h:180
value_type pop()
Definition BoundBuffer.h:103
boost::container::deque< T > container_type
Definition BoundBuffer.h:14
bounded_buffer & operator=(const bounded_buffer &)
void flush()
Definition BoundBuffer.h:149
void accept()
Definition BoundBuffer.h:156
void pushUnsafeForQuePushStrategy(const value_type &item)
Definition BoundBuffer.h:201