src/Core/Framework/MessageQueue/Api/ConsumeMessagesController.php line 62

  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Api;
  3. use Shopware\Core\Framework\Log\Package;
  4. use Shopware\Core\Framework\MessageQueue\Subscriber\CountHandledMessagesListener;
  5. use Shopware\Core\Framework\MessageQueue\Subscriber\EarlyReturnMessagesListener;
  6. use Shopware\Core\Framework\MessageQueue\Subscriber\MessageQueueStatsSubscriber;
  7. use Shopware\Core\Framework\Util\MemorySizeCalculator;
  8. use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
  9. use Symfony\Component\DependencyInjection\ServiceLocator;
  10. use Symfony\Component\EventDispatcher\EventDispatcher;
  11. use Symfony\Component\HttpFoundation\JsonResponse;
  12. use Symfony\Component\HttpFoundation\Request;
  13. use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
  14. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  15. use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
  16. use Symfony\Component\Messenger\EventListener\StopWorkerOnSigtermSignalListener;
  17. use Symfony\Component\Messenger\MessageBusInterface;
  18. use Symfony\Component\Messenger\Worker;
  19. use Symfony\Component\Routing\Annotation\Route;
  20. #[Route(defaults: ['_routeScope' => ['api']])]
  21. #[Package('system-settings')]
  22. class ConsumeMessagesController extends AbstractController
  23. {
  24.     /**
  25.      * @internal
  26.      */
  27.     public function __construct(private readonly ServiceLocator $receiverLocator, private readonly MessageBusInterface $bus, private readonly StopWorkerOnRestartSignalListener $stopWorkerOnRestartSignalListener, private readonly StopWorkerOnSigtermSignalListener $stopWorkerOnSigtermSignalListener, private readonly DispatchPcntlSignalListener $dispatchPcntlSignalListener, private readonly EarlyReturnMessagesListener $earlyReturnListener, private readonly MessageQueueStatsSubscriber $statsSubscriber, private readonly string $defaultTransportName, private readonly string $memoryLimit)
  28.     {
  29.     }
  30.     #[Route(path'/api/_action/message-queue/consume'name'api.action.message-queue.consume'methods: ['POST'])]
  31.     public function consumeMessages(Request $request): JsonResponse
  32.     {
  33.         $receiverName $request->get('receiver');
  34.         if (!$receiverName || !$this->receiverLocator->has($receiverName)) {
  35.             throw new \RuntimeException('No receiver name provided.');
  36.         }
  37.         $receiver $this->receiverLocator->get($receiverName);
  38.         $workerDispatcher = new EventDispatcher();
  39.         $listener = new CountHandledMessagesListener();
  40.         $workerDispatcher->addSubscriber($listener);
  41.         $workerDispatcher->addSubscriber($this->statsSubscriber);
  42.         $workerDispatcher->addSubscriber($this->stopWorkerOnRestartSignalListener);
  43.         $workerDispatcher->addSubscriber($this->stopWorkerOnSigtermSignalListener);
  44.         $workerDispatcher->addSubscriber($this->dispatchPcntlSignalListener);
  45.         $workerDispatcher->addSubscriber($this->earlyReturnListener);
  46.         if ($this->memoryLimit !== '-1') {
  47.             $workerDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener(
  48.                 MemorySizeCalculator::convertToBytes($this->memoryLimit)
  49.             ));
  50.         }
  51.         $worker = new Worker([$this->defaultTransportName => $receiver], $this->bus$workerDispatcher);
  52.         $worker->run();
  53.         return new JsonResponse(['handledMessages' => $listener->getHandledMessages()]);
  54.     }
  55. }