Joshua
open source statistical hierarchical phrase-based machine translation system
|
00001 #ifndef UTIL_PCQUEUE_H 00002 #define UTIL_PCQUEUE_H 00003 00004 #include "util/exception.hh" 00005 00006 #include <boost/interprocess/sync/interprocess_semaphore.hpp> 00007 #include <boost/scoped_array.hpp> 00008 #include <boost/thread/mutex.hpp> 00009 #include <boost/utility.hpp> 00010 00011 #include <cerrno> 00012 00013 #ifdef __APPLE__ 00014 #include <mach/semaphore.h> 00015 #include <mach/task.h> 00016 #include <mach/mach_traps.h> 00017 #include <mach/mach.h> 00018 #endif // __APPLE__ 00019 00020 namespace util { 00021 00022 /* OS X Maverick and Boost interprocess were doing "Function not implemented." 00023 * So this is my own wrapper around the mach kernel APIs. 00024 */ 00025 #ifdef __APPLE__ 00026 00027 #define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure") 00028 00029 class Semaphore { 00030 public: 00031 explicit Semaphore(int value) : task_(mach_task_self()) { 00032 MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value)); 00033 } 00034 00035 ~Semaphore() { 00036 MACH_CALL(semaphore_destroy(task_, back_)); 00037 } 00038 00039 void wait() { 00040 MACH_CALL(semaphore_wait(back_)); 00041 } 00042 00043 void post() { 00044 MACH_CALL(semaphore_signal(back_)); 00045 } 00046 00047 private: 00048 semaphore_t back_; 00049 task_t task_; 00050 }; 00051 00052 inline void WaitSemaphore(Semaphore &semaphore) { 00053 semaphore.wait(); 00054 } 00055 00056 #else 00057 typedef boost::interprocess::interprocess_semaphore Semaphore; 00058 00059 inline void WaitSemaphore (Semaphore &on) { 00060 while (1) { 00061 try { 00062 on.wait(); 00063 break; 00064 } 00065 catch (boost::interprocess::interprocess_exception &e) { 00066 if (e.get_native_error() != EINTR) { 00067 throw; 00068 } 00069 } 00070 } 00071 } 00072 00073 #endif // __APPLE__ 00074 00082 template <class T> class PCQueue : boost::noncopyable { 00083 public: 00084 explicit PCQueue(size_t size) 00085 : empty_(size), used_(0), 00086 storage_(new T[size]), 00087 end_(storage_.get() + size), 00088 produce_at_(storage_.get()), 00089 consume_at_(storage_.get()) {} 00090 00091 // Add a value to the queue. 00092 void Produce(const T &val) { 00093 WaitSemaphore(empty_); 00094 { 00095 boost::unique_lock<boost::mutex> produce_lock(produce_at_mutex_); 00096 try { 00097 *produce_at_ = val; 00098 } 00099 catch (...) { 00100 empty_.post(); 00101 throw; 00102 } 00103 if (++produce_at_ == end_) produce_at_ = storage_.get(); 00104 } 00105 used_.post(); 00106 } 00107 00108 // Consume a value, assigning it to out. 00109 T& Consume(T &out) { 00110 WaitSemaphore(used_); 00111 { 00112 boost::unique_lock<boost::mutex> consume_lock(consume_at_mutex_); 00113 try { 00114 out = *consume_at_; 00115 } 00116 catch (...) { 00117 used_.post(); 00118 throw; 00119 } 00120 if (++consume_at_ == end_) consume_at_ = storage_.get(); 00121 } 00122 empty_.post(); 00123 return out; 00124 } 00125 00126 // Convenience version of Consume that copies the value to return. 00127 // The other version is faster. 00128 T Consume() { 00129 T ret; 00130 Consume(ret); 00131 return ret; 00132 } 00133 00134 private: 00135 // Number of empty spaces in storage_. 00136 Semaphore empty_; 00137 // Number of occupied spaces in storage_. 00138 Semaphore used_; 00139 00140 boost::scoped_array<T> storage_; 00141 00142 T *const end_; 00143 00144 // Index for next write in storage_. 00145 T *produce_at_; 00146 boost::mutex produce_at_mutex_; 00147 00148 // Index for next read from storage_. 00149 T *consume_at_; 00150 boost::mutex consume_at_mutex_; 00151 00152 }; 00153 00154 } // namespace util 00155 00156 #endif // UTIL_PCQUEUE_H