среда, 18 апреля 2018 г.

[prog.c++] Наглядная разница при работе с многопоточностью "вручную" и с помощью CSP-шных каналов

Для очередной статьи на Хабре нужно было реализовать собственный диспетчер, заточенный под конкретную специфическую задачу. Первоначально я этот диспетчер начал делать "в лоб", с собственными очередями, mutex-ами, condition_variable и вот этим вот всем.

Однако, уже по ходу реализации в голову пришла простая мысль о том, что все тоже самое можно получить и используя CSP-шные каналы, благо в SO-5 они уже давно есть. В общем, было сделано две реализации. Одна на нитях/mutex/condition_variable. Вторая -- на SObjectizer-овских mchain-ах. То, какая в итоге вышла разница, можно увидеть под катом.

Итак, слева первоначальный вариант на нитях/mutex/condition_variable, справа -- итоговый на mchain-ах. При этом в первоначальном варианте была допущена серьезная ошибка, которая была найдена и исправлена не сразу.

class tricky_dispatcher_t
      : public std::enable_shared_from_this<tricky_dispatcher_t> {
   friend class tricky_event_queue_t;
   friend class tricky_disp_binder_t;

   class tricky_event_queue_t : public so_5::event_queue_t {
      tricky_dispatcher_t & disp_;
   public:
      tricky_event_queue_t(tricky_dispatcher_t & disp) : disp_{disp} {}

      virtual void push(so_5::execution_demand_t demand) override {
         disp_.push_demand(std::move(demand));
      }
   };

   class tricky_disp_binder_t : public so_5::disp_binder_t {
      std::shared_ptr<tricky_dispatcher_t> disp_;
   public:
      tricky_disp_binder_t(std::shared_ptr<tricky_dispatcher_t> disp)
            : disp_{std::move(disp)} {}

      virtual so_5::disp_binding_activator_t bind_agent(
            so_5::environment_t &,
            so_5::agent_ref_t agent) override {
         return [d = disp_, agent] {
            agent->so_bind_to_dispatcher(d->event_queue_);
         };
      }

      virtual void unbind_agent(
            so_5::environment_t &,
            so_5::agent_ref_t) override {
         // Ничего не нужно делать.
      }
   };

   // Тип очереди заявок.
   using demand_queue_t = std::deque<so_5::execution_demand_t>;

   // Тип стека для ожидающих своей работы очередей.
   using cond_var_stack_t = std::stack<std::condition_variable *>;

   // Тип контейнера для рабочих очередей.
   using thread_pool_t = std::vector<std::thread>;

   // Вспомогательный тип, который потребуется для нотификации о том,
   // что диспетчер должен завершить свою работу.
   struct shutdown_t {};

   // Объект, реализующий интерфейс so_5::event_queue_t для того,
   // чтобы выполнять привязку агентов к диспетчеру.
   tricky_event_queue_t event_queue_;

   // Замок объекта. Все модификации очередей заявок будут выполняться
   // под этим замком.
   std::mutex lock_;

   // Признак того, что диспетчер должен завершить свою работу.
   bool shutdown_{false};

   // Очередь заявок для сообщений init_device и reinit_device.
   demand_queue_t init_reinit_queue_;
   // Очередь для всех остальных сообщений.
   demand_queue_t other_demands_queue_;

   // Рабочие нити первого типа, которые ждут когда для них найдется работа.
   cond_var_stack_t sleeping_first_type_threads_;
   // Рабочие нити второго типа, которые ждут когда для них найдется работа.
   cond_var_stack_t sleeping_second_type_threads_;

   // Рабочие очереди этого диспетчера.
   thread_pool_t work_threads_;

   static const std::type_index init_device_type;
   static const std::type_index reinit_device_type;
   
   // Вспомогательный метод для вычисления размеров подпулов.
   static auto calculate_pools_sizes(unsigned pool_size) {
      if2u == pool_size)
         // Всего две очереди в пуле. Делим пополам.
         return std::tuple{1u1u};
      else {
         // Нитей первого типа будет 3/4 от общего количества.
         const auto first_pool_size = (pool_size/4u)*3u;
         return std::tuple{first_pool_size, pool_size - first_pool_size};
      }
   }

   // Вспомогательный метод для того, чтобы взять первую спящую рабочую
   // нить и разбудить ее.
   static void wake_up_thread_from(cond_var_stack_t & vars) {
      auto v = vars.top();
      vars.pop();
      v->notify_one();
   }

   // Вспомогательный метод для того, чтобы завершить работу всех нитей.
   void shutdown_work_threads() noexcept {
      // Сначала нужно выставить флаг shutdown для того, чтобы все
      // работающие нити поняли, что пришло время завершать свою работу.
      // Это нужно делать под захваченным замком объекта.
      {
         std::lock_guard l{lock_};

         shutdown_ = true;

         // Если есть спящие нити, то их следует разбудить.
         while(!sleeping_first_type_threads_.empty())
            wake_up_thread_from(sleeping_first_type_threads_);
         while(!sleeping_second_type_threads_.empty())
            wake_up_thread_from(sleeping_second_type_threads_);
      }

      // Теперь можно дождаться момента, когда все рабочие нити закончат
      // свою работу.
      for(auto & t : work_threads_)
         t.join();

      // Пул рабочих нитей должен быть очищен.
      work_threads_.clear();
   }

   // Запуск всех рабочих нитей.
   // Если в процессе запуска произойдет сбой, то ранее запущенные нити
   // должны быть остановлены.
   void launch_work_threads(
         unsigned first_type_threads_count,
         unsigned second_type_threads_count) {
      work_threads_.reserve(first_type_threads_count + second_type_threads_count);
      try {
         for(auto i = 0u; i < first_type_threads_count; ++i)
            work_threads_.emplace_back([this]{ first_type_thread_body(); });

         for(auto i = 0u; i < second_type_threads_count; ++i)
            work_threads_.emplace_back([this]{ second_type_thread_body(); });
      }
      catch(...) {
         shutdown_work_threads();
         throw// Пусть с исключениями разбираются выше.
      }
   }

   // Тело рабочей нити первого типа.
   void first_type_thread_body() {
      // Нам потребуется знать идентфикатор текущей нити для того,
      // чтобы запускать обработчики событий агента с эти идентификатором.
      const auto thread_id = so_5::query_current_thread_id();

      // Так же нам потребуется собственный condition_variable, который
      // будет нас будить когда поступит заявка для обработки.
      std::condition_variable cond_var;

      // Эта вспомогательная функция будет извлекать заявку из очередей или
      // спать до тех пор, пока что-нибудь произойдет.
      const auto extractor = [&]() -> std::variant<shutdown_t, so_5::execution_demand_t> {
         // Все операции выполняются под захваченным замком.
         std::unique_lock l{lock_};
         while(true) {
            if(shutdown_)
               return shutdown_t{};
            else if(!init_reinit_queue_.empty()) {
               auto d = std::move(init_reinit_queue_.front());
               init_reinit_queue_.pop_front();

               if(!init_reinit_queue_.empty() &&
                     !sleeping_first_type_threads_.empty())
                  // Нужно дать поработать еще одной нити.
                  wake_up_thread_from(sleeping_first_type_threads_);

               return d;
            }
            else if(!other_demands_queue_.empty()) {
               auto d = std::move(other_demands_queue_.front());
               other_demands_queue_.pop_front();

               if(!other_demands_queue_.empty()) {
                  // Нужно дать поработать еще одной нити.
                  if(!sleeping_second_type_threads_.empty())
                     wake_up_thread_from(sleeping_second_type_threads_);
                  else if(!sleeping_first_type_threads_.empty())
                     wake_up_thread_from(sleeping_first_type_threads_);
               }

               return d;
            }
            else {
               // Все очереди пусты, нужно заснуть до появления работы.
               sleeping_first_type_threads_.push(&cond_var);
               cond_var.wait(l);
            }
         }
      };

      // Выполняем работу до тех пор, пока не получим сигнал shutdown.
      while(true) {
         auto r = extractor();
         if(std::get_if<shutdown_t>(&r))
            return;
         else {
            auto & d = std::get<so_5::execution_demand_t>(r);
            d.call_handler(thread_id);
         }
      }
   }

   // Тело рабочей нити второго типа.
   void second_type_thread_body() {
      // Нам потребуется знать идентфикатор текущей нити для того,
      // чтобы запускать обработчики событий агента с эти идентификатором.
      const auto thread_id = so_5::query_current_thread_id();

      // Так же нам потребуется собственный condition_variable, который
      // будет нас будить когда поступит заявка для обработки.
      std::condition_variable cond_var;

      // Эта вспомогательная функция будет извлекать заявку из очередей или
      // спать до тех пор, пока что-нибудь произойдет.
      const auto extractor = [&]() -> std::variant<shutdown_t, so_5::execution_demand_t> {
         // Все операции выполняются под захваченным замком.
         std::unique_lock l{lock_};
         while(true) {
            if(shutdown_)
               return shutdown_t{};
            else if(!other_demands_queue_.empty()) {
               auto d = std::move(other_demands_queue_.front());
               other_demands_queue_.pop_front();

               if(!other_demands_queue_.empty()) {
                  // Нужно дать поработать еще одной нити.
                  if(!sleeping_second_type_threads_.empty())
                     wake_up_thread_from(sleeping_second_type_threads_);
                  else if(!sleeping_first_type_threads_.empty())
                     wake_up_thread_from(sleeping_first_type_threads_);
               }

               return d;
            }
            else {
               // Все очереди пусты, нужно заснуть до появления работы.
               sleeping_second_type_threads_.push(&cond_var);
               cond_var.wait(l);
            }
         }
      };

      // Выполняем работу до тех пор, пока не получим сигнал shutdown.
      while(true) {
         auto r = extractor();
         if(std::get_if<shutdown_t>(&r))
            return;
         else {
            auto & d = std::get<so_5::execution_demand_t>(r);
            d.call_handler(thread_id);
         }
      }
   }

   // Сохранение очередной заявки в очередях заявок.
   void push_demand(so_5::execution_demand_t demand) {
      // Делаем все действия обязательно под захваченным замком.
      std::lock_guard l{lock_};

      if(init_device_type == demand.m_msg_type ||
            reinit_device_type == demand.m_msg_type) {
         // Эти заявки должны идти в свою собственную очередь.
         const bool was_empty = init_reinit_queue_.empty();
         init_reinit_queue_.emplace_back(std::move(demand));
         // Если есть какая-то нить, которая ждет работу, то разбудим ее.
         if(was_empty && !sleeping_first_type_threads_.empty())
            wake_up_thread_from(sleeping_first_type_threads_);
      }
      else {
         // Это заявка, которая должна попасть в общую очередь.
         const bool was_empty = other_demands_queue_.empty();
         other_demands_queue_.emplace_back(std::move(demand));
         if(was_empty) {
            // Возможно, какая-то из рабочих нитей ждала работу для себя.
            // Попробуем с этим разобраться.
            if(!sleeping_second_type_threads_.empty())
               wake_up_thread_from(sleeping_second_type_threads_);
            // Рабочие потоки первого типа так же могут обработать такую
            // заявку, если другой работы для них нет.
            else if(!sleeping_first_type_threads_.empty())
               wake_up_thread_from(sleeping_first_type_threads_);
         }
      }
   }

