Joshua
open source statistical hierarchical phrase-based machine translation system
|
00001 #ifndef UTIL_STREAM_MULTI_STREAM_H 00002 #define UTIL_STREAM_MULTI_STREAM_H 00003 00004 #include "util/fixed_array.hh" 00005 #include "util/scoped.hh" 00006 #include "util/stream/chain.hh" 00007 #include "util/stream/stream.hh" 00008 00009 #include <cstddef> 00010 #include <new> 00011 00012 #include <cassert> 00013 #include <cstdlib> 00014 00015 namespace util { namespace stream { 00016 00017 class Chains; 00018 00019 class ChainPositions : public util::FixedArray<util::stream::ChainPosition> { 00020 public: 00021 ChainPositions() {} 00022 00023 explicit ChainPositions(std::size_t bound) : 00024 util::FixedArray<util::stream::ChainPosition>(bound) {} 00025 00026 void Init(Chains &chains); 00027 00028 explicit ChainPositions(Chains &chains) { 00029 Init(chains); 00030 } 00031 }; 00032 00033 class Chains : public util::FixedArray<util::stream::Chain> { 00034 private: 00035 template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun { 00036 typedef Chains type; 00037 }; 00038 00039 public: 00040 // Must call Init. 00041 Chains() {} 00042 00043 explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {} 00044 00045 template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { 00046 threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); 00047 return *this; 00048 } 00049 00050 template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { 00051 threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker)); 00052 return *this; 00053 } 00054 00055 Chains &operator>>(const util::stream::Recycler &recycler) { 00056 for (util::stream::Chain *i = begin(); i != end(); ++i) 00057 *i >> recycler; 00058 return *this; 00059 } 00060 00061 void Wait(bool release_memory = true) { 00062 threads_.clear(); 00063 for (util::stream::Chain *i = begin(); i != end(); ++i) { 00064 i->Wait(release_memory); 00065 } 00066 } 00067 00068 private: 00069 boost::ptr_vector<util::stream::Thread> threads_; 00070 00071 Chains(const Chains &); 00072 void operator=(const Chains &); 00073 }; 00074 00075 inline void ChainPositions::Init(Chains &chains) { 00076 util::FixedArray<util::stream::ChainPosition>::Init(chains.size()); 00077 for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) { 00078 // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location 00079 new (end()) util::stream::ChainPosition(i->Add()); Constructed(); 00080 } 00081 } 00082 00083 inline Chains &operator>>(Chains &chains, ChainPositions &positions) { 00084 positions.Init(chains); 00085 return chains; 00086 } 00087 00088 template <class T> class GenericStreams : public util::FixedArray<T> { 00089 private: 00090 typedef util::FixedArray<T> P; 00091 public: 00092 GenericStreams() {} 00093 00094 // Limit restricts to positions[0,limit) 00095 void Init(const ChainPositions &positions, std::size_t limit) { 00096 P::Init(limit); 00097 for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) { 00098 P::push_back(*i); 00099 } 00100 } 00101 void Init(const ChainPositions &positions) { 00102 Init(positions, positions.size()); 00103 } 00104 00105 GenericStreams(const ChainPositions &positions) { 00106 Init(positions); 00107 } 00108 00109 void Init(std::size_t amount) { 00110 P::Init(amount); 00111 } 00112 }; 00113 00114 template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) { 00115 ChainPositions positions; 00116 chains >> positions; 00117 streams.Init(positions); 00118 return chains; 00119 } 00120 00121 typedef GenericStreams<Stream> Streams; 00122 00123 }} // namespaces 00124 #endif // UTIL_STREAM_MULTI_STREAM_H