vendor/symfony/doctrine-messenger/Transport/DoctrineReceiver.php line 45

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\Exception as DBALException;
  12. use Doctrine\DBAL\Exception\RetryableException;
  13. use Symfony\Component\Messenger\Envelope;
  14. use Symfony\Component\Messenger\Exception\LogicException;
  15. use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
  16. use Symfony\Component\Messenger\Exception\TransportException;
  17. use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
  18. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  19. use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
  20. use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
  21. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  22. /**
  23.  * @author Vincent Touzet <vincent.touzet@gmail.com>
  24.  */
  25. class DoctrineReceiver implements ListableReceiverInterfaceMessageCountAwareInterface
  26. {
  27.     private const MAX_RETRIES 3;
  28.     private int $retryingSafetyCounter 0;
  29.     private Connection $connection;
  30.     private SerializerInterface $serializer;
  31.     public function __construct(Connection $connectionSerializerInterface $serializer null)
  32.     {
  33.         $this->connection $connection;
  34.         $this->serializer $serializer ?? new PhpSerializer();
  35.     }
  36.     public function get(): iterable
  37.     {
  38.         try {
  39.             $doctrineEnvelope $this->connection->get();
  40.             $this->retryingSafetyCounter 0// reset counter
  41.         } catch (RetryableException $exception) {
  42.             // Do nothing when RetryableException occurs less than "MAX_RETRIES"
  43.             // as it will likely be resolved on the next call to get()
  44.             // Problem with concurrent consumers and database deadlocks
  45.             if (++$this->retryingSafetyCounter >= self::MAX_RETRIES) {
  46.                 $this->retryingSafetyCounter 0// reset counter
  47.                 throw new TransportException($exception->getMessage(), 0$exception);
  48.             }
  49.             return [];
  50.         } catch (DBALException $exception) {
  51.             throw new TransportException($exception->getMessage(), 0$exception);
  52.         }
  53.         if (null === $doctrineEnvelope) {
  54.             return [];
  55.         }
  56.         return [$this->createEnvelopeFromData($doctrineEnvelope)];
  57.     }
  58.     public function ack(Envelope $envelope): void
  59.     {
  60.         try {
  61.             $this->connection->ack($this->findDoctrineReceivedStamp($envelope)->getId());
  62.         } catch (DBALException $exception) {
  63.             throw new TransportException($exception->getMessage(), 0$exception);
  64.         }
  65.     }
  66.     public function reject(Envelope $envelope): void
  67.     {
  68.         try {
  69.             $this->connection->reject($this->findDoctrineReceivedStamp($envelope)->getId());
  70.         } catch (DBALException $exception) {
  71.             throw new TransportException($exception->getMessage(), 0$exception);
  72.         }
  73.     }
  74.     public function getMessageCount(): int
  75.     {
  76.         try {
  77.             return $this->connection->getMessageCount();
  78.         } catch (DBALException $exception) {
  79.             throw new TransportException($exception->getMessage(), 0$exception);
  80.         }
  81.     }
  82.     public function all(int $limit null): iterable
  83.     {
  84.         try {
  85.             $doctrineEnvelopes $this->connection->findAll($limit);
  86.         } catch (DBALException $exception) {
  87.             throw new TransportException($exception->getMessage(), 0$exception);
  88.         }
  89.         foreach ($doctrineEnvelopes as $doctrineEnvelope) {
  90.             yield $this->createEnvelopeFromData($doctrineEnvelope);
  91.         }
  92.     }
  93.     public function find(mixed $id): ?Envelope
  94.     {
  95.         try {
  96.             $doctrineEnvelope $this->connection->find($id);
  97.         } catch (DBALException $exception) {
  98.             throw new TransportException($exception->getMessage(), 0$exception);
  99.         }
  100.         if (null === $doctrineEnvelope) {
  101.             return null;
  102.         }
  103.         return $this->createEnvelopeFromData($doctrineEnvelope);
  104.     }
  105.     private function findDoctrineReceivedStamp(Envelope $envelope): DoctrineReceivedStamp
  106.     {
  107.         /** @var DoctrineReceivedStamp|null $doctrineReceivedStamp */
  108.         $doctrineReceivedStamp $envelope->last(DoctrineReceivedStamp::class);
  109.         if (null === $doctrineReceivedStamp) {
  110.             throw new LogicException('No DoctrineReceivedStamp found on the Envelope.');
  111.         }
  112.         return $doctrineReceivedStamp;
  113.     }
  114.     private function createEnvelopeFromData(array $data): Envelope
  115.     {
  116.         try {
  117.             $envelope $this->serializer->decode([
  118.                 'body' => $data['body'],
  119.                 'headers' => $data['headers'],
  120.             ]);
  121.         } catch (MessageDecodingFailedException $exception) {
  122.             $this->connection->reject($data['id']);
  123.             throw $exception;
  124.         }
  125.         return $envelope->with(
  126.             new DoctrineReceivedStamp($data['id']),
  127.             new TransportMessageIdStamp($data['id'])
  128.         );
  129.     }
  130. }