Joshua
open source statistical hierarchical phrase-based machine translation system
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
src/kenlm/util/pcqueue.hh
00001 #ifndef UTIL_PCQUEUE_H
00002 #define UTIL_PCQUEUE_H
00003 
00004 #include "util/exception.hh"
00005 
00006 #include <boost/interprocess/sync/interprocess_semaphore.hpp>
00007 #include <boost/scoped_array.hpp>
00008 #include <boost/thread/mutex.hpp>
00009 #include <boost/utility.hpp>
00010 
00011 #include <cerrno>
00012 
00013 #ifdef __APPLE__
00014 #include <mach/semaphore.h>
00015 #include <mach/task.h>
00016 #include <mach/mach_traps.h>
00017 #include <mach/mach.h>
00018 #endif // __APPLE__
00019 
00020 namespace util {
00021 
/* On OS X Mavericks, Boost interprocess semaphores failed with
 * "Function not implemented."  So this is my own wrapper around the Mach
 * kernel APIs.
 */
00025 #ifdef __APPLE__
00026 
00027 #define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure")
00028 
00029 class Semaphore {
00030   public:
00031     explicit Semaphore(int value) : task_(mach_task_self()) {
00032       MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value));
00033     }
00034 
00035     ~Semaphore() {
00036       MACH_CALL(semaphore_destroy(task_, back_));
00037     }
00038 
00039     void wait() {
00040       MACH_CALL(semaphore_wait(back_));
00041     }
00042 
00043     void post() {
00044       MACH_CALL(semaphore_signal(back_));
00045     }
00046 
00047   private:
00048     semaphore_t back_;
00049     task_t task_;
00050 };
00051 
00052 inline void WaitSemaphore(Semaphore &semaphore) {
00053   semaphore.wait();
00054 }
00055 
00056 #else
00057 typedef boost::interprocess::interprocess_semaphore Semaphore;
00058 
00059 inline void WaitSemaphore (Semaphore &on) {
00060   while (1) {
00061     try {
00062       on.wait();
00063       break;
00064     }
00065     catch (boost::interprocess::interprocess_exception &e) {
00066       if (e.get_native_error() != EINTR) {
00067         throw;
00068       }
00069     }
00070   }
00071 }
00072 
00073 #endif // __APPLE__
00074 
00082 template <class T> class PCQueue : boost::noncopyable {
00083  public:
00084   explicit PCQueue(size_t size)
00085    : empty_(size), used_(0),
00086      storage_(new T[size]),
00087      end_(storage_.get() + size),
00088      produce_at_(storage_.get()),
00089      consume_at_(storage_.get()) {}
00090 
00091   // Add a value to the queue.
00092   void Produce(const T &val) {
00093     WaitSemaphore(empty_);
00094     {
00095       boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_);
00096       try {
00097         *produce_at_ = val;
00098       }
00099       catch (...) {
00100         empty_.post();
00101         throw;
00102       }
00103       if (++produce_at_ == end_) produce_at_ = storage_.get();
00104     }
00105     used_.post();
00106   }
00107 
00108   // Consume a value, assigning it to out.
00109   T& Consume(T &out) {
00110     WaitSemaphore(used_);
00111     {
00112       boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_);
00113       try {
00114         out = *consume_at_;
00115       }
00116       catch (...) {
00117         used_.post();
00118         throw;
00119       }
00120       if (++consume_at_ == end_) consume_at_ = storage_.get();
00121     }
00122     empty_.post();
00123     return out;
00124   }
00125 
00126   // Convenience version of Consume that copies the value to return.
00127   // The other version is faster.
00128   T Consume() {
00129     T ret;
00130     Consume(ret);
00131     return ret;
00132   }
00133 
00134  private:
00135   // Number of empty spaces in storage_.
00136   Semaphore empty_;
00137   // Number of occupied spaces in storage_.
00138   Semaphore used_;
00139 
00140   boost::scoped_array<T> storage_;
00141 
00142   T *const end_;
00143 
00144   // Index for next write in storage_.
00145   T *produce_at_;
00146   boost::mutex produce_at_mutex_;
00147 
00148   // Index for next read from storage_.
00149   T *consume_at_;
00150   boost::mutex consume_at_mutex_;
00151 
00152 };
00153 
00154 } // namespace util
00155 
00156 #endif // UTIL_PCQUEUE_H