суббота, 13 января 2018 г.

[prog.c++] Тема с упрощением "асинхронных операций" в SObjectizer начинает дышать...

Данный пост продолжает тему, затронутую в двух декабрьских постах "Попытка упростить работу с отложенными сообщениями в SO-5" и "Еще одна попытка упростить работу с отложенными сообщениями в SO-5". В результате длительного и напряженного выкуривания бамбука удалось придумать, как же это все должно выглядеть. И уже начала дышать одна из реализаций. Кому интересно, милости прошу под кат. Для тех же, кто не интересуется SObjectizer-ом и C++ными наворотами, ничего интересного в данном посте не будет. Извините.

Итак, прежде всего удалось переосмыслить проблему и пути ее решения. Что позволило понять, что речь идет о ситуациях, когда агент должен однократно среагировать на результат какой-то операции. По сути, речь идет о каком-то аналоге работы с std::async и std::future, но применительно к специфике SObjectizer-а (mbox-ы, сообщения и агенты с состояниями). Если при работы с std::async/std::future мы записываем что-то вроде:

std::future<result> f = std::async(service_provider, args);
... // We can do something here while service_provider works.
result r = f.get();

То в случае с агентами в SObjectizer придется приложить немного больше усилий. Нужно отослать сообщение на какой-то mbox. Затем нужно получить ответное сообщение, для чего нужно будет подписаться на него... Чтобы "тяжесть ситуации" была лучше понятна, попробую показать это в коде.

Сперва для простоты предположим, что для взаимодействия между service_provider-ом и customer-ом используется один и тот же mbox. Пусть так же, для простоты, у customer-а будет всего одно состояние, в котором он ожидает ответа. В этом случае разработчику потребуется написать такой код:

class customer : public so_5::agent_t {
   // Event handler of result message.
   void on_result(mhood_t<result> cmd) {
      ... // result handling code.
   }
   ...
   void some_internal_method() {
      // We should have mbox of service_provider to interact with it.
      const auto service_provider_mbox = acquire_service_provider_mbox_somehow();

      // We need to receive a result from service_provider.
      // A subscription must be created for that.
      so_subscribe(service_provider_mbox).event(&customer::on_result);

      // A request to service_provider can be initiated here.
      so_5::send<request>(service_provider_mbox, args);

      ... // We can do something here while service_provider works.
   }
   ...
};

Однако, не все так хорошо :(

Дело в том, что если мы делаем однократный запрос к service_provider-у, то нам нет смысла держать подписку на сообщение result из service_provider_mbox после того, как мы получим сообщение result. Поэтому в обработчике on_result эту подписку мы должны отменить. Соответственно, наш код уже усложняется:

class customer : public so_5::agent_t {
   // We should store mbox of service_provider to remove subscription
   // when result will be received.
   so_5::mbox_t service_provider_mbox_;

   // Event handler of result message.
   void on_result(mhood_t<result> cmd) {
      // Subscription should be destroyed.
      so_drop_subscription<result>(service_provider_mbox_);

      ... // result handling code.
   }
   ...
   void some_internal_method() {
      // We should have mbox of service_provider to interact with it.
      service_provider_mbox_ = acquire_service_provider_mbox_somehow();

      // We need to receive a result from service_provider.
      // A subscription must be created for that.
      so_subscribe(service_provider_mbox_).event(&customer::on_result);

      // A request to service_provider can be initiated here.
      so_5::send<request>(service_provider_mbox_, args);

      ... // We can do something here while service_provider works.
   }
   ...
};

Но ситуация становится еще хуже, когда в рассмотрение принимаются еще и состояния агента customer-а. Допустим, что у него два состояния -- st_free и st_busy, в каждом из которых агент должен реагировать на сообщение result по-разному. Тогда у нас получится что-то вроде:

class customer : public so_5::agent_t {
   state_t st_free{ this };
   state_t st_busy{ this };

   // We should store mbox of service_provider to remove subscription
   // when result will be received.
   so_5::mbox_t service_provider_mbox_;

   // Event handler of result message in st_free state.
   void on_result_when_free(mhood_t<result> cmd) {
      // Subscription should be destroyed.
      // NOTE: for all states!
      so_drop_subscription_for_all_states<result>(service_provider_mbox_);

      ... // result handling code.
   }
   // Event handler of result message in st_busy state.
   void on_result_when_free(mhood_t<result> cmd) {
      // Subscription should be destroyed.
      // NOTE: for all states!
      so_drop_subscription_for_all_states<result>(service_provider_mbox_);

      ... // result handling code.
   }
   ...
   void some_internal_method() {
      // We should have mbox of service_provider to interact with it.
      service_provider_mbox_ = acquire_service_provider_mbox_somehow();

      // We need to receive a result from service_provider.
      // A subscription must be created for that.
      st_free.event(service_provider_mbox_, &customer::on_result_when_free);
      st_busy.event(service_provider_mbox_, &customer::on_result_when_busy);

      // A request to service_provider can be initiated here.
      so_5::send<request>(service_provider_mbox_, args);

      ... // We can do something here while service_provider works.
   }
   ...
};

В чем здесь проблема?

На самом деле проблемы две :)

