123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- <?php
- /*
- * This file is part of the Symfony package.
- *
- * (c) Fabien Potencier <fabien@symfony.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- namespace Symfony\Component\Messenger\Command;
- use Psr\Log\LoggerInterface;
- use Symfony\Component\Console\Exception\RuntimeException;
- use Symfony\Component\Console\Input\InputArgument;
- use Symfony\Component\Console\Input\InputInterface;
- use Symfony\Component\Console\Input\InputOption;
- use Symfony\Component\Console\Output\ConsoleOutputInterface;
- use Symfony\Component\Console\Output\OutputInterface;
- use Symfony\Component\Console\Style\SymfonyStyle;
- use Symfony\Component\EventDispatcher\EventDispatcherInterface;
- use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
- use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
- use Symfony\Component\Messenger\Exception\LogicException;
- use Symfony\Component\Messenger\MessageBusInterface;
- use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
- use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
- use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
- use Symfony\Component\Messenger\Worker;
/**
 * Console command that retries, or deletes, messages held in a failure transport.
 *
 * Messages are selected either by the ids passed on the command line or, when
 * no id is given and the input is interactive, by walking the failure
 * transport and asking the user what to do with each message.
 *
 * @author Ryan Weaver <ryan@symfonycasts.com>
 */
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
{
    protected static $defaultName = 'messenger:failed:retry';
    protected static $defaultDescription = 'Retry one or more messages from the failure transport';

    /** @var EventDispatcherInterface */
    private $eventDispatcher;

    /** @var MessageBusInterface */
    private $messageBus;

    /** @var LoggerInterface|null */
    private $logger;

    /**
     * @param string|null              $globalReceiverName forwarded unchanged to the parent constructor
     * @param mixed                    $failureTransports  forwarded unchanged to the parent constructor;
     *                                                     NOTE(review): presumably a locator of failure
     *                                                     receivers keyed by transport name, since this
     *                                                     class calls ->get($name) on it - confirm against
     *                                                     the parent class
     * @param MessageBusInterface      $messageBus         bus handed to every Worker created by this command
     * @param EventDispatcherInterface $eventDispatcher    dispatcher handed to every Worker; this command also
     *                                                     registers its own listeners on it
     * @param LoggerInterface|null     $logger             optional logger handed to every Worker
     */
    public function __construct(?string $globalReceiverName, $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, ?LoggerInterface $logger = null)
    {
        $this->eventDispatcher = $eventDispatcher;
        $this->messageBus = $messageBus;
        $this->logger = $logger;

        parent::__construct($globalReceiverName, $failureTransports);
    }

    /**
     * {@inheritdoc}
     */
    protected function configure(): void
    {
        $this
            ->setDefinition([
                new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
                new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
                new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
            ])
            ->setDescription(self::$defaultDescription)
            ->setHelp(<<<'EOF'
The <info>%command.name%</info> retries message in the failure transport.
<info>php %command.full_name%</info>
The command will interactively ask if each message should be retried
or discarded.
Some transports support retrying a specific message id, which comes
from the <info>messenger:failed:show</info> command.
<info>php %command.full_name% {id}</info>
Or pass multiple ids at once to process multiple messages:
<info>php %command.full_name% {id1} {id2} {id3}</info>
EOF
            )
        ;
    }

    /**
     * {@inheritdoc}
     *
     * Resolves the failure transport to use, then either retries the ids given
     * as arguments or falls back to the interactive flow.
     *
     * @return int always 0 when no exception is thrown
     *
     * @throws RuntimeException when no id is given and the input is not interactive
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Registered with a limit of 1: every worker started below is meant to
        // process a single message per run.
        $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

        // Write to the error output when one exists, otherwise to the regular output.
        $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
        $io->comment('Quit this command with CONTROL-C.');
        if (!$output->isVeryVerbose()) {
            $io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
        }

        $failureTransportName = $input->getOption('transport');
        if (self::DEFAULT_TRANSPORT_OPTION === $failureTransportName) {
            // --transport was left at its default value.
            $this->printWarningAvailableFailureTransports($io, $this->getGlobalFailureReceiverName());
        }
        if ('' === $failureTransportName || null === $failureTransportName) {
            // --transport was passed without a usable value: let the user pick one.
            $failureTransportName = $this->interactiveChooseFailureTransport($io);
        }

        // The default placeholder stands for the global failure receiver.
        $failureTransportName = self::DEFAULT_TRANSPORT_OPTION === $failureTransportName ? $this->getGlobalFailureReceiverName() : $failureTransportName;

        $receiver = $this->getReceiver($failureTransportName);
        $this->printPendingMessagesMessage($receiver, $io);
        $io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $failureTransportName));

        $shouldForce = $input->getOption('force');
        $ids = $input->getArgument('id');
        if (0 === \count($ids)) {
            if (!$input->isInteractive()) {
                throw new RuntimeException('Message id must be passed when in non-interactive mode.');
            }

            $this->runInteractive($failureTransportName, $io, $shouldForce);

            return 0;
        }

        $this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
        $io->success('All done!');

        return 0;
    }

    /**
     * Walks through the messages of the given failure transport one at a time.
     *
     * @param string       $failureTransportName name of the failure transport to read from
     * @param SymfonyStyle $io                   console helper used for output and confirmation prompts
     * @param bool         $shouldForce          when true, messages are retried without asking
     *
     * @throws LogicException when a listable receiver yields an envelope for which no message id can be resolved
     */
    private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce): void
    {
        $receiver = $this->failureTransports->get($failureTransportName);
        $count = 0;
        if ($receiver instanceof ListableReceiverInterface) {
            // for listable receivers, find the messages one-by-one
            // this avoids using get(), which for some less-robust
            // transports (like Doctrine), will cause the message
            // to be temporarily "acked", even if the user aborts
            // handling the message
            while (true) {
                $ids = [];
                foreach ($receiver->all(1) as $envelope) {
                    ++$count;

                    $id = $this->getMessageId($envelope);
                    if (null === $id) {
                        throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $failureTransportName));
                    }
                    $ids[] = $id;
                }

                // break the loop if all messages are consumed
                if (0 === \count($ids)) {
                    break;
                }

                $this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce);
            }
        } else {
            // get() and ask messages one-by-one
            $count = $this->runWorker($failureTransportName, $receiver, $io, $shouldForce);
        }

        // avoid success message if nothing was processed
        if (1 <= $count) {
            $io->success('All failed messages have been handled or removed!');
        }
    }

    /**
     * Runs a Worker on the given receiver, asking for each received message
     * whether it should be retried or deleted.
     *
     * @param string            $failureTransportName name under which the receiver is registered in the Worker
     * @param ReceiverInterface $receiver             receiver the Worker consumes from
     * @param SymfonyStyle      $io                   console helper used for output and confirmation prompts
     * @param bool              $shouldForce          when true, messages are retried without asking
     *
     * @return int number of WorkerMessageReceivedEvent events seen while the Worker ran
     */
    private function runWorker(string $failureTransportName, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
    {
        $count = 0;
        $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) {
            ++$count;
            $envelope = $messageReceivedEvent->getEnvelope();

            $this->displaySingleMessage($envelope, $io);

            // With --force the prompt is skipped entirely (short-circuit).
            $shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');

            if ($shouldHandle) {
                // Leave the event untouched so the Worker handles the message.
                return;
            }

            // The user declined: flag the event as not to be handled and
            // reject the envelope on the receiver.
            $messageReceivedEvent->shouldHandle(false);
            $receiver->reject($envelope);
        };
        $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);

        $worker = new Worker(
            [$failureTransportName => $receiver],
            $this->messageBus,
            $this->eventDispatcher,
            $this->logger
        );

        try {
            $worker->run();
        } finally {
            // Always detach the listener, even if the Worker throws, so it does
            // not stay registered on the dispatcher for later runs.
            $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
        }

        return $count;
    }

    /**
     * Retries the messages with the given ids, one Worker run per message.
     *
     * @param string       $failureTransportName name of the failure transport holding the messages
     * @param array        $ids                  message ids to look up on the receiver
     * @param SymfonyStyle $io                   console helper used for output and confirmation prompts
     * @param bool         $shouldForce          when true, messages are retried without asking
     *
     * @throws RuntimeException when the receiver cannot look messages up by id, or when an id is not found
     */
    private function retrySpecificIds(string $failureTransportName, array $ids, SymfonyStyle $io, bool $shouldForce): void
    {
        $receiver = $this->getReceiver($failureTransportName);

        if (!$receiver instanceof ListableReceiverInterface) {
            throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName));
        }

        foreach ($ids as $id) {
            $envelope = $receiver->find($id);
            if (null === $envelope) {
                throw new RuntimeException(sprintf('The message "%s" was not found.', $id));
            }

            // Wrap the envelope in a SingleMessageReceiver so the Worker is
            // given exactly this message rather than the whole transport.
            $singleReceiver = new SingleMessageReceiver($receiver, $envelope);
            $this->runWorker($failureTransportName, $singleReceiver, $io, $shouldForce);
        }
    }
}
|