ConsumeMessagesCommand.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\Console\Command\Command;
  14. use Symfony\Component\Console\Completion\CompletionInput;
  15. use Symfony\Component\Console\Completion\CompletionSuggestions;
  16. use Symfony\Component\Console\Exception\RuntimeException;
  17. use Symfony\Component\Console\Input\InputArgument;
  18. use Symfony\Component\Console\Input\InputInterface;
  19. use Symfony\Component\Console\Input\InputOption;
  20. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  21. use Symfony\Component\Console\Output\OutputInterface;
  22. use Symfony\Component\Console\Question\ChoiceQuestion;
  23. use Symfony\Component\Console\Style\SymfonyStyle;
  24. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  25. use Symfony\Component\Messenger\EventListener\ResetServicesListener;
  26. use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
  27. use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
  28. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  29. use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
  30. use Symfony\Component\Messenger\RoutableMessageBus;
  31. use Symfony\Component\Messenger\Worker;
  32. /**
  33. * @author Samuel Roze <samuel.roze@gmail.com>
  34. */
  35. class ConsumeMessagesCommand extends Command
  36. {
  37. protected static $defaultName = 'messenger:consume';
  38. protected static $defaultDescription = 'Consume messages';
  39. private $routableBus;
  40. private $receiverLocator;
  41. private $eventDispatcher;
  42. private $logger;
  43. private $receiverNames;
  44. private $resetServicesListener;
  45. private $busIds;
  /**
   * Stores the collaborators the command relies on, then hands over to the
   * parent Command constructor.
   *
   * The optional dependencies use explicit nullable types: relying on a
   * "= null" default to make a typed parameter nullable is deprecated as of
   * PHP 8.4. Callers are unaffected.
   *
   * @param string[] $receiverNames Known receiver names (used for the default argument value, the interactive prompt, error hints and completion)
   * @param string[] $busIds        Bus identifiers suggested when completing the --bus option
   */
  public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, ?LoggerInterface $logger = null, array $receiverNames = [], ?ResetServicesListener $resetServicesListener = null, array $busIds = [])
  {
      $this->routableBus = $routableBus;
      $this->receiverLocator = $receiverLocator;
      $this->eventDispatcher = $eventDispatcher;
      $this->logger = $logger;
      $this->receiverNames = $receiverNames;
      $this->resetServicesListener = $resetServicesListener;
      $this->busIds = $busIds;

      // NOTE(review): configure() reads $this->receiverNames; presumably the
      // parent constructor triggers it, so keep this call after the
      // assignments above - confirm before reordering.
      parent::__construct();
  }
  /**
   * {@inheritdoc}
   *
   * Declares the "receivers" argument, the worker options and the help text.
   * When exactly one receiver name is known, that name is used as the default
   * value of the "receivers" argument.
   */
  protected function configure(): void
  {
      // Only a single known receiver can serve as an implicit default.
      $defaultReceivers = [];
      if (1 === \count($this->receiverNames)) {
          $onlyReceiverName = current($this->receiverNames);
          if ($onlyReceiverName) {
              $defaultReceivers = [$onlyReceiverName];
          }
      }

      $this->setDefinition([
          new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceivers),
          new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
          new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
          new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
          new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
          new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
          new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
          new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
          new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
      ]);

      $this->setDescription(self::$defaultDescription);

      // Nowdoc body and closing marker stay at column 0 so the help text is
      // emitted exactly as written.
      $this->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
