vendor/symfony/messenger/Worker.php line 103

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger;
  11. use Psr\EventDispatcher\EventDispatcherInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  14. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  15. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  16. use Symfony\Component\Messenger\Event\WorkerRateLimitedEvent;
  17. use Symfony\Component\Messenger\Event\WorkerRunningEvent;
  18. use Symfony\Component\Messenger\Event\WorkerStartedEvent;
  19. use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
  20. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  21. use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
  22. use Symfony\Component\Messenger\Exception\RuntimeException;
  23. use Symfony\Component\Messenger\Stamp\AckStamp;
  24. use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
  25. use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
  26. use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
  27. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  28. use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
  29. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  30. use Symfony\Component\RateLimiter\LimiterInterface;
  /**
   * Pulls envelopes from the configured receivers, dispatches them on the
   * message bus and acknowledges or rejects them on the receiver they came from.
   *
   * @author Samuel Roze <samuel.roze@gmail.com>
   * @author Tobias Schultze <http://tobion.de>
   *
   * @final
   */
  class Worker
  {
      /** @var ReceiverInterface[] Receivers keyed by transport name */
      private array $receivers;
      private MessageBusInterface $bus;
      private ?EventDispatcherInterface $eventDispatcher;
      private ?LoggerInterface $logger;
      /** Set by stop(); checked by the main loop in run(). */
      private bool $shouldStop = false;
      private WorkerMetadata $metadata;
      /** @var array<int, array{0: string, 1: Envelope, 2: ?\Throwable}> Pending [transportName, envelope, error] entries consumed by ack() */
      private array $acks = [];
      /** Maps a batch handler object to [envelope, transportName] while its messages await a flush. */
      private \SplObjectStorage $unacks;
      /** @var array<string, object>|null Keyed by transport name; each value must expose create() (presumably a rate limiter factory - confirm with caller) */
      private ?array $rateLimiters;

      /**
       * @param ReceiverInterface[] $receivers    Where the key is the transport name
       * @param array|null          $rateLimiters Optional limiters keyed by transport name; transports without an entry are not limited
       */
      public function __construct(array $receivers, MessageBusInterface $bus, ?EventDispatcherInterface $eventDispatcher = null, ?LoggerInterface $logger = null, ?array $rateLimiters = null)
      {
          $this->receivers = $receivers;
          $this->bus = $bus;
          $this->logger = $logger;
          $this->eventDispatcher = $eventDispatcher;
          $this->metadata = new WorkerMetadata([
              'transportNames' => array_keys($receivers),
          ]);
          $this->unacks = new \SplObjectStorage();
          $this->rateLimiters = $rateLimiters;
      }

      /**
       * Receive the messages and dispatch them to the bus.
       *
       * Loops until stop() has been called. Dispatches WorkerStartedEvent before
       * the loop, WorkerRunningEvent on every iteration step and WorkerStoppedEvent
       * after the final forced flush.
       *
       * Valid options are:
       *  * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
       *  * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface
       *
       * @throws RuntimeException When "queues" is given and a receiver does not implement QueueReceiverInterface
       */
      public function run(array $options = []): void
      {
          $options = array_merge([
              'sleep' => 1000000,
          ], $options);
          $queueNames = $options['queues'] ?? null;

          $this->metadata->set(['queueNames' => $queueNames]);

          $this->eventDispatcher?->dispatch(new WorkerStartedEvent($this));

          if ($queueNames) {
              // if queue names are specified, all receivers must implement the QueueReceiverInterface
              foreach ($this->receivers as $transportName => $receiver) {
                  if (!$receiver instanceof QueueReceiverInterface) {
                      throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
                  }
              }
          }

          while (!$this->shouldStop) {
              $envelopeHandled = false;
              $envelopeHandledStart = microtime(true);

              foreach ($this->receivers as $transportName => $receiver) {
                  if ($queueNames) {
                      $envelopes = $receiver->getFromQueues($queueNames);
                  } else {
                      $envelopes = $receiver->get();
                  }

                  foreach ($envelopes as $envelope) {
                      $envelopeHandled = true;

                      $this->rateLimit($transportName);
                      $this->handleMessage($envelope, $transportName);
                      $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, false));

                      if ($this->shouldStop) {
                          break 2;
                      }
                  }

                  // after handling a single receiver, quit and start the loop again
                  // this should prevent multiple lower priority receivers from
                  // blocking too long before the higher priority are checked
                  if ($envelopeHandled) {
                      break;
                  }
              }

              // Nothing was received: give pending batch handlers a chance to flush,
              // and skip the idle sleep if that produced any acknowledgements.
              if (!$envelopeHandled && $this->flush(false)) {
                  continue;
              }

              if (!$envelopeHandled) {
                  $this->eventDispatcher?->dispatch(new WorkerRunningEvent($this, true));

                  // Sleep only for what remains of the configured interval. When polling
                  // took longer than the interval the remainder is negative, and passing
                  // a negative value to usleep() throws a ValueError on PHP 8.
                  $sleep = (int) ($options['sleep'] - 1e6 * (microtime(true) - $envelopeHandledStart));
                  if ($sleep > 0) {
                      usleep($sleep);
                  }
              }
          }

          $this->flush(true);
          $this->eventDispatcher?->dispatch(new WorkerStoppedEvent($this));
      }

      /**
       * Dispatches one received envelope on the bus and queues its ack/reject.
       *
       * A WorkerMessageReceivedEvent is dispatched first; listeners may replace the
       * envelope or veto handling via shouldHandle().
       */
      private function handleMessage(Envelope $envelope, string $transportName): void
      {
          $event = new WorkerMessageReceivedEvent($envelope, $transportName);
          $this->eventDispatcher?->dispatch($event);
          $envelope = $event->getEnvelope();

          if (!$event->shouldHandle()) {
              return;
          }

          $acked = false;
          // Callback carried by the AckStamp: records the outcome and remembers that an
          // explicit ack happened so the automatic one below is skipped.
          $ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) {
              $acked = true;
              $this->acks[] = [$transportName, $envelope, $e];
          };

          try {
              $e = null;
              $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack)));
          } catch (\Throwable $e) {
              // Intentionally empty: $e is kept and turned into a rejection by ack().
          }

          $noAutoAckStamp = $envelope->last(NoAutoAckStamp::class);

          if (!$acked && !$noAutoAckStamp) {
              $this->acks[] = [$transportName, $envelope, $e];
          } elseif ($noAutoAckStamp) {
              // Handling was deferred: park the envelope under its batch handler until flush().
              $this->unacks[$noAutoAckStamp->getHandlerDescriptor()->getBatchHandler()] = [$envelope->withoutAll(AckStamp::class), $transportName];
          }

          $this->ack();
      }

      /**
       * Drains the pending ack queue, rejecting failed envelopes and acknowledging
       * successful ones on their receiver, with the matching events dispatched.
       *
       * @return bool True when at least one queued entry was processed
       */
      private function ack(): bool
      {
          // Take a snapshot and reset the queue first so entries added while
          // processing are not lost or handled twice.
          $acks = $this->acks;
          $this->acks = [];

          foreach ($acks as [$transportName, $envelope, $e]) {
              $receiver = $this->receivers[$transportName];

              if (null !== $e) {
                  if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
                      // redelivered messages are rejected first so that continuous failures in an event listener or while
                      // publishing for retry does not cause infinite redelivery loops
                      $receiver->reject($envelope);
                  }

                  if ($e instanceof HandlerFailedException) {
                      $envelope = $e->getEnvelope();
                  }

                  $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $e);

                  $this->eventDispatcher?->dispatch($failedEvent);
                  $envelope = $failedEvent->getEnvelope();

                  if (!$rejectFirst) {
                      $receiver->reject($envelope);
                  }

                  continue;
              }

              $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
              $this->eventDispatcher?->dispatch($handledEvent);
              $envelope = $handledEvent->getEnvelope();

              if (null !== $this->logger) {
                  $message = $envelope->getMessage();
                  $context = [
                      'class' => $message::class,
                  ];
                  $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
              }

              $receiver->ack($envelope);
          }

          return (bool) $acks;
      }

      /**
       * Consumes one token from the limiter configured for the transport, if any,
       * and blocks until a reserved token is available when consumption is refused.
       */
      private function rateLimit(string $transportName): void
      {
          if (!$this->rateLimiters) {
              return;
          }

          if (!\array_key_exists($transportName, $this->rateLimiters)) {
              return;
          }

          /** @var LimiterInterface $rateLimiter */
          $rateLimiter = $this->rateLimiters[$transportName]->create();
          if ($rateLimiter->consume()->isAccepted()) {
              return;
          }

          $this->logger?->info('Transport {transport} is being rate limited, waiting for token to become available...', ['transport' => $transportName]);

          $this->eventDispatcher?->dispatch(new WorkerRateLimitedEvent($rateLimiter, $transportName));
          $rateLimiter->reserve()->wait();
      }

      /**
       * Re-dispatches every parked envelope with a FlushBatchHandlersStamp and then
       * processes whatever acknowledgements that produced.
       *
       * @param bool $force Forwarded to FlushBatchHandlersStamp
       *
       * @return bool False when nothing was parked; otherwise the result of ack()
       */
      private function flush(bool $force): bool
      {
          $unacks = $this->unacks;

          if (!$unacks->count()) {
              return false;
          }

          // Start a fresh storage so envelopes parked during the flush are kept for the next one.
          $this->unacks = new \SplObjectStorage();

          foreach ($unacks as $batchHandler) {
              [$envelope, $transportName] = $unacks[$batchHandler];
              try {
                  $this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force)));
                  $envelope = $envelope->withoutAll(NoAutoAckStamp::class);
                  unset($unacks[$batchHandler], $batchHandler);
              } catch (\Throwable $e) {
                  // A failing flush is queued as a failure for this envelope.
                  $this->acks[] = [$transportName, $envelope, $e];
              }
          }

          return $this->ack();
      }

      /**
       * Asks the worker to stop; run() checks the flag after each handled
       * message and at the start of each loop iteration.
       */
      public function stop(): void
      {
          $this->logger?->info('Stopping worker.', ['transport_names' => $this->metadata->getTransportNames()]);
          $this->shouldStop = true;
      }

      /**
       * Returns the metadata object holding the transport names and, once run()
       * has started, the consumed queue names.
       */
      public function getMetadata(): WorkerMetadata
      {
          return $this->metadata;
      }
  }