vendor/symfony/doctrine-messenger/Transport/DoctrineTransport.php line 72

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\Connection as DbalConnection;
  12. use Doctrine\DBAL\Schema\Schema;
  13. use Doctrine\DBAL\Schema\Table;
  14. use Symfony\Component\Messenger\Envelope;
  15. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  16. use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
  17. use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
  18. use Symfony\Component\Messenger\Transport\SetupableTransportInterface;
  19. use Symfony\Component\Messenger\Transport\TransportInterface;
  /**
   * Messenger transport that forwards every operation to a Doctrine-backed
   * receiver, sender, or the underlying Connection.
   *
   * The receiver and sender are created on first use and share the same
   * Connection and serializer passed to the constructor.
   *
   * @author Vincent Touzet <vincent.touzet@gmail.com>
   */
  class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
  {
      private Connection $connection;
      private SerializerInterface $serializer;
      private DoctrineReceiver $receiver;
      private DoctrineSender $sender;

      /**
       * @param Connection          $connection connection handed to the receiver and sender, and used for setup/schema calls
       * @param SerializerInterface $serializer serializer handed to the receiver and sender
       */
      public function __construct(Connection $connection, SerializerInterface $serializer)
      {
          $this->connection = $connection;
          $this->serializer = $serializer;
      }

      /**
       * Returns whatever the receiver's get() yields.
       */
      public function get(): iterable
      {
          return $this->getReceiver()->get();
      }

      /**
       * Forwards the acknowledgement of the envelope to the receiver.
       */
      public function ack(Envelope $envelope): void
      {
          $this->getReceiver()->ack($envelope);
      }

      /**
       * Forwards the rejection of the envelope to the receiver.
       */
      public function reject(Envelope $envelope): void
      {
          $this->getReceiver()->reject($envelope);
      }

      /**
       * Returns the message count reported by the receiver.
       */
      public function getMessageCount(): int
      {
          return $this->getReceiver()->getMessageCount();
      }

      /**
       * Lists envelopes through the receiver.
       *
       * @param int|null $limit passed to the receiver unchanged; presumably null means "no limit" (confirm against DoctrineReceiver)
       */
      public function all(?int $limit = null): iterable
      {
          return $this->getReceiver()->all($limit);
      }

      /**
       * Looks up a single envelope by id through the receiver.
       *
       * @param mixed $id identifier passed to the receiver unchanged
       */
      public function find(mixed $id): ?Envelope
      {
          return $this->getReceiver()->find($id);
      }

      /**
       * Sends the envelope through the sender and returns the envelope it gives back.
       */
      public function send(Envelope $envelope): Envelope
      {
          return $this->getSender()->send($envelope);
      }

      /**
       * Delegates setup to the underlying Connection.
       */
      public function setup(): void
      {
          $this->connection->setup();
      }

      /**
       * Adds the Table to the Schema if this transport uses this connection.
       */
      public function configureSchema(Schema $schema, DbalConnection $forConnection): void
      {
          $this->connection->configureSchema($schema, $forConnection);
      }

      /**
       * Adds extra SQL if the given table was created by the Connection.
       *
       * @return string[]
       */
      public function getExtraSetupSqlForTable(Table $createdTable): array
      {
          return $this->connection->getExtraSetupSqlForTable($createdTable);
      }

      /**
       * Returns the receiver, creating it on first call.
       */
      private function getReceiver(): DoctrineReceiver
      {
          // `??=` treats the still-uninitialised typed property as null, so the
          // receiver is built once and reused on every later call.
          return $this->receiver ??= new DoctrineReceiver($this->connection, $this->serializer);
      }

      /**
       * Returns the sender, creating it on first call.
       */
      private function getSender(): DoctrineSender
      {
          // Same lazy initialisation as getReceiver().
          return $this->sender ??= new DoctrineSender($this->connection, $this->serializer);
      }
  }