четверг, 14 декабря 2017 г.

[prog.c++] Попытка упростить работу с отложенными сообщениями в SO-5

Удобная и простая работа с таймерами -- это одна из важных причин использования акторов вообще и SObjectizer-а в частности. Вот, скажем, в примере для вчерашней статьи на Хабре, мне потребовалось сгенерировать цепочку событий, происходящих спустя определенное время. Что элементарно выполняется посредством простого использования функции send_delayed:

std::vector<milliseconds> delays{ 125ms, 250ms, 400ms, 500ms, 700ms, 750ms, 800ms };

for(const auto d : delays) {
   const std::string msg = std::to_string(d.count()) + "ms";
   so_5::send_delayed<std::string>(env, ordinary_mbox, d, msg);
   so_5::send_delayed<std::string>(env, anti_jitter_mbox, d, msg);
}

За счет того, что работа с таймерами происходит легко и непринужденно, таймеры в SObjectizer-овских приложениях используются повсеместно. Инициируешь какую-то операцию, отсылаешь самому себе следом отложенное сообщение для проверки результата. Когда отложенное сообщение пришло, проверяешь есть ли результат или нет. Если результат пришел раньше, то вообще хорошо: на отложенное сообщение можно не обращать внимания.

Однако, при работе с отложенными сообщениями в SObjectizer-5 есть очень важный нюанс: не так-то просто отказаться от отосланного отложенного сообщения. Допустим, вы начали операцию, взвели отложенное сообщение, а результат операции пришел раньше. Вроде как отложенное сообщение вам больше не нужно. И тут вы можете оказаться в одной из двух ситуаций:

  • можно тупо забить на это отложенное сообщение. Что вполне уместно, как минимум, в двух случаях:
    • во-первых, когда агент в принципе живет не долго. Скажем, агент начал операцию, получил результат, завершил работу. Отложенное сообщение до агента уже не дойдет. Не дождался результата, пришло отложенное сообщение, опять же завершил работу. Никаких проблем;
    • во-вторых, когда агент меняет свое состояние и больше никогда не возвращается в то состояние, в котором он ждал отложенного сообщения. Допустим, агент стартует в состоянии wait_resources и запрашивает нужные ему ресурсы. В этом состоянии агент реагирует на два сообщения: на сообщение о захвате ресурсов, после чего агент переходит в состояние working и никогда больше в wait_resources не возвращается, а так же на отложенное сообщение о том, что истекло время ожидания ресурсов. В этом случае после перехода в working агент уже может не думать про свое отложенное сообщение;
  • нужно корректно отменить отложенное сообщение, чтобы оно больше к вам не пришло. Например, ваш агент живет долго и периодически опрашивает соседних агентов. При каждом запросе он так же отсылает себе отложенное сообщение для того, чтобы среагировать на ситуацию, когда соседний агент не ответил. Вот тут вы практически сразу же сталкиваетесь с ситуацией, когда соседний агент ответил, а следом прилетело отложенное сообщение. На которое нужно как-то среагировать. И тут, казалось бы, самым удобным способом было бы просто отменить доставку отложенного сообщения. Мол, раз ответ получен, то отложенное сообщение больше нам не нужно.

В принципе, вы можете отменить отложенное сообщение. Для этого нужно a) отсылать его посредством send_periodic с нулевым значением параметра period и b) затем вызвать метод release() для возращенного ранее timer_id. Выглядит это как-то так:

class my_agent : public so_5::agent_t {
   // Идентификатор таймера, он нам нужен чтобы отменить отложенное
   // сообщение при получении ответа от соседнего агента.
   so_5::timer_id_t status_timer_;
   ...
   void check_coworker() {
      // Отсылаем запрос статуса.
      so_5::send<check_status>(coworker_mbox_, ...);
      // И взводим таймер на случай, если соседний агент не ответит вообще.
      status_timer_ = so_5::send_periodic<status_timedout>(
            *this// Сообщение самому себе.
            5s, // Задержка для сообщения.
            0s, // Нулевой period делает сообщение не периодическим,
               // а отложенным.
            ...);
   }

   void on_status(mhood_t<status_report> cmd) {
      // Соседний агент ответил. Отложенное сообщение нам больше не нужно.
      status_timer_.release();
      ...
   }
   ...
};

Так вот, фокус в том, что отмена отложенного сообщения произойдет только если сообщением все еще владеет таймерная нить (таймерный менеджер) SObjectizer-а. А вот если таймерная нить уже отсчитала положенное время и поставила сообщение в очередь агента (для простоты будем говорить про очередь агента), то отменить это сообщение уже нельзя. Оно таки дойдет до агента.

И вот это может стать неприятным моментом для тех агентов, которые живут долго и периодически используют отложенные сообщения.

