Joshua
open source statistical hierarchical phrase-based machine translation system
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends
src/kenlm/util/thread_pool.hh
00001 #ifndef UTIL_THREAD_POOL_H
00002 #define UTIL_THREAD_POOL_H
00003 
00004 #include "util/pcqueue.hh"
00005 
00006 #include <boost/ptr_container/ptr_vector.hpp>
00007 #include <boost/optional.hpp>
00008 #include <boost/thread.hpp>
00009 
00010 #include <iostream>
00011 #include <cstdlib>
00012 
00013 namespace util {
00014 
00015 template <class HandlerT> class Worker : boost::noncopyable {
00016   public:
00017     typedef HandlerT Handler;
00018     typedef typename Handler::Request Request;
00019 
00020     template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, const Request &poison)
00021       : in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {}
00022 
00023     // Only call from thread.
00024     void operator()() {
00025       Request request;
00026       while (1) {
00027         in_.Consume(request);
00028         if (request == poison_) return;
00029         try {
00030           (*handler_)(request);
00031         }
00032         catch(const std::exception &e) {
00033           std::cerr << "Handler threw " << e.what() << std::endl;
00034           abort();
00035         }
00036         catch(...) {
00037           std::cerr << "Handler threw an exception, dropping request" << std::endl;
00038           abort();
00039         }
00040       }
00041     }
00042 
00043     void Join() {
00044       thread_.join();
00045     }
00046 
00047   private:
00048     PCQueue<Request> &in_;
00049 
00050     boost::optional<Handler> handler_;
00051 
00052     const Request poison_;
00053 
00054     boost::thread thread_;
00055 };
00056 
00057 template <class HandlerT> class ThreadPool : boost::noncopyable {
00058   public:
00059     typedef HandlerT Handler;
00060     typedef typename Handler::Request Request;
00061 
00062     template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) {
00063       for (size_t i = 0; i < workers; ++i) {
00064         workers_.push_back(new Worker<Handler>(in_, handler_construct, poison));
00065       }
00066     }
00067 
00068     ~ThreadPool() {
00069       for (size_t i = 0; i < workers_.size(); ++i) {
00070         Produce(poison_);
00071       }
00072       for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) {
00073         i->Join();
00074       }
00075     }
00076 
00077     void Produce(const Request &request) {
00078       in_.Produce(request);
00079     }
00080 
00081     // For adding to the queue.
00082     PCQueue<Request> &In() { return in_; }
00083 
00084   private:
00085     PCQueue<Request> in_;
00086 
00087     boost::ptr_vector<Worker<Handler> > workers_;
00088 
00089     Request poison_;
00090 };
00091 
00092 } // namespace util
00093 
00094 #endif // UTIL_THREAD_POOL_H