vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php line 157

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Middleware;
  11. use Psr\Log\LoggerAwareTrait;
  12. use Psr\Log\NullLogger;
  13. use Symfony\Component\Messenger\Envelope;
  14. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  15. use Symfony\Component\Messenger\Exception\LogicException;
  16. use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
  17. use Symfony\Component\Messenger\Handler\Acknowledger;
  18. use Symfony\Component\Messenger\Handler\HandlerDescriptor;
  19. use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
  20. use Symfony\Component\Messenger\Stamp\AckStamp;
  21. use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
  22. use Symfony\Component\Messenger\Stamp\HandledStamp;
  23. use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
  24. use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
  25. /**
  26.  * @author Samuel Roze <samuel.roze@gmail.com>
  27.  */
  28. class HandleMessageMiddleware implements MiddlewareInterface
  29. {
  30.     use LoggerAwareTrait;
  31.     private HandlersLocatorInterface $handlersLocator;
  32.     private bool $allowNoHandlers;
  33.     public function __construct(HandlersLocatorInterface $handlersLocatorbool $allowNoHandlers false)
  34.     {
  35.         $this->handlersLocator $handlersLocator;
  36.         $this->allowNoHandlers $allowNoHandlers;
  37.         $this->logger = new NullLogger();
  38.     }
  39.     /**
  40.      * @throws NoHandlerForMessageException When no handler is found and $allowNoHandlers is false
  41.      */
  42.     public function handle(Envelope $envelopeStackInterface $stack): Envelope
  43.     {
  44.         $handler null;
  45.         $message $envelope->getMessage();
  46.         $context = [
  47.             'class' => $message::class,
  48.         ];
  49.         $exceptions = [];
  50.         $alreadyHandled false;
  51.         foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
  52.             if ($this->messageHasAlreadyBeenHandled($envelope$handlerDescriptor)) {
  53.                 $alreadyHandled true;
  54.                 continue;
  55.             }
  56.             try {
  57.                 $handler $handlerDescriptor->getHandler();
  58.                 $batchHandler $handlerDescriptor->getBatchHandler();
  59.                 /** @var AckStamp $ackStamp */
  60.                 if ($batchHandler && $ackStamp $envelope->last(AckStamp::class)) {
  61.                     $ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e null$result null) use ($envelope$ackStamp$handlerDescriptor) {
  62.                         if (null !== $e) {
  63.                             $e = new HandlerFailedException($envelope, [$e]);
  64.                         } else {
  65.                             $envelope $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor$result));
  66.                         }
  67.                         $ackStamp->ack($envelope$e);
  68.                     });
  69.                     $result $this->callHandler($handler$message$ack$envelope->last(HandlerArgumentsStamp::class));
  70.                     if (!\is_int($result) || $result) {
  71.                         throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".'\is_int($result) ? $result get_debug_type($result), get_debug_type($batchHandler)));
  72.                     }
  73.                     if (!$ack->isAcknowledged()) {
  74.                         $envelope $envelope->with(new NoAutoAckStamp($handlerDescriptor));
  75.                     } elseif ($ack->getError()) {
  76.                         throw $ack->getError();
  77.                     } else {
  78.                         $result $ack->getResult();
  79.                     }
  80.                 } else {
  81.                     $result $this->callHandler($handler$messagenull$envelope->last(HandlerArgumentsStamp::class));
  82.                 }
  83.                 $handledStamp HandledStamp::fromDescriptor($handlerDescriptor$result);
  84.                 $envelope $envelope->with($handledStamp);
  85.                 $this->logger->info('Message {class} handled by {handler}'$context + ['handler' => $handledStamp->getHandlerName()]);
  86.             } catch (\Throwable $e) {
  87.                 $exceptions[] = $e;
  88.             }
  89.         }
  90.         /** @var FlushBatchHandlersStamp $flushStamp */
  91.         if ($flushStamp $envelope->last(FlushBatchHandlersStamp::class)) {
  92.             /** @var NoAutoAckStamp $stamp */
  93.             foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
  94.                 try {
  95.                     $handler $stamp->getHandlerDescriptor()->getBatchHandler();
  96.                     $handler->flush($flushStamp->force());
  97.                 } catch (\Throwable $e) {
  98.                     $exceptions[] = $e;
  99.                 }
  100.             }
  101.         }
  102.         if (null === $handler && !$alreadyHandled) {
  103.             if (!$this->allowNoHandlers) {
  104.                 throw new NoHandlerForMessageException(sprintf('No handler for message "%s".'$context['class']));
  105.             }
  106.             $this->logger->info('No handler for message {class}'$context);
  107.         }
  108.         if (\count($exceptions)) {
  109.             throw new HandlerFailedException($envelope$exceptions);
  110.         }
  111.         return $stack->next()->handle($envelope$stack);
  112.     }
  113.     private function messageHasAlreadyBeenHandled(Envelope $envelopeHandlerDescriptor $handlerDescriptor): bool
  114.     {
  115.         /** @var HandledStamp $stamp */
  116.         foreach ($envelope->all(HandledStamp::class) as $stamp) {
  117.             if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
  118.                 return true;
  119.             }
  120.         }
  121.         return false;
  122.     }
  123.     private function callHandler(callable $handlerobject $message, ?Acknowledger $ack, ?HandlerArgumentsStamp $handlerArgumentsStamp): mixed
  124.     {
  125.         $arguments = [$message];
  126.         if (null !== $ack) {
  127.             $arguments[] = $ack;
  128.         }
  129.         if (null !== $handlerArgumentsStamp) {
  130.             $arguments = [...$arguments, ...$handlerArgumentsStamp->getAdditionalArguments()];
  131.         }
  132.         return $handler(...$arguments);
  133.     }
  134. }