Первая проблема в том, что когда взаимодействие request-reply реализуется на основе асинхронных сообщений, то объем "синтаксического оверхеда" (с) волей-неволей возрастает. На мой взгляд, это вообще фундаментальная особенность общения на асинхронных сообщениях. В каких-то языках (вроде Erlang-а) объем синтаксического оверхеда будет поменьше. В C++ -- побольше. Но принципиально это все равно будет несколько многословнее синхронного взаимодействия. Так что первую проблему, как фундаментальную, я даже и не пытался решать.

Вторая проблема связана с тем, что в SObjectizer есть подписки и состояния. Поэтому после того, как однократное сообщение было получено, нужно отписаться от него. При этом подписку и отписку нужно производить для всех состояний, в которых агент хочет получать это однократное сообщение. И именно вот эту работу хочется сократить.

Итак, для того, чтобы упростить жизнь разработчику, которому приходится иметь дело с множеством подобных однократных запросов, в so_5_extra сейчас добавляется поддержка так называемых "асинхронных операций" (т.е. операций, которые не должны прерывать работу агента, инициировавшего операцию, результат же операции со временем должен прийти к агенту в виде того или иного сообщения). С их помощью показанный выше пример будет записан вот в таком виде:

class customer : public so_5::agent_t {
   state_t st_free{ this };
   state_t st_busy{ this };

   // Event handler of result message in st_free state.
   void on_result_when_free(mhood_t<result> cmd) {
      ... // result handling code.
   }
   // Event handler of result message in st_busy state.
   void on_result_when_free(mhood_t<result> cmd) {
      ... // result handling code.
   }
   ...
   void some_internal_method() {
      // We should have mbox of service_provider to interact with it.
      const auto service_provider_mbox = acquire_service_provider_mbox_somehow();

      // Create an async operation for this agent...
      so_5::extra::async_op::time_unlimited::make(*this)
         // Now define completion handlers for it...
         ->completed_on(service_provider_mbox, st_free, &customer::on_result_when_free)
         .completed_on(service_provider_mbox, st_busy, &customer::on_result_when_busy)
         // And activate operation.
         .activate();

      // A request to service_provider can be initiated here.
      so_5::send<request>(service_provider_mbox_, args);

      ... // We can do something here while service_provider works.
   }
   ...
};

Сразу можно обратить внимание на две вещи. Во-первых, теперь в обработчиках сообщений result нет надобности делать отписку от сообщений. И, во-вторых, т.к. не нужно делать отписку, то и нет надобности хранить mbox service_provider-а внутри customer-а.

Эти две вещи стали ненужными за счет того, что мы создаем объект "асинхронной операции" и настраиваем его. Вызовы completed_on -- это и есть настройка. Каждый вызов completed_on указывает сообщение, которое свидетельствует о завершении операции. Вызывая completed_on мы говорим: "когда из вот из этого mbox-а прилетит сообщение типа M, то оно должно быть обработано сообработчиком evt в состоянии state, а сама асинхронная операция должна считаться завершенной".

В примере выше было два вызова completed_on, поскольку у нас было всего два обработчика для двух состояний. Но, вообще-то говоря, этих обработчиков может быть и больше. И получать обработчики могут разные типы сообщений (тип сообщений выводится автоматически исходя из типа обработчика). Например, предположим, что у нас есть некая операция acquire_resource, которая может привести как к сообщению resource_acquired, так и к no_free_resources, так и к no_such_resource_type. Обработку всего этого хозяйства с помощью асинхронной операции можно записать так:

