src/Elasticsearch/Admin/AdminSearchRegistry.php line 136

  1. <?php declare(strict_types=1);
  2. namespace Shopware\Elasticsearch\Admin;
  3. use Doctrine\DBAL\Connection;
  4. use Doctrine\DBAL\Exception;
  5. use OpenSearch\Client;
  6. use Shopware\Core\Framework\DataAbstractionLayer\Event\EntityWrittenContainerEvent;
  7. use Shopware\Core\Framework\Event\ProgressAdvancedEvent;
  8. use Shopware\Core\Framework\Event\ProgressFinishedEvent;
  9. use Shopware\Core\Framework\Event\ProgressStartedEvent;
  10. use Shopware\Core\Framework\Log\Package;
  11. use Shopware\Core\Framework\Uuid\Uuid;
  12. use Shopware\Elasticsearch\Admin\Indexer\AbstractAdminIndexer;
  13. use Shopware\Elasticsearch\Exception\ElasticsearchIndexingException;
  14. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  15. use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
  16. use Symfony\Component\Messenger\MessageBusInterface;
  17. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  18. /**
  19.  * @internal
  20.  * @final
  21.  */
  22. #[Package('system-settings')]
  23. class AdminSearchRegistry implements MessageHandlerInterfaceEventSubscriberInterface
  24. {
  25.     /**
  26.      * @var array<string, mixed>
  27.      */
  28.     private readonly array $indexer;
  29.     /**
  30.      * @var array<mixed>
  31.      */
  32.     private readonly array $config;
  33.     /**
  34.      * @param AbstractAdminIndexer[] $indexer
  35.      * @param array<mixed> $config
  36.      * @param array<mixed> $mapping
  37.      */
  38.     public function __construct(
  39.         $indexer,
  40.         private readonly Connection $connection,
  41.         private readonly MessageBusInterface $queue,
  42.         private readonly EventDispatcherInterface $dispatcher,
  43.         private readonly Client $client,
  44.         private readonly AdminElasticsearchHelper $adminEsHelper,
  45.         array $config,
  46.         private readonly array $mapping
  47.     ) {
  48.         $this->indexer $indexer instanceof \Traversable iterator_to_array($indexer) : $indexer;
  49.         if (isset($config['settings']['index'])) {
  50.             if (\array_key_exists('number_of_shards'$config['settings']['index']) && $config['settings']['index']['number_of_shards'] === null) {
  51.                 unset($config['settings']['index']['number_of_shards']);
  52.             }
  53.             if (\array_key_exists('number_of_replicas'$config['settings']['index']) && $config['settings']['index']['number_of_replicas'] === null) {
  54.                 unset($config['settings']['index']['number_of_replicas']);
  55.             }
  56.         }
  57.         $this->config $config;
  58.     }
  59.     public function __invoke(AdminSearchIndexingMessage $message): void
  60.     {
  61.         $indexer $this->getIndexer($message->getEntity());
  62.         $documents $indexer->fetch($message->getIds());
  63.         $this->push($indexer$message->getIndices(), $documents$message->getIds());
  64.     }
  65.     public static function getSubscribedEvents(): array
  66.     {
  67.         return [
  68.             EntityWrittenContainerEvent::class => [
  69.                 ['refresh', -1000],
  70.             ],
  71.         ];
  72.     }
  73.     /**
  74.      * @return iterable<class-string>
  75.      */
  76.     public static function getHandledMessages(): iterable
  77.     {
  78.         return [
  79.             AdminSearchIndexingMessage::class,
  80.         ];
  81.     }
  82.     public function iterate(AdminIndexingBehavior $indexingBehavior): void
  83.     {
  84.         if ($this->adminEsHelper->getEnabled() === false) {
  85.             return;
  86.         }
  87.         /** @var array<string> $entities */
  88.         $entities array_keys($this->indexer);
  89.         if ($indexingBehavior->getOnlyEntities()) {
  90.             $entities array_intersect($entities$indexingBehavior->getOnlyEntities());
  91.         } elseif ($indexingBehavior->getSkipEntities()) {
  92.             $entities array_diff($entities$indexingBehavior->getSkipEntities());
  93.         }
  94.         $indices $this->createIndices($entities);
  95.         foreach ($entities as $entityName) {
  96.             $indexer $this->getIndexer($entityName);
  97.             $iterator $indexer->getIterator();
  98.             $this->dispatcher->dispatch(new ProgressStartedEvent($indexer->getName(), $iterator->fetchCount()));
  99.             while ($ids $iterator->fetch()) {
  100.                 // we provide no queue when the data is sent by the admin
  101.                 if ($indexingBehavior->getNoQueue() === true) {
  102.                     $this->__invoke(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices$ids));
  103.                 } else {
  104.                     $this->queue->dispatch(new AdminSearchIndexingMessage($indexer->getEntity(), $indexer->getName(), $indices$ids));
  105.                 }
  106.                 $this->dispatcher->dispatch(new ProgressAdvancedEvent(\count($ids)));
  107.             }
  108.             $this->dispatcher->dispatch(new ProgressFinishedEvent($indexer->getName()));
  109.         }
  110.         $this->swapAlias($indices);
  111.     }
  112.     public function refresh(EntityWrittenContainerEvent $event): void
  113.     {
  114.         if ($this->adminEsHelper->getEnabled() === false || !$this->isIndexedEntityWritten($event)) {
  115.             return;
  116.         }
  117.         if ($this->adminEsHelper->getRefreshIndices()) {
  118.             $this->refreshIndices();
  119.         }
  120.         /** @var array<string, string> $indices */
  121.         $indices $this->connection->fetchAllKeyValue('SELECT `alias`, `index` FROM admin_elasticsearch_index_task');
  122.         if (empty($indices)) {
  123.             return;
  124.         }
  125.         foreach ($this->indexer as $indexer) {
  126.             $ids $event->getPrimaryKeys($indexer->getEntity());
  127.             if (empty($ids)) {
  128.                 continue;
  129.             }
  130.             $documents $indexer->fetch($ids);
  131.             $this->push($indexer$indices$documents$ids);
  132.         }
  133.     }
  134.     /**
  135.      * @return AbstractAdminIndexer[]
  136.      */
  137.     public function getIndexers(): iterable
  138.     {
  139.         return $this->indexer;
  140.     }
  141.     public function getIndexer(string $name): AbstractAdminIndexer
  142.     {
  143.         $indexer $this->indexer[$name] ?? null;
  144.         if ($indexer) {
  145.             return $indexer;
  146.         }
  147.         throw new ElasticsearchIndexingException([\sprintf('Indexer for name %s not found'$name)]);
  148.     }
  149.     private function isIndexedEntityWritten(EntityWrittenContainerEvent $event): bool
  150.     {
  151.         foreach ($this->indexer as $indexer) {
  152.             $ids $event->getPrimaryKeys($indexer->getEntity());
  153.             if (!empty($ids)) {
  154.                 return true;
  155.             }
  156.         }
  157.         return false;
  158.     }
  159.     /**
  160.      * @param array<string, string> $indices
  161.      * @param array<string, array<string|int, string>> $data
  162.      * @param array<string> $ids
  163.      */
  164.     private function push(AbstractAdminIndexer $indexer, array $indices, array $data, array $ids): void
  165.     {
  166.         $alias $this->adminEsHelper->getIndex($indexer->getName());
  167.         if (!isset($indices[$alias])) {
  168.             return;
  169.         }
  170.         $toRemove array_filter($ids, static fn (string $id): bool => !isset($data[$id]));
  171.         $documents = [];
  172.         foreach ($data as $id => $document) {
  173.             $documents[] = ['index' => ['_id' => $id]];
  174.             $documents[] = \array_replace(
  175.                 ['entityName' => $indexer->getEntity(), 'parameters' => [], 'textBoosted' => '''text' => ''],
  176.                 $document
  177.             );
  178.         }
  179.         foreach ($toRemove as $id) {
  180.             $documents[] = ['delete' => ['_id' => $id]];
  181.         }
  182.         $arguments = [
  183.             'index' => $indices[$alias],
  184.             'body' => $documents,
  185.         ];
  186.         $result $this->client->bulk($arguments);
  187.         if (\is_array($result) && !empty($result['errors'])) {
  188.             $errors $this->parseErrors($result);
  189.             throw new ElasticsearchIndexingException($errors);
  190.         }
  191.     }
  192.     /**
  193.      * @param array<string> $entities
  194.      *
  195.      * @throws Exception
  196.      *
  197.      * @return array<string, string>
  198.      */
  199.     private function createIndices(array $entities): array
  200.     {
  201.         $indexTasks = [];
  202.         $indices = [];
  203.         foreach ($entities as $entityName) {
  204.             $indexer $this->getIndexer($entityName);
  205.             $alias $this->adminEsHelper->getIndex($indexer->getName());
  206.             $index $alias '_' time();
  207.             if ($this->indexExists($index)) {
  208.                 continue;
  209.             }
  210.             $indices[$alias] = $index;
  211.             $this->create($indexer$index$alias);
  212.             $iterator $indexer->getIterator();
  213.             $indexTasks[] = [
  214.                 'id' => Uuid::randomBytes(),
  215.                 '`entity`' => $indexer->getEntity(),
  216.                 '`index`' => $index,
  217.                 '`alias`' => $alias,
  218.                 '`doc_count`' => $iterator->fetchCount(),
  219.             ];
  220.         }
  221.         $this->connection->executeStatement(
  222.             'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  223.             ['entities' => $entities],
  224.             ['entities' => Connection::PARAM_STR_ARRAY]
  225.         );
  226.         foreach ($indexTasks as $task) {
  227.             $this->connection->insert('admin_elasticsearch_index_task'$task);
  228.         }
  229.         return $indices;
  230.     }
  231.     private function refreshIndices(): void
  232.     {
  233.         $entities = [];
  234.         $indexTasks = [];
  235.         foreach ($this->indexer as $indexer) {
  236.             $alias $this->adminEsHelper->getIndex($indexer->getName());
  237.             if ($this->aliasExists($alias)) {
  238.                 continue;
  239.             }
  240.             $index $alias '_' time();
  241.             $this->create($indexer$index$alias);
  242.             $entities[] = $indexer->getEntity();
  243.             $iterator $indexer->getIterator();
  244.             $indexTasks[] = [
  245.                 'id' => Uuid::randomBytes(),
  246.                 '`entity`' => $indexer->getEntity(),
  247.                 '`index`' => $index,
  248.                 '`alias`' => $alias,
  249.                 '`doc_count`' => $iterator->fetchCount(),
  250.             ];
  251.         }
  252.         $this->connection->executeStatement(
  253.             'DELETE FROM admin_elasticsearch_index_task WHERE `entity` IN (:entities)',
  254.             ['entities' => $entities],
  255.             ['entities' => Connection::PARAM_STR_ARRAY]
  256.         );
  257.         foreach ($indexTasks as $task) {
  258.             $this->connection->insert('admin_elasticsearch_index_task'$task);
  259.         }
  260.     }
  261.     private function create(AbstractAdminIndexer $indexerstring $indexstring $alias): void
  262.     {
  263.         $mapping $indexer->mapping([
  264.             'properties' => [
  265.                 'id' => ['type' => 'keyword'],
  266.                 'textBoosted' => ['type' => 'text'],
  267.                 'text' => ['type' => 'text'],
  268.                 'entityName' => ['type' => 'keyword'],
  269.                 'parameters' => ['type' => 'keyword'],
  270.             ],
  271.         ]);
  272.         $mapping array_merge_recursive($mapping$this->mapping);
  273.         $body array_merge(
  274.             $this->config,
  275.             ['mappings' => $mapping]
  276.         );
  277.         $this->client->indices()->create([
  278.             'index' => $index,
  279.             'body' => $body,
  280.         ]);
  281.         $this->createAliasIfNotExisting($index$alias);
  282.     }
  283.     private function indexExists(string $name): bool
  284.     {
  285.         return $this->client->indices()->exists(['index' => $name]);
  286.     }
  287.     private function aliasExists(string $alias): bool
  288.     {
  289.         return $this->client->indices()->existsAlias(['name' => $alias]);
  290.     }
  291.     /**
  292.      * @param array<string, array<array<string, mixed>>> $result
  293.      *
  294.      * @return array<array{reason: string}|string>
  295.      */
  296.     private function parseErrors(array $result): array
  297.     {
  298.         $errors = [];
  299.         foreach ($result['items'] as $item) {
  300.             $item $item['index'] ?? $item['delete'];
  301.             if (\in_array($item['status'], [200201], true)) {
  302.                 continue;
  303.             }
  304.             $errors[] = [
  305.                 'index' => $item['_index'],
  306.                 'id' => $item['_id'],
  307.                 'type' => $item['error']['type'] ?? $item['_type'],
  308.                 'reason' => $item['error']['reason'] ?? $item['result'],
  309.             ];
  310.         }
  311.         return $errors;
  312.     }
  313.     private function createAliasIfNotExisting(string $indexstring $alias): void
  314.     {
  315.         $exist $this->client->indices()->existsAlias(['name' => $alias]);
  316.         if ($exist) {
  317.             return;
  318.         }
  319.         $this->putAlias($index$alias);
  320.     }
  321.     /**
  322.      * @param array<string, string> $indices
  323.      */
  324.     private function swapAlias($indices): void
  325.     {
  326.         foreach ($indices as $alias => $index) {
  327.             $exist $this->client->indices()->existsAlias(['name' => $alias]);
  328.             if (!$exist) {
  329.                 $this->putAlias($index$alias);
  330.                 return;
  331.             }
  332.             $current $this->client->indices()->getAlias(['name' => $alias]);
  333.             if (!isset($current[$index])) {
  334.                 $this->putAlias($index$alias);
  335.             }
  336.             unset($current[$index]);
  337.             $current array_keys($current);
  338.             foreach ($current as $value) {
  339.                 $this->client->indices()->delete(['index' => $value]);
  340.             }
  341.         }
  342.     }
  343.     private function putAlias(string $indexstring $alias): void
  344.     {
  345.         $this->client->indices()->refresh([
  346.             'index' => $index,
  347.         ]);
  348.         $this->client->indices()->putAlias(['index' => $index'name' => $alias]);
  349.     }
  350. }