src/Core/Framework/MessageQueue/Subscriber/MessageQueueStatsSubscriber.php line 62

  1. <?php declare(strict_types=1);
  2. namespace Shopware\Core\Framework\MessageQueue\Subscriber;
  3. use Shopware\Core\Framework\Increment\IncrementGatewayRegistry;
  4. use Shopware\Core\Framework\Log\Package;
  5. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  6. use Symfony\Component\Messenger\Envelope;
  7. use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
  8. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  9. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  10. /**
  11.  * @internal
  12.  */
  13. #[Package('system-settings')]
  14. class MessageQueueStatsSubscriber implements EventSubscriberInterface
  15. {
  16.     /**
  17.      * @internal
  18.      */
  19.     public function __construct(private readonly IncrementGatewayRegistry $gatewayRegistry)
  20.     {
  21.     }
  22.     public static function getSubscribedEvents(): array
  23.     {
  24.         return [
  25.             // must have higher priority than SendFailedMessageToFailureTransportListener
  26.             WorkerMessageFailedEvent::class => ['onMessageFailed'99],
  27.             WorkerMessageHandledEvent::class => 'onMessageHandled',
  28.             SendMessageToTransportsEvent::class => ['onMessageSent'99],
  29.         ];
  30.     }
  31.     public function onMessageFailed(WorkerMessageFailedEvent $event): void
  32.     {
  33.         if ($event->willRetry()) {
  34.             return;
  35.         }
  36.         $this->handle($event->getEnvelope(), false);
  37.     }
  38.     public function onMessageHandled(WorkerMessageHandledEvent $event): void
  39.     {
  40.         $this->handle($event->getEnvelope(), false);
  41.     }
  42.     public function onMessageSent(SendMessageToTransportsEvent $event): void
  43.     {
  44.         $this->handle($event->getEnvelope(), true);
  45.     }
  46.     private function handle(Envelope $envelopebool $increment): void
  47.     {
  48.         $name $envelope->getMessage()::class;
  49.         $gateway $this->gatewayRegistry->get(IncrementGatewayRegistry::MESSAGE_QUEUE_POOL);
  50.         if ($increment) {
  51.             $gateway->increment('message_queue_stats'$name);
  52.             return;
  53.         }
  54.         $gateway->decrement('message_queue_stats'$name);
  55.     }
  56. }