Libthreadar  1.4.0
tampon.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // libthreadar - is a library providing several C++ classes to work with threads
3 // Copyright (C) 2014-2020 Denis Corbin
4 //
5 // This file is part of libthreadar
6 //
7 // libthreadar is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // libhtreadar is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU Lesser General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19 //
20 //----
21 // to contact the author: dar.linux@free.fr
22 /*********************************************************************/
23 
24 #ifndef LIBTHREADAR_TAMPON_H
25 #define LIBTHREADAR_TAMPON_H
26 
29 
30 #include "config.h"
31 
32  // C system headers
33 extern "C"
34 {
35 
36 }
37  // C++ standard headers
38 
39 
40  // libthreadar headers
41 #include "mutex.hpp"
42 #include "exceptions.hpp"
43 
44 namespace libthreadar
45 {
46 
48 
101  template <class T> class tampon
102  {
103  public:
105 
109  tampon(unsigned int max_block, unsigned int block_size);
110 
112  tampon(const tampon & ref) = delete;
113 
115  tampon(tampon && ref) noexcept = default;
116 
118  tampon & operator = (const tampon & ref) = delete;
119 
121  tampon & operator = (tampon && ref) noexcept = default;
122 
125  ~tampon();
126 
128 
133  void get_block_to_feed(T * & ptr, unsigned int & num);
134 
136 
140  void feed(T* ptr, unsigned int written);
141 
143 
147  void feed_cancel_get_block(T *ptr);
148 
150 
155  void fetch(T* & ptr, unsigned int & num);
156 
158 
161  void fetch_recycle(T* ptr);
162 
164 
173  void fetch_push_back(T *ptr, unsigned int new_num);
174 
176 
179  void fetch_push_back_and_skip(T *ptr, unsigned int new_num);
180 
182  void fetch_skip_back();
183 
185  bool has_readable_block_next() const;
186 
188  bool is_empty() const;
189 
191  bool is_not_empty() const { return !is_empty(); };
192 
194  bool is_full() const { return full; }; // no need to acquire mutex "modif"
195 
197  bool is_not_full() const { return !is_full(); };
198 
200  bool is_quiet_full() const { unsigned int tmp = next_feed; shift_by_one(tmp); return tmp == fetch_head; };
201 
203 
205  unsigned int size() const { return table_size; };
206 
208 
210  unsigned int block_size() const { return alloc_size; };
211 
213  unsigned int load() const { return fetch_head <= next_feed ? next_feed - fetch_head : table_size - (fetch_head - next_feed); };
214 
216  void reset();
217 
218  private:
219 
220  struct atom
221  {
222  T* mem;
223  unsigned int data_size;
224 
225  atom() { mem = nullptr; data_size = 0; };
226  };
227 
228  mutex modif; //< to make critical section when non atomic action requires a status has not changed between a test and following action
229  atom *table; //< datastructure holding data in transit between two threads
230  unsigned int table_size; //< size of table, i.e. number of struct atom it holds
231  unsigned int alloc_size; //< size of allocated memory for each atom in table
232  unsigned int next_feed; //< index in table of the next atom to use for feeding table
233  unsigned int next_fetch; //< index in table of the next atom to use for fetch table
234  unsigned int fetch_head; //< the oldest object to be fetched
235  bool fetch_outside; //< if set to true, table's index pointed to by next_fetch is used by the fetcher
236  bool feed_outside; //< if set to true, table's index pointed to by next_feed is used by the feeder
237  mutex waiting_feeder; //< feeder thread may be stuck waiting on that semaphore if table is full
238  mutex waiting_fetcher; //< fetcher thread may be stuck waiting on that semaphore if table is empty
239  bool full; //< set when tampon is full
240  bool feeder_go_lock; //< true to inform fetcher than feeder is about to or has already acquire lock on waiting_feeder
241  bool feeder_lock_track; //< only used by feeder to lock on waiting_feeder once outside of critical section
242  bool fetcher_go_lock; //< true to inform feeder than fetcher is about to or has already acquire lock on waiting_fetcher
243  bool fetcher_lock_track; //< only used by fetcher to lock on waiting_fetcher once outside of critical section
244 
245  bool is_empty_no_lock() const { return next_feed == fetch_head && !full; };
246 
248  bool has_readable_block_next_no_lock() const { return next_feed != next_fetch || full; }
249 
251  void shift_by_one(unsigned int & x) const;
252 
254  void shift_back_by_one(unsigned int & x) const;
255 
260  void shift_by_one_data_in_range(unsigned int begin, unsigned int end);
261 
262  };
263 
264  template <class T> tampon<T>::tampon(unsigned int max_block, unsigned int block_size)
265  {
266  table_size = max_block;
267  table = new atom[table_size];
268  if(table == nullptr)
269  throw exception_memory();
270  try
271  {
272  alloc_size = block_size;
273  try
274  {
275  for(unsigned int i = 0 ; i < table_size ; ++i)
276  {
277  table[i].mem = new T[alloc_size];
278  if(table[i].mem == nullptr)
279  throw exception_memory();
280  table[i].data_size = 0;
281  }
282  reset();
283  }
284  catch(...)
285  {
286  for(unsigned int i = 0; i < table_size ; ++i)
287  {
288  if(table[i].mem != nullptr)
289  delete [] table[i].mem;
290  }
291 
292  throw;
293  }
294  }
295  catch(...)
296  {
297  if(table != nullptr)
298  delete [] table;
299  throw;
300  }
301  }
302 
303 
304  template <class T> tampon<T>::~tampon()
305  {
306  if(table != nullptr)
307  {
308  for(unsigned int i = 0 ; i < table_size ; ++i)
309  {
310  if(table[i].mem != nullptr)
311  delete [] table[i].mem;
312  }
313  delete [] table;
314  }
315  }
316 
317  template <class T> void tampon<T>::get_block_to_feed(T * & ptr, unsigned int & num)
318  {
319  if(feed_outside)
320  throw exception_range("feed already out!");
321 
322  modif.lock(); // --- critical section START
323  if(is_full())
324  {
325  feeder_go_lock = true; // inform fetcher that we will suspend in waiting_feeder
326  feeder_lock_track = true;// to suspend on waiting_feeder once we will be out of the critical section
327  }
328  modif.unlock(); // --- critical section END
329 
330  if(feeder_lock_track)
331  {
332  feeder_lock_track = false;
333  waiting_feeder.lock(); // cannot lock inside a critical section ...
334  }
335 
336  if(is_full())
337  throw THREADAR_BUG; // still full!
338 
339  feed_outside = true;
340  ptr = table[next_feed].mem;
341  num = alloc_size;
342  }
343 
344  template <class T> void tampon<T>::feed(T *ptr, unsigned int num)
345  {
346  if(!feed_outside)
347  throw exception_range("fetch not outside!");
348  feed_outside = false;
349 
350  if(ptr != table[next_feed].mem)
351  throw exception_range("returned ptr is not the one given earlier for feeding");
352  table[next_feed].data_size = num;
353 
354  modif.lock(); // --- critical section START
355  shift_by_one(next_feed);
356  if(next_feed == fetch_head)
357  full = true;
358  if(fetcher_go_lock)
359  {
360  fetcher_go_lock = false;
361  waiting_fetcher.unlock();
362  }
363  modif.unlock(); // --- critical section END
364  }
365 
366  template <class T> void tampon<T>::feed_cancel_get_block(T *ptr)
367  {
368  if(!feed_outside)
369  throw exception_range("feed not outside!");
370  feed_outside = false;
371  if(ptr != table[next_feed].mem)
372  throw exception_range("returned ptr is not the one given earlier for feeding");
373  }
374 
375  template <class T> void tampon<T>::fetch(T* & ptr, unsigned int & num)
376  {
377  if(fetch_outside)
378  throw exception_range("already fetched block outside");
379 
380  modif.lock(); // --- critical section START
381  if(!has_readable_block_next_no_lock())
382  {
383  fetcher_go_lock = true; // to inform feeder that we will suspend on waiting_fetcher
384  fetcher_lock_track = true; // to suspend on waiting_fetcher once we will be out of the critical section
385  }
386  modif.unlock(); // --- critical section END
387 
388  if(fetcher_lock_track)
389  {
390  fetcher_lock_track = false;
391  waiting_fetcher.lock(); // cannot lock inside a critical section ...
392  }
393 
394  if(is_empty())
395  throw THREADAR_BUG;
396 
397  fetch_outside = true;
398  ptr = table[next_fetch].mem;
399  num = table[next_fetch].data_size;
400  }
401 
402  template <class T> void tampon<T>::fetch_recycle(T* ptr)
403  {
404  if(!fetch_outside)
405  throw exception_range("no block outside for fetching");
406  fetch_outside = false;
407  if(ptr != table[next_fetch].mem)
408  throw exception_range("returned ptr is no the one given earlier for fetching");
409 
410  modif.lock(); // --- critical section START
411  if(next_fetch == fetch_head)
412  {
413 
414  // no block were skipped
415 
416  shift_by_one(fetch_head);
417  next_fetch = fetch_head;
418  full = false;
419  }
420  else
421  {
422  unsigned int tmp = next_fetch;
423  atom tmp_tom;
424 
425  shift_by_one(tmp);
426  shift_by_one_data_in_range(tmp, next_feed);
427 
428  // we also take into account the situation
429  // where a block has been given for feeding
430  // so the next call to feed() will match the
431  // expected address of the returned block
432  tmp = next_feed; // recording old position of next_feed
433  shift_back_by_one(next_feed);
434  // swapping contents between old next_feed position
435  // and new one:
436  tmp_tom = table[next_feed];
437  table[next_feed] = table[tmp];
438  table[tmp] = tmp_tom;
439  // done!
440 
441  full = false;
442  }
443 
444  if(feeder_go_lock)
445  {
446  feeder_go_lock = false;
447  waiting_feeder.unlock();
448  }
449  modif.unlock(); // --- critical section END
450  }
451 
452  template <class T> void tampon<T>::fetch_push_back(T* ptr, unsigned int new_num)
453  {
454  if(!fetch_outside)
455  throw exception_range("no block outside for fetching");
456  fetch_outside = false;
457 
458  if(ptr != table[next_fetch].mem)
459  throw exception_range("returned ptr is not the one given earlier for fetching");
460  table[next_fetch].data_size = new_num;
461  }
462 
463  template <class T> void tampon<T>::fetch_push_back_and_skip(T *ptr,
464  unsigned int new_num)
465  {
466  fetch_push_back(ptr, new_num);
467  modif.lock(); // --- critical section START
468  if(full && next_fetch == next_feed) // reach last block feed, cannot skip it
469  throw exception_range("cannot skip the last fed block when the tampon is full");
470  shift_by_one(next_fetch);
471  modif.unlock(); // --- critical section END
472  }
473 
474  template <class T> void tampon<T>::fetch_skip_back()
475  {
476  if(fetch_outside)
477  throw exception_range("cannot skip back fetching while a block is being fetched");
478  next_fetch = fetch_head;
479  }
480 
481 
482  template <class T> bool tampon<T>::has_readable_block_next() const
483  {
484  bool ret;
485 
486  tampon<T> *me = const_cast<tampon<T> *>(this);
487  if(me == nullptr)
488  throw THREADAR_BUG;
489  me->modif.lock();
490  ret = has_readable_block_next_no_lock();
491  me->modif.unlock();
492 
493  return ret;
494  }
495 
496 
497  template <class T> bool tampon<T>::is_empty() const
498  {
499  bool ret;
500 
501  tampon<T> * me = const_cast<tampon<T> *>(this);
502  if(me == nullptr)
503  throw THREADAR_BUG;
504  me->modif.lock();
505  ret = is_empty_no_lock();
506  me->modif.unlock();
507 
508  return ret;
509  }
510 
511  template <class T> void tampon<T>::reset()
512  {
513  next_feed = 0;
514  next_fetch = 0;
515  fetch_head = 0;
516  fetch_outside = false;
517  feed_outside = false;
518  full = false;
519  feeder_go_lock = false;
520  feeder_lock_track = false;
521  fetcher_go_lock = false;
522  fetcher_lock_track = false;
523  (void)waiting_feeder.try_lock();
524  (void)waiting_fetcher.try_lock();
525  }
526 
527  template <class T> void tampon<T>::shift_by_one(unsigned int & x) const
528  {
529  ++x;
530  if(x >= table_size)
531  x = 0;
532  }
533 
534  template <class T> void tampon<T>::shift_back_by_one(unsigned int & x) const
535  {
536  if(x == 0)
537  x = table_size - 1;
538  else
539  --x;
540  }
541 
542  template <class T> void tampon<T>::shift_by_one_data_in_range(unsigned int begin, unsigned int end)
543  {
544 
545  if(begin != end)
546  {
547  unsigned int prev = begin;
548  shift_back_by_one(prev);
549  T* not_squeezed = table[prev].mem; // we will erase the address pointed to by mem so we keep it in memory here
550 
551  while(begin != end)
552  {
553  table[prev] = table[begin]; // this copies both mem (the value of the pointer, not the pointed to) and data_size,
554  prev = begin;
555  shift_by_one(begin);
556  }
557 
558  table[prev].mem = not_squeezed;
559  table[prev].data_size = 0; // by precaution
560  }
561  }
562 
563 
564 } // end of namespace
565 
566 #endif
567 
void fetch_push_back(T *ptr, unsigned int new_num)
fetcher call - step 2 alternative
Definition: tampon.hpp:452
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
DEPRECATED see fast_tampon instead!
Definition: tampon.hpp:101
void fetch(T *&ptr, unsigned int &num)
fetcher call - step 1
Definition: tampon.hpp:375
defines a set of exceptions that are used by libthreadar to report error situations ...
void reset()
reset the object fields and mutex as if the object was just created
Definition: tampon.hpp:511
bool is_not_full() const
to know whether the tampon is not full
Definition: tampon.hpp:197
tampon(unsigned int max_block, unsigned int block_size)
constructor
Definition: tampon.hpp:264
Exception used to report memory allocation failures.
Definition: exceptions.hpp:151
bool has_readable_block_next() const
to known whether next fetch will be blocking (not skipped blocks)
Definition: tampon.hpp:482
void fetch_recycle(T *ptr)
fetcher call - step 2
Definition: tampon.hpp:402
void fetch_skip_back()
reactivate all skipped blocks, next fetch() will be the oldest available block
Definition: tampon.hpp:474
tampon & operator=(const tampon &ref)=delete
no assignment operator
void get_block_to_feed(T *&ptr, unsigned int &num)
feeder call - step 1
Definition: tampon.hpp:317
unsigned int load() const
returns the current number of blocks currently used in the tampon (fed but not fetched) ...
Definition: tampon.hpp:213
bool is_quiet_full() const
returns true if only one slot is available before filling the tampon
Definition: tampon.hpp:200
unsigned int size() const
returns the size of the tampon in maximum number of block it can contain
Definition: tampon.hpp:205
void unlock()
unlock the mutex
unsigned int block_size() const
returns the allocation size of each block
Definition: tampon.hpp:210
void fetch_push_back_and_skip(T *ptr, unsigned int new_num)
put back the fetched block and skip to next block for the next fetch()
Definition: tampon.hpp:463
void feed_cancel_get_block(T *ptr)
feeder call - step 2 alternative
Definition: tampon.hpp:366
void lock()
lock the mutex
bool is_not_empty() const
to know whether the tampon is not empty
Definition: tampon.hpp:191
bool is_full() const
for feeder to know whether the next call to get_block_to_feed() will be blocking
Definition: tampon.hpp:194
void feed(T *ptr, unsigned int written)
feeder call - step 2
Definition: tampon.hpp:344
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:45
Exception used to report out or range value or argument.
Definition: exceptions.hpp:206
bool is_empty() const
to know whether the tampon has objects (readable or skipped)
Definition: tampon.hpp:497