Worker.php 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\EventDispatcher\Event;
  13. use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
  14. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  15. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  16. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  17. use Symfony\Component\Messenger\Event\WorkerRunningEvent;
  18. use Symfony\Component\Messenger\Event\WorkerStartedEvent;
  19. use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
  20. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  21. use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
  22. use Symfony\Component\Messenger\Exception\RuntimeException;
  23. use Symfony\Component\Messenger\Stamp\AckStamp;
  24. use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
  25. use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
  26. use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
  27. use Symfony\Component\Messenger\Stamp\ReceivedStamp;
  28. use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
  29. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  30. use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
  31. /**
  32. * @author Samuel Roze <samuel.roze@gmail.com>
  33. * @author Tobias Schultze <http://tobion.de>
  34. *
  35. * @final
  36. */
  37. class Worker
  38. {
  39. private $receivers;
  40. private $bus;
  41. private $eventDispatcher;
  42. private $logger;
  43. private $shouldStop = false;
  44. private $metadata;
  45. private $acks = [];
  46. private $unacks;
  47. /**
  48. * @param ReceiverInterface[] $receivers Where the key is the transport name
  49. */
  50. public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
  51. {
  52. $this->receivers = $receivers;
  53. $this->bus = $bus;
  54. $this->logger = $logger;
  55. $this->eventDispatcher = class_exists(Event::class) ? LegacyEventDispatcherProxy::decorate($eventDispatcher) : $eventDispatcher;
  56. $this->metadata = new WorkerMetadata([
  57. 'transportNames' => array_keys($receivers),
  58. ]);
  59. $this->unacks = new \SplObjectStorage();
  60. }
  61. /**
  62. * Receive the messages and dispatch them to the bus.
  63. *
  64. * Valid options are:
  65. * * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
  66. * * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface
  67. */
  68. public function run(array $options = []): void
  69. {
  70. $options = array_merge([
  71. 'sleep' => 1000000,
  72. ], $options);
  73. $queueNames = $options['queues'] ?? null;
  74. $this->metadata->set(['queueNames' => $queueNames]);
  75. $this->dispatchEvent(new WorkerStartedEvent($this));
  76. if ($queueNames) {
  77. // if queue names are specified, all receivers must implement the QueueReceiverInterface
  78. foreach ($this->receivers as $transportName => $receiver) {
  79. if (!$receiver instanceof QueueReceiverInterface) {
  80. throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
  81. }
  82. }
  83. }
  84. while (!$this->shouldStop) {
  85. $envelopeHandled = false;
  86. $envelopeHandledStart = microtime(true);
  87. foreach ($this->receivers as $transportName => $receiver) {
  88. if ($queueNames) {
  89. $envelopes = $receiver->getFromQueues($queueNames);
  90. } else {
  91. $envelopes = $receiver->get();
  92. }
  93. foreach ($envelopes as $envelope) {
  94. $envelopeHandled = true;
  95. $this->handleMessage($envelope, $transportName);
  96. $this->dispatchEvent(new WorkerRunningEvent($this, false));
  97. if ($this->shouldStop) {
  98. break 2;
  99. }
  100. }
  101. // after handling a single receiver, quit and start the loop again
  102. // this should prevent multiple lower priority receivers from
  103. // blocking too long before the higher priority are checked
  104. if ($envelopeHandled) {
  105. break;
  106. }
  107. }
  108. if (!$envelopeHandled && $this->flush(false)) {
  109. continue;
  110. }
  111. if (!$envelopeHandled) {
  112. $this->dispatchEvent(new WorkerRunningEvent($this, true));
  113. if (0 < $sleep = (int) ($options['sleep'] - 1e6 * (microtime(true) - $envelopeHandledStart))) {
  114. usleep($sleep);
  115. }
  116. }
  117. }
  118. $this->flush(true);
  119. $this->dispatchEvent(new WorkerStoppedEvent($this));
  120. }
  121. private function handleMessage(Envelope $envelope, string $transportName): void
  122. {
  123. $event = new WorkerMessageReceivedEvent($envelope, $transportName);
  124. $this->dispatchEvent($event);
  125. $envelope = $event->getEnvelope();
  126. if (!$event->shouldHandle()) {
  127. return;
  128. }
  129. $acked = false;
  130. $ack = function (Envelope $envelope, \Throwable $e = null) use ($transportName, &$acked) {
  131. $acked = true;
  132. $this->acks[] = [$transportName, $envelope, $e];
  133. };
  134. try {
  135. $e = null;
  136. $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp(), new AckStamp($ack)));
  137. } catch (\Throwable $e) {
  138. }
  139. $noAutoAckStamp = $envelope->last(NoAutoAckStamp::class);
  140. if (!$acked && !$noAutoAckStamp) {
  141. $this->acks[] = [$transportName, $envelope, $e];
  142. } elseif ($noAutoAckStamp) {
  143. $this->unacks[$noAutoAckStamp->getHandlerDescriptor()->getBatchHandler()] = [$envelope->withoutAll(AckStamp::class), $transportName];
  144. }
  145. $this->ack();
  146. }
  147. private function ack(): bool
  148. {
  149. $acks = $this->acks;
  150. $this->acks = [];
  151. foreach ($acks as [$transportName, $envelope, $e]) {
  152. $receiver = $this->receivers[$transportName];
  153. if (null !== $e) {
  154. if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
  155. // redelivered messages are rejected first so that continuous failures in an event listener or while
  156. // publishing for retry does not cause infinite redelivery loops
  157. $receiver->reject($envelope);
  158. }
  159. if ($e instanceof HandlerFailedException) {
  160. $envelope = $e->getEnvelope();
  161. }
  162. $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $e);
  163. $this->dispatchEvent($failedEvent);
  164. $envelope = $failedEvent->getEnvelope();
  165. if (!$rejectFirst) {
  166. $receiver->reject($envelope);
  167. }
  168. continue;
  169. }
  170. $handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
  171. $this->dispatchEvent($handledEvent);
  172. $envelope = $handledEvent->getEnvelope();
  173. if (null !== $this->logger) {
  174. $message = $envelope->getMessage();
  175. $context = [
  176. 'class' => \get_class($message),
  177. ];
  178. $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
  179. }
  180. $receiver->ack($envelope);
  181. }
  182. return (bool) $acks;
  183. }
  184. private function flush(bool $force): bool
  185. {
  186. $unacks = $this->unacks;
  187. if (!$unacks->count()) {
  188. return false;
  189. }
  190. $this->unacks = new \SplObjectStorage();
  191. foreach ($unacks as $batchHandler) {
  192. [$envelope, $transportName] = $unacks[$batchHandler];
  193. try {
  194. $this->bus->dispatch($envelope->with(new FlushBatchHandlersStamp($force)));
  195. $envelope = $envelope->withoutAll(NoAutoAckStamp::class);
  196. unset($unacks[$batchHandler], $batchHandler);
  197. } catch (\Throwable $e) {
  198. $this->acks[] = [$transportName, $envelope, $e];
  199. }
  200. }
  201. return $this->ack();
  202. }
  203. public function stop(): void
  204. {
  205. if (null !== $this->logger) {
  206. $this->logger->info('Stopping worker.', ['transport_names' => $this->metadata->getTransportNames()]);
  207. }
  208. $this->shouldStop = true;
  209. }
  210. public function getMetadata(): WorkerMetadata
  211. {
  212. return $this->metadata;
  213. }
  214. private function dispatchEvent(object $event): void
  215. {
  216. if (null === $this->eventDispatcher) {
  217. return;
  218. }
  219. $this->eventDispatcher->dispatch($event);
  220. }
  221. }