public:
   // Конструктор сразу же запускает все рабочие нити.
   tricky_dispatcher_t(
         // Количество рабочих потоков, которые должны быть созаны диспетчером.
         unsigned pool_size)
         :  event_queue_{*this} {
      const auto [first_type_count, second_type_count] =
            calculate_pools_sizes(pool_size);

      launch_work_threads(first_type_count, second_type_count);
   }
   ~tricky_dispatcher_t() noexcept {
      // Все работающие нити должны быть остановлены.
      shutdown_work_threads();
   }

   // Метод-фабрика для создания экземпляров диспетчера.
   static auto make(unsigned pool_size) {
      return std::make_shared<tricky_dispatcher_t>(pool_size);
   }

   // Создать биндера, который сможет привязать агента к этому диспетчеру.
   so_5::disp_binder_unique_ptr_t binder() {
      return so_5::disp_binder_unique_ptr_t{
            new tricky_disp_binder_t{shared_from_this()}};
   }
};

const std::type_index tricky_dispatcher_t::init_device_type =
      typeid(a_device_manager_t::init_device_t);
const std::type_index tricky_dispatcher_t::reinit_device_type =
      typeid(so_5::mutable_msg<a_device_manager_t::reinit_device_t>);
class tricky_dispatcher_t
      : public std::enable_shared_from_this<tricky_dispatcher_t> {
   friend class tricky_event_queue_t;
   friend class tricky_disp_binder_t;

   class tricky_event_queue_t : public so_5::event_queue_t {
      tricky_dispatcher_t & disp_;
   public:
      tricky_event_queue_t(tricky_dispatcher_t & disp) : disp_{disp} {}

      virtual void push(so_5::execution_demand_t demand) override {
         disp_.push_demand(std::move(demand));
      }
   };

   class tricky_disp_binder_t : public so_5::disp_binder_t {
      std::shared_ptr<tricky_dispatcher_t> disp_;
   public:
      tricky_disp_binder_t(std::shared_ptr<tricky_dispatcher_t> disp)
            : disp_{std::move(disp)} {}

      virtual so_5::disp_binding_activator_t bind_agent(
            so_5::environment_t &,
            so_5::agent_ref_t agent) override {
         return [d = disp_, agent] {
            agent->so_bind_to_dispatcher(d->event_queue_);
         };
      }

      virtual void unbind_agent(
            so_5::environment_t &,
            so_5::agent_ref_t) override {
         // Ничего не нужно делать.
      }
   };

   // Тип контейнера для рабочих очередей.
   using thread_pool_t = std::vector<std::thread>;

   // Объект, реализующий интерфейс so_5::event_queue_t для того,
   // чтобы выполнять привязку агентов к диспетчеру.
   tricky_event_queue_t event_queue_;

   // Каналы, которые будут использоваться в качестве очередей сообщений.
   so_5::mchain_t init_reinit_ch_;
   so_5::mchain_t other_demands_ch_;

   // Рабочие очереди этого диспетчера.
   thread_pool_t work_threads_;

   static const std::type_index init_device_type;
   static const std::type_index reinit_device_type;
   
   // Вспомогательный метод для вычисления размеров подпулов.
   static auto calculate_pools_sizes(unsigned pool_size) {
      if2u == pool_size)
         // Всего две очереди в пуле. Делим пополам.
         return std::tuple{1u1u};
      else {
         // Нитей первого типа будет 3/4 от общего количества.
         const auto first_pool_size = (pool_size/4u)*3u;
         return std::tuple{first_pool_size, pool_size - first_pool_size};
      }
   }

   // Вспомогательный метод для того, чтобы завершить работу всех нитей.
   void shutdown_work_threads() noexcept {
      // Сначала закроем оба канала.
      so_5::close_drop_content(init_reinit_ch_);
      so_5::close_drop_content(other_demands_ch_);

      // Теперь можно дождаться момента, когда все рабочие нити закончат
      // свою работу.
      for(auto & t : work_threads_)
         t.join();

      // Пул рабочих нитей должен быть очищен.
      work_threads_.clear();
   }

   // Запуск всех рабочих нитей.
   // Если в процессе запуска произойдет сбой, то ранее запущенные нити
   // должны быть остановлены.
   void launch_work_threads(
         unsigned first_type_threads_count,
         unsigned second_type_threads_count) {
      work_threads_.reserve(first_type_threads_count + second_type_threads_count);
      try {
         for(auto i = 0u; i < first_type_threads_count; ++i)
            work_threads_.emplace_back([this]{ first_type_thread_body(); });

         for(auto i = 0u; i < second_type_threads_count; ++i)
            work_threads_.emplace_back([this]{ second_type_thread_body(); });
      }
      catch(...) {
         shutdown_work_threads();
         throw// Пусть с исключениями разбираются выше.
      }
   }

   // Обработчик объектов so_5::execution_demand_t.
   static void exec_demand_handler(so_5::execution_demand_t d) {
      d.call_handler(so_5::null_current_thread_id());
   }

   // Тело рабочей нити первого типа.
   void first_type_thread_body() {
      // Выполняем работу до тех пор, пока не будут закрыты все каналы.
      so_5::select(so_5::from_all(),
            case_(init_reinit_ch_, exec_demand_handler),
            case_(other_demands_ch_, exec_demand_handler));
   }

   // Тело рабочей нити второго типа.
   void second_type_thread_body() {
      // Выполняем работу до тех пор, пока не будут закрыты все каналы.
      so_5::select(so_5::from_all(),
            case_(other_demands_ch_, exec_demand_handler));
   }

   // Сохранение очередной заявки в очередях заявок.
   void push_demand(so_5::execution_demand_t demand) {
      if(init_device_type == demand.m_msg_type ||
            reinit_device_type == demand.m_msg_type) {
         // Эти заявки должны идти в свою собственную очередь.
         so_5::send<so_5::execution_demand_t>(init_reinit_ch_, std::move(demand));
      }
      else {
         // Это заявка, которая должна попасть в общую очередь.
         so_5::send<so_5::execution_demand_t>(other_demands_ch_, std::move(demand));
      }
   }