Неприятный момент здесь в том, что один из очевидных способов борьбы с отложенными сообщениями, а именно отписка от сообщения, может и не сработать. На самом деле, иногда пытаются поступать так:

class my_agent : public so_5::agent_t {
   // Идентификатор таймера, он нам нужен чтобы отменить отложенное
   // сообщение при получении ответа от соседнего агента.
   so_5::timer_id_t status_timer_;
   ...
   void check_coworker() {
      // Отсылаем запрос статуса.
      so_5::send<check_status>(coworker_mbox_, ...);
      // Подписываемся на отложенное сообщение.
      so_subscribe_self().event(&my_agent::on_status_timedout);
      // И взводим таймер на случай, если соседний агент не ответит вообще.
      status_timer_ = so_5::send_periodic<status_timedout>(
            *this// Сообщение самому себе.
            5s, // Задержка для сообщения.
            0s, // Нулевой period делает сообщение не периодическим,
               // а отложенным.
            ...);
   }

   void on_status(mhood_t<status_report> cmd) {
      // Соседний агент ответил. Отложенное сообщение нам больше не нужно.
      status_timer_.release();
      // Отписываемся от отложенного сообщения.
      // Теперь даже если сообщение уже стоит в очереди, то оно не будет
      // обработано, т.к. нет подписки на него.
      so_drop_subscription(so_direct_mbox(), &my_agent::on_status_timedout);
      ...
   }
   ...
};

В принципе, это хороший подход. Но он может не сработать в условиях, когда в очереди агента стоит много сообщений. Например, в приложении случился какой-то затык, кто-то стал подтормаживать, у каких-то агентов начали расти очереди необработанных сообщений. И вот в конец этой очереди становится отложенное сообщение status_timedout. Соответственно, status_timer_.release() должным образом не сработал, подписку мы отменили и надеемся, что отложенное сообщение будет проигнорировано...

Но в процессе обработки всех накопившихся сообщений мы опять решили, что пора бы проверить статус соседнего агента. Отослали запрос статуса, подписались и отослали новое отложенное сообщение. И тут к нам таки дошло старое отложенное сообщение, которое уже стоит в очереди. А мы его воспринимаем как новое!

Конечно же, вероятность такого стечения событий при нормальной работе крайне мала. Поэтому на такие нюансы не заморачиваешься. Но вот когда с приложением начинаются какие-то непредусмотренные фокусы, например, внезапные всплески активности, то вот эта маленькая вероятность внезапно начинает стремительно расти и, к несчастью, временами таки достигает единицы... Тогда происходит двойная жопа: мало того, что само приложение ведет себя непонятно как, так еще и откуда-то прилетают сообщения, которых быть не должно. Кстати говоря, про непонятности в поведении акторного приложения речь шла во второй части статьи про набитые за 15 лет использования акторов шишки.

Очень надежно себя зарекомендовал такой подход: в агенте заводится счетчик ID-шников, который увеличивается при отсылке очередного отложенного сообщения и при его отмене. Значение счетчика отсылается в отложенном сообщении. Когда сообщение поступает, то мы проверяем значение счетчика из сообщения и из самого агента. Если они совпадают, то сообщение обрабатывается. Если нет -- то игнорируется. В принципе, в этом случае даже нет необходимости отменять таймер, если отложенные сообщения отсылаются нечасто. Можно вообще делать как-то так:

class my_agent : public so_5::agent_t {
   // Идентификатор операции запроса статуса.
   std::int_fast64_t status_check_id_{};
   ...
   // Отложенное сообщение, которое мы будем отсылать самим себе.
   // И в котором нам нужен идентификатор операции.
   struct status_timedout final : public so_5::message_t {
      std::int_fast64_t id_;
      status_timedout(std::int_fast64_t id) : id_{id} {}
   };

   void check_coworker() {
      // Для новой операции нужен новый идентификатор.
      ++status_check_id_;
      // Отсылаем запрос статуса.
      so_5::send<check_status>(coworker_mbox_, ...);
      // И взводим таймер на случай, если соседний агент не ответит вообще.
      so_5::send_delayed<status_timedout>(
            *this// Сообщение самому себе.
            5s, // Задержка для сообщения.
            status_check_id_ // Идентификатор текущей операции.
            );
   }

   void on_status(mhood_t<status_report> cmd) {
      // Соседний агент ответил. Меняем идентификатор текущей операции.
      ++status_check_id_;
      ...
   }

   void on_status_timedout(mhood_t<status_timedout> cmd) {
      if(status_check_id_ == cmd->id_) {
         ... // Это действительно тайм-аут для актуальной операции.
      }
   }
   ...
};

В общем, именно такой способ "борьбы" с отложенными сообщениями мы рекомендуем всем, кто задает нам вопросы по поводу особенностей работы с таймерами в SObjectizer. Но смущало то, что сам SObjectizer до сих пор не имеет каких-то встроенных механизмов для упрощения этой работы.

