src/Core/Framework/MessageQueue/ScheduledTask/Scheduler/TaskScheduler.php line 131

  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\ScheduledTask\Scheduler;
  3. use Shopware\Core\Defaults;
  4. use Shopware\Core\Framework\Context;
  5. use Shopware\Core\Framework\DataAbstractionLayer\EntityRepository;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Search\Aggregation\Metric\MinAggregation;
  7. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\AggregationResult;
  8. use Shopware\Core\Framework\DataAbstractionLayer\Search\AggregationResult\Metric\MinResult;
  9. use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
  10. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsAnyFilter;
  11. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\EqualsFilter;
  12. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\NotFilter;
  13. use Shopware\Core\Framework\DataAbstractionLayer\Search\Filter\RangeFilter;
  14. use Shopware\Core\Framework\Log\Package;
  15. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTask;
  16. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskDefinition;
  17. use Shopware\Core\Framework\MessageQueue\ScheduledTask\ScheduledTaskEntity;
  18. use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;
  19. use Symfony\Component\Messenger\MessageBusInterface;
  /**
   * Queues due scheduled tasks on the message bus and exposes aggregate timing
   * information (earliest next execution time, smallest run interval) about the
   * tasks stored in the scheduled task repository.
   */
  #[Package('core')]
  final class TaskScheduler
  {
      /**
       * @internal
       */
      public function __construct(
          private readonly EntityRepository $scheduledTaskRepository,
          private readonly MessageBusInterface $bus,
          private readonly ParameterBagInterface $parameterBag
      ) {
      }

      /**
       * Loads every task whose next execution time lies in the past and whose status is
       * "scheduled" or "skipped", and queues each of them via queueTask().
       */
      public function queueScheduledTasks(): void
      {
          $criteria = $this->buildCriteriaForAllScheduledTask();
          $context = Context::createDefaultContext();
          $tasks = $this->scheduledTaskRepository->search($criteria, $context)->getEntities();

          if (\count($tasks) === 0) {
              return;
          }

          // Tasks **must not** be queued before their state in the database has been updated. Otherwise,
          // a worker could have already fetched the task and set its state to running before it gets set to
          // queued, thus breaking the task.
          /** @var ScheduledTaskEntity $task */
          foreach ($tasks as $task) {
              $this->queueTask($task, $context);
          }
      }

      /**
       * Returns the smallest "nextExecutionTime" among tasks in status "scheduled" or "skipped".
       *
       * @return \DateTimeInterface|null null when the aggregation is not a MinResult or has no minimum
       */
      public function getNextExecutionTime(): ?\DateTimeInterface
      {
          $criteria = $this->buildCriteriaForNextScheduledTask();

          /** @var AggregationResult $aggregation */
          $aggregation = $this->scheduledTaskRepository
              ->aggregate($criteria, Context::createDefaultContext())
              ->get('nextExecutionTime');

          if (!$aggregation instanceof MinResult) {
              return null;
          }

          $min = $aggregation->getMin();
          if ($min === null) {
              return null;
          }

          return new \DateTime((string) $min);
      }

      /**
       * Returns the smallest "runInterval" among all tasks whose status is not "inactive".
       *
       * @return int|null null when the aggregation is not a MinResult or has no minimum
       */
      public function getMinRunInterval(): ?int
      {
          $criteria = $this->buildCriteriaForMinRunInterval();

          $aggregation = $this->scheduledTaskRepository
              ->aggregate($criteria, Context::createDefaultContext())
              ->get('runInterval');

          if (!$aggregation instanceof MinResult) {
              return null;
          }

          $min = $aggregation->getMin();
          if ($min === null) {
              return null;
          }

          return (int) $min;
      }

      /**
       * Builds the criteria selecting tasks that are due: "nextExecutionTime" earlier than
       * the current time and status either "scheduled" or "skipped".
       */
      private function buildCriteriaForAllScheduledTask(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new RangeFilter(
                  'nextExecutionTime',
                  [
                      RangeFilter::LT => (new \DateTime())->format(Defaults::STORAGE_DATE_TIME_FORMAT),
                  ]
              ),
              new EqualsAnyFilter('status', [
                  ScheduledTaskDefinition::STATUS_SCHEDULED,
                  ScheduledTaskDefinition::STATUS_SKIPPED,
              ])
          );

          return $criteria;
      }

      /**
       * Marks a single task as queued and dispatches it, or marks it as skipped when the
       * task class reports via shouldRun() that it should not run.
       *
       * @throws \RuntimeException when the stored task class is not a ScheduledTask (sub)class
       */
      private function queueTask(ScheduledTaskEntity $taskEntity, Context $context): void
      {
          $taskClass = $taskEntity->getScheduledTaskClass();

          if (!\is_a($taskClass, ScheduledTask::class, true)) {
              throw new \RuntimeException(sprintf(
                  'Tried to schedule "%s", but class does not extend ScheduledTask',
                  $taskClass
              ));
          }

          if (!$taskClass::shouldRun($this->parameterBag)) {
              // Not supposed to run: move the task to its next slot and flag it as skipped
              // instead of dispatching it.
              $this->scheduledTaskRepository->update([
                  [
                      'id' => $taskEntity->getId(),
                      'nextExecutionTime' => $this->calculateNextExecutionTime($taskEntity),
                      'status' => ScheduledTaskDefinition::STATUS_SKIPPED,
                  ],
              ], $context);

              return;
          }

          // Persist the "queued" status BEFORE dispatching the message, so the stored
          // status is already updated by the time the message is on the bus.
          $this->scheduledTaskRepository->update([
              [
                  'id' => $taskEntity->getId(),
                  'status' => ScheduledTaskDefinition::STATUS_QUEUED,
              ],
          ], $context);

          $task = new $taskClass();
          $task->setTaskId($taskEntity->getId());

          $this->bus->dispatch($task);
      }

      /**
       * Builds the criteria aggregating the minimum "nextExecutionTime" over tasks in
       * status "scheduled" or "skipped".
       */
      private function buildCriteriaForNextScheduledTask(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new EqualsAnyFilter('status', [
                  ScheduledTaskDefinition::STATUS_SCHEDULED,
                  ScheduledTaskDefinition::STATUS_SKIPPED,
              ])
          )
          ->addAggregation(new MinAggregation('nextExecutionTime', 'nextExecutionTime'));

          return $criteria;
      }

      /**
       * Builds the criteria aggregating the minimum "runInterval" over all tasks whose
       * status is not "inactive".
       */
      private function buildCriteriaForMinRunInterval(): Criteria
      {
          $criteria = new Criteria();
          $criteria->addFilter(
              new NotFilter(NotFilter::CONNECTION_AND, [
                  new EqualsFilter('status', ScheduledTaskDefinition::STATUS_INACTIVE),
              ])
          )
          ->addAggregation(new MinAggregation('runInterval', 'runInterval'));

          return $criteria;
      }

      /**
       * Computes the next execution time for a task: its current next execution time plus
       * its run interval (in seconds), but never earlier than the current time.
       */
      private function calculateNextExecutionTime(ScheduledTaskEntity $taskEntity): \DateTimeImmutable
      {
          $now = new \DateTimeImmutable();

          // Rebuild the stored timestamp as a DateTimeImmutable from its storage-format string.
          $nextExecutionTimeString = $taskEntity->getNextExecutionTime()->format(Defaults::STORAGE_DATE_TIME_FORMAT);
          $nextExecutionTime = new \DateTimeImmutable($nextExecutionTimeString);
          $newNextExecutionTime = $nextExecutionTime->modify(sprintf('+%d seconds', $taskEntity->getRunInterval()));

          // If adding one interval still lands in the past, fall back to "now".
          return $newNextExecutionTime < $now ? $now : $newNextExecutionTime;
      }
  }