| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268 | 
							- <?php
 
- /*
 
-  * This file is part of the Symfony package.
 
-  *
 
-  * (c) Fabien Potencier <fabien@symfony.com>
 
-  *
 
-  * For the full copyright and license information, please view the LICENSE
 
-  * file that was distributed with this source code.
 
-  */
 
- namespace Symfony\Component\Messenger;
 
- use Psr\Log\LoggerInterface;
 
- use Symfony\Component\EventDispatcher\Event;
 
- use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
 
- use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
 
- use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
 
- use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
 
- use Symfony\Component\Messenger\Event\WorkerRunningEvent;
 
- use Symfony\Component\Messenger\Event\WorkerStartedEvent;
 
- use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
 
- use Symfony\Component\Messenger\Exception\HandlerFailedException;
 
- use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
 
- use Symfony\Component\Messenger\Exception\RuntimeException;
 
- use Symfony\Component\Messenger\Stamp\AckStamp;
 
- use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
 
- use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
 
- use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
 
- use Symfony\Component\Messenger\Stamp\ReceivedStamp;
 
- use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
 
- use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
 
- use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
 
/**
 * Pulls envelopes from a set of receivers, dispatches them on a message bus and
 * then acknowledges or rejects each of them on the receiver it came from.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 * @author Tobias Schultze <http://tobion.de>
 *
 * @final
 */
class Worker
{
    /** @var ReceiverInterface[] Receivers keyed by transport name */
    private $receivers;
    private $bus;
    private $eventDispatcher;
    private $logger;
    /** @var bool Set by stop(); checked by the run() loop */
    private $shouldStop = false;
    private $metadata;
    /** @var array[] Pending [transportName, Envelope, ?\Throwable] tuples, drained by ack() */
    private $acks = [];
    /** @var \SplObjectStorage Maps a batch handler to the [Envelope, transportName] pair that still has to be flushed */
    private $unacks;

    /**
     * @param ReceiverInterface[] $receivers Where the key is the transport name
     */
    public function __construct(array $receivers, MessageBusInterface $bus, ?EventDispatcherInterface $eventDispatcher = null, ?LoggerInterface $logger = null)
    {
        $this->receivers = $receivers;
        $this->bus = $bus;
        $this->logger = $logger;
        $this->eventDispatcher = class_exists(Event::class) ? LegacyEventDispatcherProxy::decorate($eventDispatcher) : $eventDispatcher;
        $this->metadata = new WorkerMetadata([
            'transportNames' => array_keys($receivers),
        ]);
        $this->unacks = new \SplObjectStorage();
    }

    /**
     * Receive the messages and dispatch them to the bus.
     *
     * Valid options are:
     *  * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
     *  * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface
     *
     * @throws RuntimeException When "queues" is given and a receiver does not implement QueueReceiverInterface
     */
    public function run(array $options = []): void
    {
        $options = array_merge([
            'sleep' => 1000000,
        ], $options);
        $queueNames = $options['queues'] ?? null;

        $this->metadata->set(['queueNames' => $queueNames]);

        $this->dispatchEvent(new WorkerStartedEvent($this));

        if ($queueNames) {
            // if queue names are specified, all receivers must implement the QueueReceiverInterface
            foreach ($this->receivers as $transportName => $receiver) {
                if (!$receiver instanceof QueueReceiverInterface) {
                    throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
                }
            }
        }

        while (!$this->shouldStop) {
            $envelopeHandled = false;
            $envelopeHandledStart = microtime(true);

            foreach ($this->receivers as $transportName => $receiver) {
                if ($queueNames) {
                    $envelopes = $receiver->getFromQueues($queueNames);
                } else {
                    $envelopes = $receiver->get();
                }

                foreach ($envelopes as $envelope) {
                    $envelopeHandled = true;

                    $this->handleMessage($envelope, $transportName);
                    $this->dispatchEvent(new WorkerRunningEvent($this, false));

                    if ($this->shouldStop) {
                        break 2;
                    }
                }

                // after handling a single receiver, quit and start the loop again
                // this should prevent multiple lower priority receivers from
                // blocking too long before the higher priority are checked
                if ($envelopeHandled) {
                    break;
                }
            }

            // nothing was received: give pending batches a chance to flush, and poll again
            // right away if that flush acknowledged or rejected anything
            if (!$envelopeHandled && $this->flush(false)) {
                continue;
            }

            if (!$envelopeHandled) {
                $this->dispatchEvent(new WorkerRunningEvent($this, true));

                // only sleep for what is left of the configured interval once the time
                // already spent in this iteration has been subtracted
                if (0 < $sleep = (int) ($options['sleep'] - 1e6 * (microtime(true) - $envelopeHandledStart))) {
                    usleep($sleep);
                }
            }
        }

        $this->flush(true);
        $this->dispatchEvent(new WorkerStoppedEvent($this));
    }

