Joshua
An open-source statistical hierarchical phrase-based machine translation system.
|
00001 #ifndef UTIL_THREAD_POOL_H 00002 #define UTIL_THREAD_POOL_H 00003 00004 #include "util/pcqueue.hh" 00005 00006 #include <boost/ptr_container/ptr_vector.hpp> 00007 #include <boost/optional.hpp> 00008 #include <boost/thread.hpp> 00009 00010 #include <iostream> 00011 #include <cstdlib> 00012 00013 namespace util { 00014 00015 template <class HandlerT> class Worker : boost::noncopyable { 00016 public: 00017 typedef HandlerT Handler; 00018 typedef typename Handler::Request Request; 00019 00020 template <class Construct> Worker(PCQueue<Request> &in, Construct &construct, const Request &poison) 00021 : in_(in), handler_(construct), poison_(poison), thread_(boost::ref(*this)) {} 00022 00023 // Only call from thread. 00024 void operator()() { 00025 Request request; 00026 while (1) { 00027 in_.Consume(request); 00028 if (request == poison_) return; 00029 try { 00030 (*handler_)(request); 00031 } 00032 catch(const std::exception &e) { 00033 std::cerr << "Handler threw " << e.what() << std::endl; 00034 abort(); 00035 } 00036 catch(...) 
{ 00037 std::cerr << "Handler threw an exception, dropping request" << std::endl; 00038 abort(); 00039 } 00040 } 00041 } 00042 00043 void Join() { 00044 thread_.join(); 00045 } 00046 00047 private: 00048 PCQueue<Request> &in_; 00049 00050 boost::optional<Handler> handler_; 00051 00052 const Request poison_; 00053 00054 boost::thread thread_; 00055 }; 00056 00057 template <class HandlerT> class ThreadPool : boost::noncopyable { 00058 public: 00059 typedef HandlerT Handler; 00060 typedef typename Handler::Request Request; 00061 00062 template <class Construct> ThreadPool(size_t queue_length, size_t workers, Construct handler_construct, Request poison) : in_(queue_length), poison_(poison) { 00063 for (size_t i = 0; i < workers; ++i) { 00064 workers_.push_back(new Worker<Handler>(in_, handler_construct, poison)); 00065 } 00066 } 00067 00068 ~ThreadPool() { 00069 for (size_t i = 0; i < workers_.size(); ++i) { 00070 Produce(poison_); 00071 } 00072 for (typename boost::ptr_vector<Worker<Handler> >::iterator i = workers_.begin(); i != workers_.end(); ++i) { 00073 i->Join(); 00074 } 00075 } 00076 00077 void Produce(const Request &request) { 00078 in_.Produce(request); 00079 } 00080 00081 // For adding to the queue. 00082 PCQueue<Request> &In() { return in_; } 00083 00084 private: 00085 PCQueue<Request> in_; 00086 00087 boost::ptr_vector<Worker<Handler> > workers_; 00088 00089 Request poison_; 00090 }; 00091 00092 } // namespace util 00093 00094 #endif // UTIL_THREAD_POOL_H