Очередь на общественном транспорте rabbitmq, исключение

У меня есть потребитель, который обращается к Redis, чтобы сохранить некоторые данные

Сценарий такой:

  • Заказ на публикацию получен
  • Потребитель получает заказ
  • Сохраняет этот порядок в Redis

Когда сервер Redis отключен, и я пытаюсь сохранить в нем некоторые данные, он вызовет исключение (RedisException) в потребителе, когда это произойдет, я хочу повторно поставить сообщение в очередь для поддержания последовательности прибытия заказа и продолжать использовать сообщение, пока не получится


person Gabriel Guedes    schedule 14.01.2021    source источник


Ответы (2)


Этот тип упорядочивания сообщений ограничивает пропускную способность потребителя, поскольку вам, по сути, нужно указать PrefetchCount, равное 1, и запустить один единственный экземпляр вашего потребителя. Вы можете использовать политику повтора на MassTransit, чтобы повторить ошибку, и можете использовать новый Kill Switch, чтобы остановить конечную точку при превышении порога предела ошибок.

Хотя вы можете перенастроить конвейер ошибок MassTransit для NACK сообщения обратно в RabbitMQ, в нем нет никакой ценности, поскольку он просто будет доставлен прямо обратно потребителю, но вы можете сделать это через конфигурацию канала ошибок.

person Chris Patterson    schedule 14.01.2021

Вам необходимо использовать функцию повторной доставки MassTransit, которая достаточно хорошо документирована: https://masstransit-project.com/usage/exceptions.html#redelivery

Вот фрагмент кода из документации:

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host("rabbitmq://localhost/");

    cfg.UseMessageScheduler(new Uri("rabbitmq://localhost/quartz"));

    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
        e.UseMessageRetry(r => r.Immediate(5));
        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });
});

Как видите, вы можете комбинировать повторные попытки сообщения, которые происходят в процессе, с запланированной повторной доставкой, которая, по сути, возвращает сообщение в ту же очередь позже. Поскольку для этого требуется планирование, вам нужно либо использовать планирование Quartz, либо или поддержка отложенного обмена сообщениями из транспорта (например, запланированные сообщения в служебной шине Azure или отложенный обмен в RMQ).

Однако в этом случае трудно поддерживать последовательность. В то же время не существует брокера сообщений с возможностями конкурирующих потребителей, который в любом случае поддерживает упорядоченную доставку. Kafka поддерживает его, но у него другая парадигма линейно-секционированных журналов событий, и потребители могут потреблять только из назначенных им разделов, поэтому в любом случае нет действительно конкурирующих потребителей.

person Alexey Zimarev    schedule 14.01.2021