Joshua
open source statistical hierarchical phrase-based machine translation system
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
src/kenlm/util/stream/multi_stream.hh
00001 #ifndef UTIL_STREAM_MULTI_STREAM_H
00002 #define UTIL_STREAM_MULTI_STREAM_H
00003 
00004 #include "util/fixed_array.hh"
00005 #include "util/scoped.hh"
00006 #include "util/stream/chain.hh"
00007 #include "util/stream/stream.hh"
00008 
00009 #include <cstddef>
00010 #include <new>
00011 
00012 #include <cassert>
00013 #include <cstdlib>
00014 
00015 namespace util { namespace stream {
00016 
00017 class Chains;
00018 
00019 class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
00020   public:
00021     ChainPositions() {}
00022 
00023     explicit ChainPositions(std::size_t bound) :
00024       util::FixedArray<util::stream::ChainPosition>(bound) {}
00025 
00026     void Init(Chains &chains);
00027 
00028     explicit ChainPositions(Chains &chains) {
00029       Init(chains);
00030     }
00031 };
00032 
00033 class Chains : public util::FixedArray<util::stream::Chain> {
00034   private:
00035     template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
00036       typedef Chains type;
00037     };
00038 
00039   public:
00040     // Must call Init.
00041     Chains() {}
00042 
00043     explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {}
00044 
00045     template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
00046       threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
00047       return *this;
00048     }
00049 
00050     template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
00051       threads_.push_back(new util::stream::Thread(ChainPositions(*this), worker));
00052       return *this;
00053     }
00054 
00055     Chains &operator>>(const util::stream::Recycler &recycler) {
00056       for (util::stream::Chain *i = begin(); i != end(); ++i)
00057         *i >> recycler;
00058       return *this;
00059     }
00060 
00061     void Wait(bool release_memory = true) {
00062       threads_.clear();
00063       for (util::stream::Chain *i = begin(); i != end(); ++i) {
00064         i->Wait(release_memory);
00065       }
00066     }
00067 
00068   private:
00069     boost::ptr_vector<util::stream::Thread> threads_;
00070 
00071     Chains(const Chains &);
00072     void operator=(const Chains &);
00073 };
00074 
00075 inline void ChainPositions::Init(Chains &chains) {
00076   util::FixedArray<util::stream::ChainPosition>::Init(chains.size());
00077   for (util::stream::Chain *i = chains.begin(); i != chains.end(); ++i) {
00078     // use "placement new" syntax to initalize ChainPosition in an already-allocated memory location
00079     new (end()) util::stream::ChainPosition(i->Add()); Constructed();
00080   }
00081 }
00082 
00083 inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
00084   positions.Init(chains);
00085   return chains;
00086 }
00087 
00088 template <class T> class GenericStreams : public util::FixedArray<T> {
00089   private:
00090     typedef util::FixedArray<T> P;
00091   public:
00092     GenericStreams() {}
00093 
00094     // Limit restricts to positions[0,limit)
00095     void Init(const ChainPositions &positions, std::size_t limit) {
00096       P::Init(limit);
00097       for (const util::stream::ChainPosition *i = positions.begin(); i != positions.begin() + limit; ++i) {
00098         P::push_back(*i);
00099       }
00100     }
00101     void Init(const ChainPositions &positions) {
00102       Init(positions, positions.size());
00103     }
00104 
00105     GenericStreams(const ChainPositions &positions) {
00106       Init(positions);
00107     }
00108 
00109     void Init(std::size_t amount) {
00110       P::Init(amount);
00111     }
00112 };
00113 
00114 template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
00115   ChainPositions positions;
00116   chains >> positions;
00117   streams.Init(positions);
00118   return chains;
00119 }
00120 
00121 typedef GenericStreams<Stream> Streams;
00122 
00123 }} // namespaces
00124 #endif // UTIL_STREAM_MULTI_STREAM_H