Сейчас вот появилась некая идея, которая на стадии прототипа начала проходить проверку. Я попробую ее описать и показать. Если кто-то сочтет возможным оставить свой фидбэк по этой идее, то это будет очень здорово. Либо идея канет в лету, как нежизнеспобная, либо же приобретет окончательные черты и станет доступна пользователям SObjectizer-а.

Итак, суть в том, чтобы:

  • подмешать специальный тип в свой тип отложенного сообщения. Как раз в этой "примеси" и будет находится уникальное значение идентификатора, по которому можно будет проверить актуальность сообщения;
  • создать в своем агенте специального помощника для работы с отложенным сообщением. Посредством этого помощника будет происходить отсылка, отмена и обработка отложенных сообщений.

Показанный выше пример с этим помощником будет выглядеть так:

class my_agent : public so_5::agent_t {
   // Помощник, который будет заниматься отложенным сообщением.
   so_5::extra::delayed_msg_helper<>::issuer_t status_timeout_issuer_;
   ...
   // Отложенное сообщение, которое мы будем отсылать самим себе.
   // И в котором нам нужен идентификатор операции.
   struct status_timedout final
      : public so_5::message_t
      // Наследуемся и от специального типа, внутри которого будет
      // все необходимое для помощника.
      , public so_5::extra::delayed_msg_helper<>::msg_mixin_t
   {
      // Здесь могут быть любые дополнительные данные, которые нам
      // могут потребоваться в отложенном сообщении.
      const so_5::mbox_t coworker_mbox_;
      ...

      status_timedout(so_5::mbox_t coworker_mbox, ...)
         : coworker_mbox_{std::move(coworker_mbox)}
         ...
         {}
   };

   void check_coworker() {
      // Отсылаем запрос статуса.
      so_5::send<check_status>(coworker_mbox_, ...);
      // И взводим таймер на случай, если соседний агент не ответит вообще.
      status_timeout_issuer_.schedule<status_timedout>(
            *this// Сообщение самому себе.
            5s, // Задержка для сообщения.
            // Все остальные значения пойдут в конструктор status_timedout.
            coworker_mbox_, ...);
   }

   void on_status(mhood_t<status_report> cmd) {
      // Соседний агент ответил. Отменяем отложенное сообщение.
      status_timeout_issuer_.cancel();
      ...
   }

   void on_status_timedout(mhood_t<status_timedout> cmd) {
      // Обращаемся к помощнику. Если пришло актуальное отложенное сообщение,
      // то он вызовет переданную ему лямбда-функцию.
      status_timeout_issuer_.try_handle(*cmd, [this](const auto & msg) {
         ... // Если мы здесь, значит отложенное сообщение актуально.
            // А ссылка msg -- это ссылка на актуальный экземпляр status_timedout.
      });
   }
   ...
};

Вот как-то так пока вырисовывается.

У кого какие впечатления? Стоит ли пытаться это доводить до ума?

Сейчас помощник -- это шаблонный тип, который параметризуется типом используемого счетчика (по умолчанию это std::int_fast64_t). Если кому-то достаточно std::int_fast8_t, то можно параметризовать помощника именно std::int_fast8_t.

Может сразу параметризовать помощника типом отсылаемого сообщения? Т.е. могло бы быть так:

class my_agent : public so_5::agent_t {
   // Отложенное сообщение, которое мы будем отсылать самим себе.
   // И в котором нам нужен идентификатор операции.
   struct status_timedout final
      : public so_5::message_t
      // Наследуемся и от специального типа, внутри которого будет
      // все необходимое для помощника.
      , public so_5::extra::delayed_msg::msg_mixin_t<>
   {
      // Здесь могут быть любые дополнительные данные, которые нам
      // могут потребоваться в отложенном сообщении.
      const so_5::mbox_t coworker_mbox_;
      ...

      status_timedout(so_5::mbox_t coworker_mbox, ...)
         : coworker_mbox_{std::move(coworker_mbox)}
         ...
         {}
   };

   // Помощник, который будет заниматься отложенным сообщением.
   so_5::extra::delayed_msg::issuer_t<status_timeout> status_timeout_issuer_;

   void check_coworker() {
      // Отсылаем запрос статуса.
      so_5::send<check_status>(coworker_mbox_, ...);
      // И взводим таймер на случай, если соседний агент не ответит вообще.
      status_timeout_issuer_.schedule(
            *this// Сообщение самому себе.
            5s, // Задержка для сообщения.
            // Все остальные значения пойдут в конструктор status_timedout.
            coworker_mbox_, ...);
   }

В общем, любой фидбэк, в том числе и отрицательный, будет очень полезен.

Отправить комментарий