class resource_consumer : public so_5::agent_t {
   state_t st_working{ this };
   state_t st_free{ initial_substate_of{ st_working } };
   state_t st_busy{ substate_of{ st_working } };
   ...
   void on_resource_acquired_when_free(mhood_t<resource_acquired> cmd) {
      ...
   }
   void on_resource_acquired_when_busy(mhood_t<resource_acquired> cmd) {
      ...
   }
   void on_no_free_resources(mhood_t<no_free_resources> cmd) {
      ...
   }
   void on_no_such_resource_type(mhood_t<no_such_resource_type> cmd) {
      ...
   }

   void some_internal_method() {
      // Create an async operation for this agent...
      so_5::extra::async_op::time_unlimited::make(*this)
         // Now define completion handlers for it...
         // Expect reply messages to agent's direct mbox.
         ->completed_on(*this, st_free, &resource_consumer::on_resource_acquired_when_free)
         .completed_on(*this, st_busy, &resource_consumer::on_resource_acquired_when_busy)
         .completed_on(*this, st_working, &resource_consumer::on_no_free_resources)
         .completed_on(*this, st_working, &resource_consumer::on_no_such_resource_type)
         // And activate operation.
         .activate();

      // Request resource from resource_manager.
      so_5::send<request_source>(resource_manager_mbox(), ...);

      ... // We can do something here while service_provider works.
   }
};

Короче говоря, посредством completed_on задается перечень событий, которые будут считаться завершением операции. Затем операцию нужно активировать -- вызвать метод activate(). Именно в этом методе происходит основная магия. А именно: регистрируются обработчики всех перечисленных в completed_on событий. Т.е. когда вызывается completed_on, то описание события сохраняется внутри объекта-операции, но ничего не регистрируется. Таким образом внутри операции наполняется контейнера с описаниями событий. А вот внутри activate для всех событий из этого контейнера выполняется подписка. И с этого момента приход любого события, свидетельствующего о завершении операции, будет приводить к вызову нужного обработчика.

Фокус же состоит в том, что реально подписывается не заданный пользователем обработчик сообщения, специальная обертка. В этой обертке сперва выполняется отписка от всех обработчиков (ведь операция завершилась, подписки больше не нужны), а уже затем вызывается заданный пользователем обработчик. Т.е., если опускаться на уровень псевдокода, то вот такая конструкция:

class demo : public so_5::agent_t {
   void completion_handler(mhood_t<msg> cmd) {...}
   ...
   void some_internal_method() {
      so_5::extra::async_op::time_unlimited::make(*this)->
            completed_on(*this, state, &demo::completion_handler)
            .activate();
      ...
   }
};

превращается во что-то вроде:

class demo : public so_5::agent_t {
   void completion_handler(mhood_t<msg> cmd) {...}
   ...
   void some_internal_method() {
      {
         auto op__ = so_5::extra::async_op::time_unlimited::make(*this);
         so_subscribe_self()
            .in(state)
            .event([this, op__, &state](mhood_t<msg> cmd) {
               so_drop_subscription<msg>(so_direct_mbox(), state);
               op__.completed();
               completion_handler(cmd);
            });
      }
      ...
   }
};

Собственно, это и все. Получается, что нужно создать объект-операцию, описать все обработчики завершения операции (описание этих обработчиков сохраняется в объекте-операции), затем нужно активировать объект-операцию. Во время активации для всех описанных обработчиков завершения будут созданы подписки. Когда какая-то из этих подписок сработает, то произойдет отписка всех описанных обработчиков завершения, после чего будет вызван тот обработчик, для которого сработала подписка.

На данный момент реализована поддержка "асинхронных операций" без ограничения на время выполнения. Текущий вариант реализации можно увидеть здесь. Там же рядом можно посмотреть и простейший пример использования. На очереди реализация "асинхронных операций" с ограничением на время выполнения. Как раз там уже будут использоваться отложенные сообщения, ради упрощения работы с которыми все и затевалось. Интерфейс же time_limited-операций будет очень похож на интерфейс time_unlimited-операций.

Ну и кстати про интерфейс. Для создания асинхронной операции должна вызываться фабричная функция make. Эта функция возвращает умный указатель на объект-операцию (под именем op_shptr_t, хотя это имя мне самому пока не очень нравится). Объект-операция обязательно должен создаваться как динамический объект, поскольку умный указатель на него сохраняется в обработчиках-обертках. Соответственно, тут все построено на работе с умными указателями на объект-операцию. И сам объект-операция автоматически уничтожается после того, как он перестает быть нужным. Т.е., после завершения асинхронной операции. Именно поэтому во всех показанных выше примерах объект-операция нигде не сохранялся. Просто вызывался make, затем вызывались completed_on и activate. И все. Больше ничего не нужно.

