FailedMessagesRetryCommand.php 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\Console\Exception\RuntimeException;
  13. use Symfony\Component\Console\Input\InputArgument;
  14. use Symfony\Component\Console\Input\InputInterface;
  15. use Symfony\Component\Console\Input\InputOption;
  16. use Symfony\Component\Console\Output\ConsoleOutputInterface;
  17. use Symfony\Component\Console\Output\OutputInterface;
  18. use Symfony\Component\Console\Style\SymfonyStyle;
  19. use Symfony\Component\EventDispatcher\EventDispatcherInterface;
  20. use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
  21. use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
  22. use Symfony\Component\Messenger\Exception\LogicException;
  23. use Symfony\Component\Messenger\MessageBusInterface;
  24. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  25. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  26. use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
  27. use Symfony\Component\Messenger\Worker;
  28. /**
  29. * @author Ryan Weaver <ryan@symfonycasts.com>
  30. */
  31. class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
  32. {
  33. protected static $defaultName = 'messenger:failed:retry';
  34. protected static $defaultDescription = 'Retry one or more messages from the failure transport';
  35. private $eventDispatcher;
  36. private $messageBus;
  37. private $logger;
  38. public function __construct(?string $globalReceiverName, $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
  39. {
  40. $this->eventDispatcher = $eventDispatcher;
  41. $this->messageBus = $messageBus;
  42. $this->logger = $logger;
  43. parent::__construct($globalReceiverName, $failureTransports);
  44. }
  45. /**
  46. * {@inheritdoc}
  47. */
  48. protected function configure(): void
  49. {
  50. $this
  51. ->setDefinition([
  52. new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
  53. new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
  54. new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
  55. ])
  56. ->setDescription(self::$defaultDescription)
  57. ->setHelp(<<<'EOF'
  58. The <info>%command.name%</info> retries message in the failure transport.
  59. <info>php %command.full_name%</info>
  60. The command will interactively ask if each message should be retried
  61. or discarded.
  62. Some transports support retrying a specific message id, which comes
  63. from the <info>messenger:failed:show</info> command.
  64. <info>php %command.full_name% {id}</info>
  65. Or pass multiple ids at once to process multiple messages:
  66. <info>php %command.full_name% {id1} {id2} {id3}</info>
  67. EOF
  68. )
  69. ;
  70. }
  71. /**
  72. * {@inheritdoc}
  73. */
  74. protected function execute(InputInterface $input, OutputInterface $output)
  75. {
  76. $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
  77. $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
  78. $io->comment('Quit this command with CONTROL-C.');
  79. if (!$output->isVeryVerbose()) {
  80. $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
  81. }
  82. $failureTransportName = $input->getOption('transport');
  83. if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
  84. $this->printWarningAvailableFailureTransports($io, $this->getGlobalFailureReceiverName());
  85. }
  86. if ('' === $failureTransportName || null === $failureTransportName) {
  87. $failureTransportName = $this->interactiveChooseFailureTransport($io);
  88. }
  89. $failureTransportName = self::DEFAULT_TRANSPORT_OPTION === $failureTransportName ? $this->getGlobalFailureReceiverName() : $failureTransportName;
  90. $receiver = $this->getReceiver($failureTransportName);
  91. $this->printPendingMessagesMessage($receiver, $io);
  92. $io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $failureTransportName));
  93. $shouldForce = $input->getOption('force');
  94. $ids = $input->getArgument('id');
  95. if (0 === \count($ids)) {
  96. if (!$input->isInteractive()) {
  97. throw new RuntimeException('Message id must be passed when in non-interactive mode.');
  98. }
  99. $this->runInteractive($failureTransportName, $io, $shouldForce);
  100. return 0;
  101. }
  102. $this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
  103. $io->success('All done!');
  104. return 0;
  105. }
  106. private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce)
  107. {
  108. $receiver = $this->failureTransports->get($failureTransportName);
  109. $count = 0;
  110. if ($receiver instanceof ListableReceiverInterface) {
  111. // for listable receivers, find the messages one-by-one
  112. // this avoids using get(), which for some less-robust
  113. // transports (like Doctrine), will cause the message
  114. // to be temporarily "acked", even if the user aborts
  115. // handling the message
  116. while (true) {
  117. $ids = [];
  118. foreach ($receiver->all(1) as $envelope) {
  119. ++$count;
  120. $id = $this->getMessageId($envelope);
  121. if (null === $id) {
  122. throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $failureTransportName));
  123. }
  124. $ids[] = $id;
  125. }
  126. // break the loop if all messages are consumed
  127. if (0 === \count($ids)) {
  128. break;
  129. }
  130. $this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
  131. }
  132. } else {
  133. // get() and ask messages one-by-one
  134. $count = $this->runWorker($failureTransportName, $receiver, $io, $shouldForce);
  135. }
  136. // avoid success message if nothing was processed
  137. if (1 <= $count) {
  138. $io->success('All failed messages have been handled or removed!');
  139. }
  140. }
  141. private function runWorker(string $failureTransportName, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
  142. {
  143. $count = 0;
  144. $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
  145. ++$count;
  146. $envelope = $messageReceivedEvent->getEnvelope();
  147. $this->displaySingleMessage($envelope, $io);
  148. $shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
  149. if ($shouldHandle) {
  150. return;
  151. }
  152. $messageReceivedEvent->shouldHandle(false);
  153. $receiver->reject($envelope);
  154. };
  155. $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
  156. $worker = new Worker(
  157. [$failureTransportName => $receiver],
  158. $this->messageBus,
  159. $this->eventDispatcher,
  160. $this->logger
  161. );
  162. try {
  163. $worker->run();
  164. } finally {
  165. $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
  166. }
  167. return $count;
  168. }
  169. private function retrySpecificIds(string $failureTransportName, array $ids, SymfonyStyle $io, bool $shouldForce)
  170. {
  171. $receiver = $this->getReceiver($failureTransportName);
  172. if (!$receiver instanceof ListableReceiverInterface) {
  173. throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName));
  174. }
  175. foreach ($ids as $id) {
  176. $envelope = $receiver->find($id);
  177. if (null === $envelope) {
  178. throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
  179. }
  180. $singleReceiver = new SingleMessageReceiver($receiver, $envelope);
  181. $this->runWorker($failureTransportName, $singleReceiver, $io, $shouldForce);
  182. }
  183. }
  184. }