Libthreadar  1.4.0
ratelier_gather.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // libthreadar - is a library providing several C++ classes to work with threads
3 // Copyright (C) 2014-2020 Denis Corbin
4 //
5 // This file is part of libthreadar
6 //
7 // libthreadar is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // libthreadar is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Lesser General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19 //
20 //----
21 // to contact the author: dar.linux@free.fr
22 /*********************************************************************/
23 
24 #ifndef LIBTHREADAR_RATELIER_GATHER_HPP
25 #define LIBTHREADAR_RATELIER_GATHER_HPP
26 
69 
70 
71 
72 #include "config.h"
73 
74  // C system headers
75 extern "C"
76 {
77 }
78  // C++ standard headers
79 #include <vector>
80 #include <map>
81 #include <deque>
82 #include <memory>
83 
84  // libthreadar headers
85 #include "mutex.hpp"
86 
87 
88 namespace libthreadar
89 {
91 
96 
97  template <class T> class ratelier_gather
98  {
99  public:
100  ratelier_gather(unsigned int size, signed int flag = 0);
101  ratelier_gather(const ratelier_gather & ref) = delete;
102  ratelier_gather(ratelier_gather && ref) = default;
103  ratelier_gather & operator = (const ratelier_gather & ref) = delete;
104  ratelier_gather & operator = (ratelier_gather && ref) noexcept = default;
105  virtual ~ratelier_gather() = default;
106 
108 
116  void worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag = 0);
117 
119 
124  void gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag);
125 
127  void reset();
128 
129  private:
130 
131  static const unsigned int cond_pending_data = 0;
132  static const unsigned int cond_full = 1;
133 
134  struct slot
135  {
136  std::unique_ptr<T> obj;
137  bool empty;
138  unsigned int index;
139  signed int flag;
140 
141  slot(signed int val) { empty = true; flag = val; };
142  slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
143  };
144 
145  unsigned int next_index;
146  std::vector<slot> table;
147  std::map<unsigned int, unsigned int> corres;
148  std::deque<unsigned int> empty_slot;
149  libthreadar::condition verrou;
150  };
151 
152  template <class T> ratelier_gather<T>::ratelier_gather(unsigned int size, signed int flag):
153  table(size, slot(flag)),
154  verrou(2)
155  {
156  next_index = 0;
157 
158  for(unsigned int i = 0; i < size; ++i)
159  empty_slot.push_back(i);
160  }
161 
162  template <class T> void ratelier_gather<T>::worker_push_one(unsigned int slot, std::unique_ptr<T> & one, signed int flag)
163  {
164  verrou.lock();
165 
166  try
167  {
168  while(empty_slot.empty() // no free slot available
169  || ((empty_slot.size() == 1 && slot != next_index) // one slot available and we do not provide the lowest expecting slot num
170  && corres.begin() != corres.end() && (corres.begin())->first != next_index)) // and lowest slot is still not received
171  verrou.wait(cond_full);
172 
173  std::map<unsigned int, unsigned int>::iterator it = corres.find(slot);
174  unsigned int index;
175 
176  if(it != corres.end())
177  throw exception_range("the ratelier_gather index to fill is already used");
178 
179  index = empty_slot.back();
180 
181  // sanity checks
182 
183  if(index >= table.size())
184  throw THREADAR_BUG;
185  if( ! table[index].empty)
186  throw THREADAR_BUG;
187 
188  // recording the change
189 
190  corres[slot] = index;
191  table[index].obj = std::move(one);
192  table[index].empty = false;
193  table[index].index = slot;
194  table[index].flag = flag;
195 
196  empty_slot.pop_back();
197 
198  if(verrou.get_waiting_thread_count(cond_pending_data) > 0)
199  if(corres.find(next_index) != corres.end()) // some data can be gathered
200  verrou.signal(cond_pending_data); // awaking the gathering thread
201  }
202  catch(...)
203  {
204  verrou.unlock(); // unlock first, as broadcast/signal may be the cause of the exception
205  verrou.broadcast(cond_pending_data);
206  verrou.broadcast(cond_full);
207  throw;
208  }
209  verrou.unlock();
210  }
211 
212  template <class T> void ratelier_gather<T>::gather(std::deque<std::unique_ptr<T> > & ones, std::deque<signed int> & flag)
213  {
214  ones.clear();
215  flag.clear();
216 
217  verrou.lock();
218  try
219  {
220  std::map<unsigned int, unsigned int>::iterator it;
221  std::map<unsigned int, unsigned int>::iterator tmp;
222 
223  do
224  {
225  it = corres.begin();
226 
227  while(it != corres.end())
228  {
229  if(it->first > next_index) // not continuous sequence
230  break; // exiting the inner while loop
231 
232  if(it->first == next_index)
233  {
234 
235  // sanity checks
236 
237  if(it->second >= table.size())
238  throw THREADAR_BUG;
239  if(table[it->second].index != next_index)
240  throw THREADAR_BUG;
241  if(table[it->second].empty)
242  throw THREADAR_BUG;
243  if( ! table[it->second].obj)
244  throw THREADAR_BUG;
245 
246  // recording the change
247 
248  ones.push_back(std::move(table[it->second].obj));
249  flag.push_back(table[it->second].flag);
250 
251  table[it->second].empty = true;
252  empty_slot.push_back(it->second);
253  tmp = it;
254  ++it;
255  corres.erase(tmp);
256  ++next_index;
257  }
258  else // integer overload occured for the index
259  ++it; // skipping this entry
260  }
261 
262  if(ones.empty())
263  verrou.wait(cond_pending_data);
264  }
265  while(ones.empty());
266 
267  if(verrou.get_waiting_thread_count(cond_full) > 0)
268  verrou.broadcast(cond_full); // awake all pending workers
269  }
270  catch(...)
271  {
272  verrou.unlock(); // unlock first, as broadcast() may be the cause of the exception
273  verrou.broadcast(cond_pending_data);
274  verrou.broadcast(cond_full);
275  throw;
276  }
277  verrou.unlock();
278 
279  if(ones.size() != flag.size())
280  throw THREADAR_BUG;
281  }
282 
283  template <class T> void ratelier_gather<T>::reset()
284  {
285  unsigned int size = table.size();
286  next_index = 0;
287  corres.clear();
288  empty_slot.clear();
289 
290  for(unsigned int i = 0; i < size; ++i)
291  {
292  table[i].obj.reset();
293  table[i].empty = true;
294  empty_slot.push_back(i);
295  }
296 
297  verrou.lock();
298  verrou.broadcast(cond_pending_data);
299  verrou.broadcast(cond_full);
300  verrou.unlock();
301  }
302 
303 
304 } // end of namespace
305 
306 #endif
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reaches that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
void worker_push_one(unsigned int slot, std::unique_ptr< T > &one, signed int flag=0)
provides a worker thread with a means to give data, with its associated index, to a gathering thread ...
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition: condition.hpp:45
void gather(std::deque< std::unique_ptr< T > > &ones, std::deque< signed int > &flag)
obtain the lowest continuous filled slots of the ratelier_gather and free them
void reset()
reset the object to its pristine state
the class ratelier_gather has a fixed length range of slots of arbitrary defined object type ...
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:45
Exception used to report an out of range value or argument.
Definition: exceptions.hpp:206