Однако, если пользователю захотелось отменить асинхронную операцию раньше, чем она завершиться, то можно сохранить результат make(). Например:

namespace asyncop = so_5::extra::async_op::time_unlimited;
class resource_consumer : public so_5::agent_t {
   ...
   asyncop::op_shptr_t get_resource_op_ = asyncop::make(*this);
   ...
   virtual void so_evt_start() override {
      ...
      get_resource_op_->completed_on(...)
         .completed_on(...)
         ...
         .activate();
      ...
   }
   ...
   void on_resource_manager_restarted(mhood_t<restart_notification> cmd) {
      ...
      // Previous resource request is not actual anymore.
      // Async operation must be cancelled.
      get_resource_op_->cancel();
      ...
   }
};

Вызов cancel отменяет все сделанные подписки. После чего объект-операцию можно использовать для иницирования новой операции.

Собственно, к чему был весь этот текст? Этот текст нужен был для следующий вещей:

Во-первых, это такой промежуточный отчет о том, в каком направлении двинулась мысль. Пока все это в граните не отлито, поэтому если кто-то обнаружит серьезный косяк или же укажет на какие-то фатальные недостатки этого подхода, то есть возможность все это безболезненно поправить. Ну или отказаться от этого направления движения вообще. Либо же, если кто-то увидит, как это дело можно развернуть в какую-то другую сторону, дабы сделать что-то еще более полезное, то есть время и возможность расширить и углубить.

Во-вторых, хочу посоветоваться с читателями. Сейчас у объекта-операции есть только один метод activate(), который не получает никаких аргументов. Однако, есть подозрение, что может быть полезен activate() с лямбдой. Это позволит явным образом задавать действия, которые нужно выполнить при активации операции. Т.е. вместо вот такого:

so_5::extra::async_op::time_unlimited::make(*this)->completed_on(...)
   .completed_on(...)
   ...
   .activate();

... // Here can be some other actions.

// Initiation of async operation.
so_5::send<request_source>(resource_manager_mbox(), ...);

пользователь сможет писать вот так:

so_5::extra::async_op::time_unlimited::make(*this)->completed_on(...)
   .completed_on(...)
   ...
   .activate([&]{
      // Initiation of async operation.
      so_5::send<request_source>(resource_manager_mbox(), ...);
   });

... // Here can be some other actions.

Во втором варианте гораздо сложнее забыть сделать собственно сам запрос для выполнения асинхронной операции.

Однако, во втором варианте у меня непонятка вот с чем: если сейчас пользователь не сделал ни одного вызова completed_on, но вызвал activate, то ничего не происходит. Объект-операция остается в своем первоначальном состоянии. Но если activate будет получать лямбду и оказывается, что нет заданных обработчиков завершения операции, то что делать? Не вызывать лямбду? Порождать исключение?

Пока я склоняюсь к тому, чтобы второй вариант activate() порождал исключение, если предшествующих вызовов completed_on не было. Но хотелось бы услышать и еще чье-то мнение по этому поводу.

В-третьих, хочется поделиться соображениями о том, как будет выглядеть интерфейс time_limited-операции. Пока идея вот такая:

so_5::extra::async_op::time_limited::make<timeout_msg_typs>(agent, timeout_msg_args...)->
   completed_on(mbox1, state1, handler1)
   .completed_on(mbox2, state2, handler2)
   ...
   .on_timeout(state1, timeout_handler1)
   .on_timeout(state2, timeout_handler2)
   ...
   .activate(timeout);

Т.е. практически то же самое, за исключением того, что:

  • фабричная функция make будет получать тип сообщения/сигнала, который должен использоваться для ограничения времени выполнения операции. А так же список аргументов, которые должны быть переданы в конструктор этого сообщения (если это сообщение, а не сигнал);
  • кроме обработчиков нормального завершения операции так же задаются и обработчики сообщения-таймаута;
  • в метод activate() нужно передавать максимальную длительность операции.

На данный момент кажется, что такой интерфейс вполне реализуем. Так это или нет выяснится в ближайшее время. Однако, если у кого-то есть соображения на счет того, как должны работать time_limited-операции, то самое время это услышать.

Напоследок добавлю, что функционал асинхронных операций потребовал расширения API самого SObjectizer-а. Поэтому новый so_5_extra-1.0.4 требует SO-5.5.21. Обе эти версии сейчас находятся в разработке. Они будут выпущены одновременно.

Отправить комментарий