Libthreadar  1.4.0
ratelier_scatter.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // libthreadar - is a library providing several C++ classes to work with threads
3 // Copyright (C) 2014-2020 Denis Corbin
4 //
5 // This file is part of libthreadar
6 //
7 // libthreadar is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // libhtreadar is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Lesser General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19 //
20 //----
21 // to contact the author: dar.linux@free.fr
22 /*********************************************************************/
23 
24 #ifndef LIBTHREADAR_RATELIER_SCATTER_HPP
25 #define LIBTHREADAR_RATELIER_SCATTER_HPP
26 
33 
34 #include "config.h"
35 
36  // C system headers
37 extern "C"
38 {
39 }
40  // C++ standard headers
41 #include <vector>
42 #include <map>
43 #include <deque>
44 #include <memory>
45 
46  // libthreadar headers
47 #include "mutex.hpp"
48 
49 
50 namespace libthreadar
51 {
53 
62 
63  template <class T> class ratelier_scatter
64  {
65  public:
66  ratelier_scatter(unsigned int size, signed int flag = 0);
67  ratelier_scatter(const ratelier_scatter & ref) = delete;
68  ratelier_scatter(ratelier_scatter && ref) = default;
69  ratelier_scatter & operator = (const ratelier_scatter & ref) = delete;
70  ratelier_scatter & operator = (ratelier_scatter && ref) noexcept = default;
71  virtual ~ratelier_scatter() = default;
72 
81  void scatter(std::unique_ptr<T> & one, signed int flag = 0);
82 
90  std::unique_ptr<T> worker_get_one(unsigned int & slot, signed int & flag);
91 
93  void reset();
94 
95  private:
96 
97  static const unsigned int cond_empty = 0;
98  static const unsigned int cond_full = 1;
99 
100  struct slot
101  {
102  std::unique_ptr<T> obj;
103  bool empty;
104  unsigned int index;
105  signed int flag;
106 
107  slot(signed int val) { empty = true; flag = val; };
108  slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
109  };
110 
111  unsigned int next_index;
112  unsigned int lowest_index;
113  std::vector<slot> table;
114  std::map<unsigned int, unsigned int> corres;
115  std::deque<unsigned int> empty_slot;
116  libthreadar::condition verrou;
117  };
118 
119  template <class T> ratelier_scatter<T>::ratelier_scatter(unsigned int size, signed int flag):
120  table(size, slot(flag)),
121  verrou(2)
122  {
123  next_index = 0;
124  lowest_index = 0;
125 
126  for(unsigned int i = 0; i < size; ++i)
127  empty_slot.push_back(i);
128  }
129 
130  template <class T> void ratelier_scatter<T>::scatter(std::unique_ptr<T> & one, signed int flag)
131  {
132  unsigned int tableindex;
133 
134  verrou.lock();
135  try
136  {
137  while(empty_slot.empty()) // ratelier_scatter is full
138  verrou.wait(cond_full);
139 
140  tableindex = empty_slot.back();
141 
142  // sanity checks
143 
144  if(tableindex >= table.size())
145  throw THREADAR_BUG;
146  if( ! table[tableindex].empty)
147  throw THREADAR_BUG;
148 
149  // recording the change
150 
151  table[tableindex].empty = false;
152  table[tableindex].obj = std::move(one);
153  table[tableindex].index = next_index;
154  table[tableindex].flag = flag;
155 
156  corres[next_index] = tableindex;
157  ++next_index;
158 
159  empty_slot.pop_back();
160  if(verrou.get_waiting_thread_count(cond_empty) > 0)
161  verrou.signal(cond_empty);
162  }
163  catch(...)
164  {
165  verrou.unlock();
166  verrou.broadcast(cond_empty);
167  verrou.broadcast(cond_full);
168  throw;
169  }
170  verrou.unlock();
171  }
172 
173  template <class T> std::unique_ptr<T> ratelier_scatter<T>::worker_get_one(unsigned int & slot, signed int & flag)
174  {
175  std::unique_ptr<T> ret;
176 
177  verrou.lock();
178  try
179  {
180  std::map<unsigned int, unsigned int>::iterator it = corres.begin();
181  // using sequential reading provides sorted scanning
182  // of the map, looking first for the lowest index available (oldest entries)
183 
184  do
185  {
186  if(it != corres.end())
187  {
188  if(it->first < lowest_index) // overflooding occured
189  ++it; // ignoring this slot
190  else
191  {
192 
193  // sanity checks
194 
195  if(it->second >= table.size())
196  throw THREADAR_BUG;
197  if(table[it->second].empty)
198  throw THREADAR_BUG;
199  if( ! table[it->second].obj)
200  throw THREADAR_BUG;
201 
202  // recording the change
203 
204  ret = std::move(table[it->second].obj);
205  slot = table[it->second].index;
206  flag = table[it->second].flag;
207  table[it->second].empty = true;
208 
209  if(lowest_index != slot)
210  throw THREADAR_BUG;
211  ++lowest_index;
212 
213  // reusing quicker the last block used
214  // as the back() be used first
215  empty_slot.push_back(it->second);
216  corres.erase(it); // removing the correspondance
217 
218  if(verrou.get_waiting_thread_count(cond_full) > 0)
219  verrou.signal(cond_full);
220  }
221  }
222  else
223  {
224  // ratelier_scatter is empty
225 
226  verrou.wait(cond_empty);
227  it = corres.begin();
228  }
229  }
230  while( ! ret);
231  }
232  catch(...)
233  {
234  verrou.unlock();
235  verrou.broadcast(cond_empty);
236  verrou.broadcast(cond_full);
237  throw;
238  }
239  verrou.unlock();
240 
241  return ret;
242  }
243 
244  template <class T> void ratelier_scatter<T>::reset()
245  {
246  unsigned int size = table.size();
247  next_index = 0;
248  lowest_index = 0;
249  corres.clear();
250  empty_slot.clear();
251 
252  for(unsigned int i = 0; i < size; ++i)
253  {
254  table[i].obj.reset();
255  table[i].empty = true;
256  empty_slot.push_back(i);
257  }
258 
259  verrou.lock();
260  verrou.broadcast(cond_empty);
261  verrou.broadcast(cond_full);
262  verrou.unlock();
263  }
264 
265 } // end of namespace
266 
267 #endif
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
the class ratelier_scatter has a fixed length range of slots of arbitrary defined object type ...
defines the mutex C++ class
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition: condition.hpp:45
void reset()
reset the object in its prestine state
std::unique_ptr< T > worker_get_one(unsigned int &slot, signed int &flag)
void scatter(std::unique_ptr< T > &one, signed int flag=0)
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:45