    /**
     * Dispatches one received envelope on the bus and records how it must be acknowledged.
     */
    private function handleMessage(Envelope $envelope, string $transportName): void
    {
        $event = new WorkerMessageReceivedEvent($envelope, $transportName);
        $this->dispatchEvent($event);
        $envelope = $event->getEnvelope();

        if (!$event->shouldHandle()) {
            return;
        }

        $acked = false;
        // handed to the bus through AckStamp; calling it queues the ack/reject and
        // disables the automatic ack below
        $ack = function (Envelope $envelope, ?\Throwable $e = null) use ($transportName, &$acked) {
            $acked = true;
            $this->acks[] = [$transportName, $envelope, $e];
        };

        try {
            $e = null;
            $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack)));
        } catch (\Throwable $e) {
            // intentionally not rethrown: $e travels with the envelope to ack(), which rejects the message
        }

        $noAutoAckStamp = $envelope->last(NoAutoAckStamp::class);

        if (!$acked && !$noAutoAckStamp) {
            $this->acks[] = [$transportName, $envelope, $e];
        } elseif ($noAutoAckStamp) {
            // remember the envelope per batch handler so that flush() can dispatch it again later
            $this->unacks[$noAutoAckStamp->getHandlerDescriptor()->getBatchHandler()] = [$envelope->withoutAll(AckStamp::class), $transportName];
        }

        $this->ack();
    }

    /**
     * Drains the pending acks: rejects failed envelopes and acknowledges handled ones.
     *
     * @return bool Whether at least one pending entry was processed
     */
    private function ack(): bool
    {
        // take ownership of the queue first so that entries added while processing are kept for the next call
        $acks = $this->acks;
        $this->acks = [];

        foreach ($acks as [$transportName, $envelope, $e]) {
            $receiver = $this->receivers[$transportName];

            if (null !== $e) {
                if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
                    // redelivered messages are rejected first so that continuous failures in an event listener or while
                    // publishing for retry does not cause infinite redelivery loops
                    $receiver->reject($envelope);
                }

                if ($e instanceof HandlerFailedException) {
                    $envelope = $e->getEnvelope();
                }

                $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $e);
                $this->dispatchEvent($failedEvent);
                $envelope = $failedEvent->getEnvelope();

                if (!$rejectFirst) {
                    $receiver->reject($envelope);
                }

                continue;
            }

            $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
            $this->dispatchEvent($handledEvent);
            $envelope = $handledEvent->getEnvelope();

            if (null !== $this->logger) {
                $message = $envelope->getMessage();
                $context = [
                    'class' => \get_class($message),
                ];
                $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
            }

            $receiver->ack($envelope);
        }

        return (bool) $acks;
    }

    /**
     * Dispatches every envelope still held for a batch handler with a FlushBatchHandlersStamp.
     *
     * @param bool $force Value passed to FlushBatchHandlersStamp
     *
     * @return bool False when nothing was pending, otherwise the result of ack()
     */
    private function flush(bool $force): bool
    {
        $unacks = $this->unacks;

        if (!$unacks->count()) {
            return false;
        }

        $this->unacks = new \SplObjectStorage();

        // copy the entries out before dispatching: the storage is not modified while
        // it is being iterated, so no pending batch can be skipped
        $pending = [];
        foreach ($unacks as $batchHandler) {
            $pending[] = $unacks[$batchHandler];
        }
        unset($unacks, $batchHandler);

        foreach ($pending as [$envelope, $transportName]) {
            try {
                $this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force)));
            } catch (\Throwable $e) {
                // queue the failure so that ack() rejects the envelope and dispatches the failed event
                $this->acks[] = [$transportName, $envelope, $e];
            }
        }

        return $this->ack();
    }

    /**
     * Asks the run() loop to exit; the flag is checked after each handled envelope and on each loop iteration.
     */
    public function stop(): void
    {
        if (null !== $this->logger) {
            $this->logger->info('Stopping worker.', ['transport_names' => $this->metadata->getTransportNames()]);
        }

        $this->shouldStop = true;
    }

    public function getMetadata(): WorkerMetadata
    {
        return $this->metadata;
    }

    /**
     * Dispatches the event when an event dispatcher was provided, and does nothing otherwise.
     */
    private function dispatchEvent(object $event): void
    {
        if (null === $this->eventDispatcher) {
            return;
        }

        $this->eventDispatcher->dispatch($event);
    }
}
 
 
  |