public:
   // Конструктор сразу же запускает все рабочие нити.
   tricky_dispatcher_t(
         // SObjectizer Environment, на котором нужно будет работать.
         so_5::environment_t & env,
         // Количество рабочих потоков, которые должны быть созаны диспетчером.
         unsigned pool_size)
         :  event_queue_{*this}
         ,  init_reinit_ch_{so_5::create_mchain(env)}
         ,  other_demands_ch_{so_5::create_mchain(env)} {
      const auto [first_type_count, second_type_count] =
            calculate_pools_sizes(pool_size);

      launch_work_threads(first_type_count, second_type_count);
   }
   ~tricky_dispatcher_t() noexcept {
      // Все работающие нити должны быть остановлены.
      shutdown_work_threads();
   }

   // Метод-фабрика для создания экземпляров диспетчера.
   static auto make(so_5::environment_t & env, unsigned pool_size) {
      return std::make_shared<tricky_dispatcher_t>(env, pool_size);
   }

   // Создать биндера, который сможет привязать агента к этому диспетчеру.
   so_5::disp_binder_unique_ptr_t binder() {
      return so_5::disp_binder_unique_ptr_t{
            new tricky_disp_binder_t{shared_from_this()}};
   }
};

const std::type_index tricky_dispatcher_t::init_device_type =
      typeid(a_device_manager_t::init_device_t);
const std::type_index tricky_dispatcher_t::reinit_device_type =
      typeid(so_5::mutable_msg<a_device_manager_t::reinit_device_t>);

Комментариев нет: