vendor/symfony/doctrine-messenger/Transport/Connection.php line 208

  1. <?php
  2. /*
  3.  * This file is part of the Symfony package.
  4.  *
  5.  * (c) Fabien Potencier <fabien@symfony.com>
  6.  *
  7.  * For the full copyright and license information, please view the LICENSE
  8.  * file that was distributed with this source code.
  9.  */
  10. namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
  11. use Doctrine\DBAL\Connection as DBALConnection;
  12. use Doctrine\DBAL\Driver\Exception as DriverException;
  13. use Doctrine\DBAL\Driver\Result as DriverResult;
  14. use Doctrine\DBAL\Exception as DBALException;
  15. use Doctrine\DBAL\Exception\TableNotFoundException;
  16. use Doctrine\DBAL\LockMode;
  17. use Doctrine\DBAL\Platforms\MySQLPlatform;
  18. use Doctrine\DBAL\Platforms\OraclePlatform;
  19. use Doctrine\DBAL\Query\QueryBuilder;
  20. use Doctrine\DBAL\Result;
  21. use Doctrine\DBAL\Schema\AbstractSchemaManager;
  22. use Doctrine\DBAL\Schema\Comparator;
  23. use Doctrine\DBAL\Schema\Schema;
  24. use Doctrine\DBAL\Schema\SchemaDiff;
  25. use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
  26. use Doctrine\DBAL\Schema\Table;
  27. use Doctrine\DBAL\Types\Types;
  28. use Symfony\Component\Messenger\Exception\InvalidArgumentException;
  29. use Symfony\Component\Messenger\Exception\TransportException;
  30. use Symfony\Contracts\Service\ResetInterface;
  /**
   * Database-backed message queue used by the Doctrine Messenger transport.
   *
   * Messages are stored in a single table (see addTableToSchema()) and are read
   * and written through a Doctrine DBAL connection. When the "auto_setup"
   * option is enabled, the table is created the first time a query fails with
   * a TableNotFoundException.
   *
   * @internal
   *
   * @author Vincent Touzet <vincent.touzet@gmail.com>
   * @author Kévin Dunglas <dunglas@gmail.com>
   */
  class Connection implements ResetInterface
  {
      /**
       * Name of the table option that marks a table as created by this class.
       */
      protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name';

      /**
       * Option values used when neither the DSN nor the options array provide one.
       */
      protected const DEFAULT_OPTIONS = [
          'table_name' => 'messenger_messages',
          'queue_name' => 'default',
          'redeliver_timeout' => 3600,
          'auto_setup' => true,
      ];

      /**
       * Configuration of the connection.
       *
       * Available options:
       *
       * * table_name: name of the table
       * * connection: name of the Doctrine's entity manager
       * * queue_name: name of the queue
       * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
       * * auto_setup: Whether the table should be created automatically during send / get. Default: true
       */
      protected $configuration = [];

      /**
       * DBAL connection every query of this class goes through.
       */
      protected $driverConnection;

      /**
       * Time (in milliseconds, from microtime()) at which get() last found no
       * available message; null after a message was fetched or after reset().
       * Not read inside this class - presumably consumed by subclasses.
       */
      protected $queueEmptiedAt;

      /**
       * Optional synchronizer used by updateSchema() instead of the schema comparator.
       */
      private ?SchemaSynchronizer $schemaSynchronizer;

      /**
       * Whether the table may still be created automatically; switched off by setup().
       */
      private bool $autoSetup;

      /**
       * @param array                    $configuration      Option values, merged over DEFAULT_OPTIONS
       * @param DBALConnection           $driverConnection   Connection used to run all queries
       * @param SchemaSynchronizer|null  $schemaSynchronizer Optional synchronizer used to create the table
       */
      public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null)
      {
          $this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
          $this->driverConnection = $driverConnection;
          $this->schemaSynchronizer = $schemaSynchronizer;
          $this->autoSetup = $this->configuration['auto_setup'];
      }

      /**
       * Forgets the time at which the queue was last seen empty.
       */
      public function reset()
      {
          $this->queueEmptiedAt = null;
      }

      /**
       * Returns the merged configuration built by the constructor.
       */
      public function getConfiguration(): array
      {
          return $this->configuration;
      }

      /**
       * Builds a configuration array from a DSN and an options array.
       *
       * The DSN host becomes the "connection" entry. For every other key, the
       * DSN query string wins over $options, which wins over DEFAULT_OPTIONS.
       *
       * @param string $dsn     DSN whose host names the connection and whose query string carries options
       * @param array  $options Extra options; only keys of DEFAULT_OPTIONS are allowed
       *
       * @return array The resulting configuration, with "auto_setup" normalized to a boolean
       *
       * @throws InvalidArgumentException When the DSN cannot be parsed or an unknown option is given
       */
      public static function buildConfiguration(string $dsn, array $options = []): array
      {
          if (false === $components = parse_url($dsn)) {
              throw new InvalidArgumentException(sprintf('The given Doctrine Messenger DSN "%s" is invalid.', $dsn));
          }

          $query = [];
          if (isset($components['query'])) {
              parse_str($components['query'], $query);
          }

          // "+" keeps the left-hand value on key collisions, hence the precedence
          // connection > DSN query > $options > defaults.
          $configuration = ['connection' => $components['host']];
          $configuration += $query + $options + static::DEFAULT_OPTIONS;

          // Values coming from the query string are strings ("true", "0", ...).
          $configuration['auto_setup'] = filter_var($configuration['auto_setup'], \FILTER_VALIDATE_BOOL);

          // check for extra keys in options
          $optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
          if (0 < \count($optionsExtraKeys)) {
              throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
          }

          // check for extra keys in the DSN query string
          $queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
          if (0 < \count($queryExtraKeys)) {
              throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
          }

          return $configuration;
      }

      /**
       * Inserts a message into the configured table and queue.
       *
       * @param string $body    The message body
       * @param array  $headers The message headers, stored JSON-encoded
       * @param int    $delay   The delay in milliseconds
       *
       * @return string The inserted id
       *
       * @throws DBALException
       */
      public function send(string $body, array $headers, int $delay = 0): string
      {
          $now = new \DateTimeImmutable();
          // The delay is given in milliseconds while modify() works in whole seconds;
          // "%d" drops the fractional part.
          $availableAt = $now->modify(sprintf('+%d seconds', $delay / 1000));

          $queryBuilder = $this->driverConnection->createQueryBuilder()
              ->insert($this->configuration['table_name'])
              ->values([
                  'body' => '?',
                  'headers' => '?',
                  'queue_name' => '?',
                  'created_at' => '?',
                  'available_at' => '?',
              ]);

          $this->executeStatement($queryBuilder->getSQL(), [
              $body,
              json_encode($headers),
              $this->configuration['queue_name'],
              $now,
              $availableAt,
          ], [
              null,
              null,
              null,
              Types::DATETIME_MUTABLE,
              Types::DATETIME_MUTABLE,
          ]);

          return $this->driverConnection->lastInsertId();
      }

      /**
       * Fetches the oldest available message of the queue and marks it as delivered.
       *
       * The row is selected with a write lock and its delivered_at column is
       * updated inside one transaction. If the table does not exist and
       * auto-setup is still enabled, the table is created and the fetch retried.
       *
       * @return array|null The message row with its headers decoded, or null when no message is available
       *
       * @throws \Throwable Any error raised while fetching, after the transaction was rolled back
       */
      public function get(): ?array
      {
          if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
              // On MySQL, ack() and reject() only flag rows with this sentinel date
              // instead of deleting them; the flagged rows are purged here.
              // NOTE(review): presumably done to avoid lock contention with the
              // SELECT ... FOR UPDATE below - confirm against upstream history.
              try {
                  $this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']);
              } catch (DriverException $e) {
                  // Ignore the exception
              }
          }

          get:
          $this->driverConnection->beginTransaction();
          try {
              $query = $this->createAvailableMessagesQueryBuilder()
                  ->orderBy('available_at', 'ASC')
                  ->setMaxResults(1);

              if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
                  $query->select('m.id');
              }

              // Append pessimistic write lock to FROM clause if db platform supports it
              $sql = $query->getSQL();
              if (($fromPart = $query->getQueryPart('from')) &&
                  ($table = $fromPart[0]['table'] ?? null) &&
                  ($alias = $fromPart[0]['alias'] ?? null)
              ) {
                  $fromClause = sprintf('%s %s', $table, $alias);
                  $sql = str_replace(
                      sprintf('FROM %s WHERE', $fromClause),
                      sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
                      $sql
                  );
              }

              // Wrap the rownum query in a sub-query to allow writelocks without ORA-02014 error
              if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
                  $sql = $this->createQueryBuilder('w')
                      ->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
                      ->getSQL();
              }

              // use SELECT ... FOR UPDATE to lock table
              $stmt = $this->executeQuery(
                  $sql.' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
                  $query->getParameters(),
                  $query->getParameterTypes()
              );
              // Result objects expose fetchAssociative(); anything else is read with fetch().
              $doctrineEnvelope = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();

              if (false === $doctrineEnvelope) {
                  $this->driverConnection->commit();
                  $this->queueEmptiedAt = microtime(true) * 1000;

                  return null;
              }
              // Postgres can "group" notifications having the same channel and payload
              // We need to be sure to empty the queue before blocking again
              $this->queueEmptiedAt = null;

              $doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);

              $queryBuilder = $this->driverConnection->createQueryBuilder()
                  ->update($this->configuration['table_name'])
                  ->set('delivered_at', '?')
                  ->where('id = ?');
              $now = new \DateTimeImmutable();
              $this->executeStatement($queryBuilder->getSQL(), [
                  $now,
                  $doctrineEnvelope['id'],
              ], [
                  Types::DATETIME_MUTABLE,
              ]);

              $this->driverConnection->commit();

              return $doctrineEnvelope;
          } catch (\Throwable $e) {
              $this->driverConnection->rollBack();

              // setup() turns autoSetup off, so this retry happens at most once.
              if ($this->autoSetup && $e instanceof TableNotFoundException) {
                  $this->setup();
                  goto get;
              }

              throw $e;
          }
      }

      /**
       * Acknowledges a message: flags it for removal on MySQL, deletes it elsewhere.
       *
       * @param string $id The id of the message row
       *
       * @return bool True when at least one row was affected
       *
       * @throws TransportException When the underlying DBAL call fails
       */
      public function ack(string $id): bool
      {
          try {
              if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
                  // The flagged row is deleted by the next call to get().
                  return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
              }

              return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
          } catch (DBALException $exception) {
              throw new TransportException($exception->getMessage(), 0, $exception);
          }
      }

      /**
       * Rejects a message: flags it for removal on MySQL, deletes it elsewhere.
       *
       * @param string $id The id of the message row
       *
       * @return bool True when at least one row was affected
       *
       * @throws TransportException When the underlying DBAL call fails
       */
      public function reject(string $id): bool
      {
          try {
              if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
                  // The flagged row is deleted by the next call to get().
                  return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
              }

              return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
          } catch (DBALException $exception) {
              throw new TransportException($exception->getMessage(), 0, $exception);
          }
      }

      /**
       * Creates or updates the message table and disables further auto-setup.
       *
       * The connection's schema assets filter is removed while the schema is
       * updated and put back afterwards.
       */
      public function setup(): void
      {
          $configuration = $this->driverConnection->getConfiguration();
          $assetFilter = $configuration->getSchemaAssetsFilter();
          $configuration->setSchemaAssetsFilter(null);
          $this->updateSchema();
          $configuration->setSchemaAssetsFilter($assetFilter);
          $this->autoSetup = false;
      }

      /**
       * Counts the messages of the queue that are currently available for delivery.
       */
      public function getMessageCount(): int
      {
          $queryBuilder = $this->createAvailableMessagesQueryBuilder()
              ->select('COUNT(m.id) AS message_count')
              ->setMaxResults(1);

          $stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());

          return $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchOne() : $stmt->fetchColumn();
      }

      /**
       * Returns the available messages of the queue, headers decoded.
       *
       * @param int|null $limit Maximum number of rows to return; null for no limit
       */
      public function findAll(?int $limit = null): array
      {
          $queryBuilder = $this->createAvailableMessagesQueryBuilder();
          if (null !== $limit) {
              $queryBuilder->setMaxResults($limit);
          }

          $stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
          $data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAllAssociative() : $stmt->fetchAll();

          return array_map(function ($doctrineEnvelope) {
              return $this->decodeEnvelopeHeaders($doctrineEnvelope);
          }, $data);
      }

      /**
       * Looks up one message of the configured queue by id.
       *
       * @return array|null The message row with its headers decoded, or null when no row matches
       */
      public function find(mixed $id): ?array
      {
          $queryBuilder = $this->createQueryBuilder()
              ->where('m.id = ? and m.queue_name = ?');

          $stmt = $this->executeQuery($queryBuilder->getSQL(), [$id, $this->configuration['queue_name']]);
          $data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();

          return false === $data ? null : $this->decodeEnvelopeHeaders($data);
      }

      /**
       * Adds the message table to the given schema when it targets this connection.
       *
       * @internal
       */
      public function configureSchema(Schema $schema, DBALConnection $forConnection): void
      {
          // only update the schema for this connection
          if ($forConnection !== $this->driverConnection) {
              return;
          }

          if ($schema->hasTable($this->configuration['table_name'])) {
              return;
          }

          $this->addTableToSchema($schema);
      }

      /**
       * Returns extra SQL to run for a created table; none in this class.
       *
       * @internal
       */
      public function getExtraSetupSqlForTable(Table $createdTable): array
      {
          return [];
      }

      /**
       * Builds the query selecting the messages of the queue that can be delivered now.
       *
       * A message qualifies when its available_at is not in the future and it is
       * either undelivered or was delivered longer ago than "redeliver_timeout" seconds.
       */
      private function createAvailableMessagesQueryBuilder(): QueryBuilder
      {
          $now = new \DateTimeImmutable();
          $redeliverLimit = $now->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));

          return $this->createQueryBuilder()
              ->where('m.delivered_at is null OR m.delivered_at < ?')
              ->andWhere('m.available_at <= ?')
              ->andWhere('m.queue_name = ?')
              ->setParameters([
                  $redeliverLimit,
                  $now,
                  $this->configuration['queue_name'],
              ], [
                  Types::DATETIME_MUTABLE,
                  Types::DATETIME_MUTABLE,
              ]);
      }

      /**
       * Builds a SELECT of all message columns from the configured table.
       *
       * @param string $alias The table alias used in the query
       */
      private function createQueryBuilder(string $alias = 'm'): QueryBuilder
      {
          $queryBuilder = $this->driverConnection->createQueryBuilder()
              ->from($this->configuration['table_name'], $alias);

          $alias .= '.';

          if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
              return $queryBuilder->select($alias.'*');
          }

          // Oracle databases use UPPER CASE on tables and column identifiers.
          // Column alias is added to force the result to be lowercase even when the actual field is all caps.
          // The str_replace() prefixes every column after the first one with the table alias.
          return $queryBuilder->select(str_replace(', ', ', '.$alias,
              $alias.'id AS "id", body AS "body", headers AS "headers", queue_name AS "queue_name", '.
              'created_at AS "created_at", available_at AS "available_at", '.
              'delivered_at AS "delivered_at"'
          ));
      }

      /**
       * Runs a read query, creating the table and retrying once when it is missing.
       *
       * Inside an active transaction the TableNotFoundException is rethrown
       * instead, leaving the recovery to the caller.
       */
      private function executeQuery(string $sql, array $parameters = [], array $types = [])
      {
          try {
              $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
          } catch (TableNotFoundException $e) {
              if ($this->driverConnection->isTransactionActive()) {
                  throw $e;
              }

              // create table
              if ($this->autoSetup) {
                  $this->setup();
              }
              $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
          }

          return $stmt;
      }

      /**
       * Runs a write statement, creating the table and retrying once when it is missing.
       *
       * Uses executeStatement() when the connection provides it and
       * executeUpdate() otherwise. Inside an active transaction the
       * TableNotFoundException is rethrown instead.
       */
      protected function executeStatement(string $sql, array $parameters = [], array $types = [])
      {
          try {
              if (method_exists($this->driverConnection, 'executeStatement')) {
                  $stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
              } else {
                  $stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
              }
          } catch (TableNotFoundException $e) {
              if ($this->driverConnection->isTransactionActive()) {
                  throw $e;
              }

              // create table
              if ($this->autoSetup) {
                  $this->setup();
              }
              if (method_exists($this->driverConnection, 'executeStatement')) {
                  $stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
              } else {
                  $stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
              }
          }

          return $stmt;
      }

      /**
       * Builds a schema that contains only the message table.
       */
      private function getSchema(): Schema
      {
          $schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig());
          $this->addTableToSchema($schema);

          return $schema;
      }

      /**
       * Declares the message table, its columns, primary key and indexes on the schema.
       */
      private function addTableToSchema(Schema $schema): void
      {
          $table = $schema->createTable($this->configuration['table_name']);
          // add an internal option to mark that we created this & the non-namespaced table name
          $table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
          $table->addColumn('id', Types::BIGINT)
              ->setAutoincrement(true)
              ->setNotnull(true);
          $table->addColumn('body', Types::TEXT)
              ->setNotnull(true);
          $table->addColumn('headers', Types::TEXT)
              ->setNotnull(true);
          $table->addColumn('queue_name', Types::STRING)
              ->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
              ->setNotnull(true);
          $table->addColumn('created_at', Types::DATETIME_MUTABLE)
              ->setNotnull(true);
          $table->addColumn('available_at', Types::DATETIME_MUTABLE)
              ->setNotnull(true);
          $table->addColumn('delivered_at', Types::DATETIME_MUTABLE)
              ->setNotnull(false);
          $table->setPrimaryKey(['id']);
          $table->addIndex(['queue_name']);
          $table->addIndex(['available_at']);
          $table->addIndex(['delivered_at']);
      }

      /**
       * Replaces the JSON string in the "headers" column of a row with its decoded array.
       */
      private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
      {
          $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);

          return $doctrineEnvelope;
      }

      /**
       * Brings the database schema in line with getSchema().
       *
       * Delegates to the schema synchronizer when one was injected; otherwise
       * diffs the current schema against the expected one and runs the
       * resulting SQL statements.
       */
      private function updateSchema(): void
      {
          if (null !== $this->schemaSynchronizer) {
              $this->schemaSynchronizer->updateSchema($this->getSchema(), true);

              return;
          }

          $schemaManager = $this->createSchemaManager();
          $comparator = $this->createComparator($schemaManager);
          $schemaDiff = $this->compareSchemas($comparator, $schemaManager->createSchema(), $this->getSchema());

          foreach ($schemaDiff->toSaveSql($this->driverConnection->getDatabasePlatform()) as $sql) {
              if (method_exists($this->driverConnection, 'executeStatement')) {
                  $this->driverConnection->executeStatement($sql);
              } else {
                  $this->driverConnection->exec($sql);
              }
          }
      }

      /**
       * Returns the schema manager, through whichever factory method the connection offers.
       */
      private function createSchemaManager(): AbstractSchemaManager
      {
          return method_exists($this->driverConnection, 'createSchemaManager')
              ? $this->driverConnection->createSchemaManager()
              : $this->driverConnection->getSchemaManager();
      }

      /**
       * Returns the schema manager's comparator when it can create one, a plain Comparator otherwise.
       */
      private function createComparator(AbstractSchemaManager $schemaManager): Comparator
      {
          return method_exists($schemaManager, 'createComparator')
              ? $schemaManager->createComparator()
              : new Comparator();
      }

      /**
       * Diffs two schemas with compareSchemas() when available, compare() otherwise.
       */
      private function compareSchemas(Comparator $comparator, Schema $from, Schema $to): SchemaDiff
      {
          return method_exists($comparator, 'compareSchemas')
              ? $comparator->compareSchemas($from, $to)
              : $comparator->compare($from, $to);
      }
  }