<info>php %command.full_name% <receiver-name></info>
To receive from multiple transports, pass each name:
<info>php %command.full_name% receiver1 receiver2</info>
Use the --limit option to limit the number of messages received:
<info>php %command.full_name% <receiver-name> --limit=10</info>
Use the --failure-limit option to stop the worker when the given number of failed messages is reached:
<info>php %command.full_name% <receiver-name> --failure-limit=2</info>
Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:
<info>php %command.full_name% <receiver-name> --memory-limit=128M</info>
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached.
If a message is being handled, the worker will stop after the processing is finished:
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
Use the --bus option to specify the message bus to dispatch received messages
to instead of trying to determine it automatically. This is required if the
messages didn't originate from Messenger:
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
<info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
<info>php %command.full_name% <receiver-name> --no-reset</info>
EOF
      );
  }
  /**
   * {@inheritdoc}
   *
   * When receiver names are known but none were passed as argument, asks the
   * user to pick one or more of them and stores the answer in the
   * "receivers" argument.
   *
   * @throws RuntimeException When the "receivers" argument is still empty afterwards
   */
  protected function interact(InputInterface $input, OutputInterface $output)
  {
      // Prompts are written to the error output when the console exposes one.
      $promptOutput = $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output;
      $io = new SymfonyStyle($input, $promptOutput);

      $canPrompt = (bool) $this->receiverNames;
      if ($canPrompt && 0 === \count($input->getArgument('receivers'))) {
          $io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
          $io->writeln('Choose which receivers you want to consume messages from in order of priority.');

          if (\count($this->receiverNames) > 1) {
              $example = implode(', ', $this->receiverNames);
              $io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', $example));
          }

          $question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
          $question->setMultiselect(true);

          $selected = $io->askQuestion($question);
          $input->setArgument('receivers', $selected);
      }

      if (0 !== \count($input->getArgument('receivers'))) {
          return;
      }

      throw new RuntimeException('Please pass at least one receiver.');
  }
  /**
   * {@inheritdoc}
   *
   * Resolves the requested receivers, registers the listeners matching the
   * --limit, --failure-limit, --memory-limit and --time-limit options (and
   * the reset listener unless --no-reset is given), prints a summary, then
   * runs a Worker over the receivers.
   *
   * @return int Always 0, returned once the worker's run() call returns
   *
   * @throws RuntimeException When a requested receiver is unknown to the receiver locator
   */
  protected function execute(InputInterface $input, OutputInterface $output)
  {
      $receiverNames = $input->getArgument('receivers');

      $receivers = [];
      foreach ($receiverNames as $receiverName) {
          if (!$this->receiverLocator->has($receiverName)) {
              $message = sprintf('The receiver "%s" does not exist.', $receiverName);
              if ($this->receiverNames) {
                  $message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
              }

              throw new RuntimeException($message);
          }

          $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
      }

      if (null !== $this->resetServicesListener && !$input->getOption('no-reset')) {
          $this->eventDispatcher->addSubscriber($this->resetServicesListener);
      }

      // Human-readable stop conditions, collected for the summary below.
      $stopsWhen = [];

      if ($limit = $input->getOption('limit')) {
          $stopsWhen[] = "processed {$limit} messages";
          $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
      }

      if ($failureLimit = $input->getOption('failure-limit')) {
          $stopsWhen[] = "reached {$failureLimit} failed messages";
          $this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
      }

      if ($memoryLimit = $input->getOption('memory-limit')) {
          $stopsWhen[] = "exceeded {$memoryLimit} of memory";
          $this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
      }

      // Unlike the options above, any non-null value (including "0") enables the time limit.
      if (null !== ($timeLimit = $input->getOption('time-limit'))) {
          $stopsWhen[] = "been running for {$timeLimit}s";
          $this->eventDispatcher->addSubscriber(new StopWorkerOnTimeLimitListener($timeLimit, $this->logger));
      }

      $stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';

      $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

      // Pluralize only when more than one transport is consumed; the previous
      // "> 0" check printed "transports" even for a single one.
      $io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 1 ? 's' : '', implode(', ', $receiverNames)));

      // $stopsWhen is never empty here: the stop-signal reason is always appended above.
      $last = array_pop($stopsWhen);
      $stopsWhen = ($stopsWhen ? implode(', ', $stopsWhen).' or ' : '').$last;
      $io->comment("The worker will automatically exit once it has {$stopsWhen}.");

      $io->comment('Quit the worker with CONTROL-C.');

      if (OutputInterface::VERBOSITY_VERBOSE > $output->getVerbosity()) {
          $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
      }

      // An explicit --bus selects that bus; otherwise the routable bus itself is used.
      $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;

      $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);

      $options = [
          // --sleep is given in seconds; multiplied by one million for the worker option.
          'sleep' => $input->getOption('sleep') * 1000000,
      ];
      if ($queues = $input->getOption('queues')) {
          $options['queues'] = $queues;
      }

      $worker->run($options);

      return 0;
  }
  /**
   * Provides shell completion values.
   *
   * For the "receivers" argument, suggests the known receiver names minus
   * those already given (the value currently being completed is not counted
   * as already given). For the --bus option, suggests the known bus ids.
   */
  public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
  {
      if ($input->mustSuggestArgumentValuesFor('receivers')) {
          // Receivers already on the command line, excluding the partial value being typed.
          $alreadyGiven = array_diff($input->getArgument('receivers'), [$input->getCompletionValue()]);
          $remaining = array_diff($this->receiverNames, $alreadyGiven);

          $suggestions->suggestValues($remaining);
      } elseif ($input->mustSuggestOptionValuesFor('bus')) {
          $suggestions->suggestValues($this->busIds);
      }
  }
  /**
   * Converts a memory limit such as "128M", "1g", "0x10" or "010" to bytes.
   *
   * The numeric part is read as hexadecimal when it starts with "0x", as
   * octal when it starts with "0", and as decimal otherwise; a leading "+"
   * is ignored. A trailing unit letter (k, m, g or t, optionally followed by
   * "b", case-insensitive) multiplies the value by 1024 once per unit step.
   */
  private function convertToBytes(string $memoryLimit): int
  {
      $normalized = strtolower($memoryLimit);
      $digits = ltrim($normalized, '+');

      if (str_starts_with($digits, '0x')) {
          $bytes = \intval($digits, 16);
      } elseif (str_starts_with($digits, '0')) {
          $bytes = \intval($digits, 8);
      } else {
          $bytes = (int) $digits;
      }

      // Number of times the value has to be multiplied by 1024 for each unit.
      $steps = ['k' => 1, 'm' => 2, 'g' => 3, 't' => 4];
      $unit = substr(rtrim($normalized, 'b'), -1);

      for ($remaining = $steps[$unit] ?? 0; $remaining > 0; --$remaining) {
          $bytes *= 1024;
      }

      return $bytes;
  }
  213. }