AbstractFailedMessagesCommand.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Command;
  11. use Symfony\Component\Console\Command\Command;
  12. use Symfony\Component\Console\Completion\CompletionInput;
  13. use Symfony\Component\Console\Completion\CompletionSuggestions;
  14. use Symfony\Component\Console\Helper\Dumper;
  15. use Symfony\Component\Console\Question\ChoiceQuestion;
  16. use Symfony\Component\Console\Style\SymfonyStyle;
  17. use Symfony\Component\DependencyInjection\ServiceLocator;
  18. use Symfony\Component\ErrorHandler\Exception\FlattenException;
  19. use Symfony\Component\Messenger\Envelope;
  20. use Symfony\Component\Messenger\Exception\InvalidArgumentException;
  21. use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
  22. use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
  23. use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
  24. use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
  25. use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
  26. use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
  27. use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
  28. use Symfony\Component\VarDumper\Caster\Caster;
  29. use Symfony\Component\VarDumper\Caster\TraceStub;
  30. use Symfony\Component\VarDumper\Cloner\ClonerInterface;
  31. use Symfony\Component\VarDumper\Cloner\Stub;
  32. use Symfony\Component\VarDumper\Cloner\VarCloner;
  33. use Symfony\Contracts\Service\ServiceProviderInterface;
  34. /**
  35. * @author Ryan Weaver <ryan@symfonycasts.com>
  36. *
  37. * @internal
  38. */
  39. abstract class AbstractFailedMessagesCommand extends Command
  40. {
  41. protected const DEFAULT_TRANSPORT_OPTION = 'choose';
  42. protected $failureTransports;
  43. private $globalFailureReceiverName;
  44. /**
  45. * @param ServiceProviderInterface $failureTransports
  46. */
  47. public function __construct(?string $globalFailureReceiverName, $failureTransports)
  48. {
  49. $this->failureTransports = $failureTransports;
  50. if (!$failureTransports instanceof ServiceProviderInterface) {
  51. trigger_deprecation('symfony/messenger', '5.3', 'Passing a receiver as 2nd argument to "%s()" is deprecated, pass a service locator instead.', __METHOD__);
  52. if (null === $globalFailureReceiverName) {
  53. throw new InvalidArgumentException(sprintf('The argument "globalFailureReceiver" from method "%s()" must be not null if 2nd argument is not a ServiceLocator.', __METHOD__));
  54. }
  55. $this->failureTransports = new ServiceLocator([$globalFailureReceiverName => static function () use ($failureTransports) { return $failureTransports; }]);
  56. }
  57. $this->globalFailureReceiverName = $globalFailureReceiverName;
  58. parent::__construct();
  59. }
  60. protected function getReceiverName(): string
  61. {
  62. trigger_deprecation('symfony/messenger', '5.3', 'The method "%s()" is deprecated, use getGlobalFailureReceiverName() instead.', __METHOD__);
  63. return $this->globalFailureReceiverName;
  64. }
  65. protected function getGlobalFailureReceiverName(): ?string
  66. {
  67. return $this->globalFailureReceiverName;
  68. }
  69. /**
  70. * @return mixed
  71. */
  72. protected function getMessageId(Envelope $envelope)
  73. {
  74. /** @var TransportMessageIdStamp $stamp */
  75. $stamp = $envelope->last(TransportMessageIdStamp::class);
  76. return null !== $stamp ? $stamp->getId() : null;
  77. }
  78. protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
  79. {
  80. $io->title('Failed Message Details');
  81. /** @var SentToFailureTransportStamp|null $sentToFailureTransportStamp */
  82. $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
  83. /** @var RedeliveryStamp|null $lastRedeliveryStamp */
  84. $lastRedeliveryStamp = $envelope->last(RedeliveryStamp::class);
  85. /** @var ErrorDetailsStamp|null $lastErrorDetailsStamp */
  86. $lastErrorDetailsStamp = $envelope->last(ErrorDetailsStamp::class);
  87. $lastRedeliveryStampWithException = $this->getLastRedeliveryStampWithException($envelope, true);
  88. $rows = [
  89. ['Class', \get_class($envelope->getMessage())],
  90. ];
  91. if (null !== $id = $this->getMessageId($envelope)) {
  92. $rows[] = ['Message Id', $id];
  93. }
  94. if (null === $sentToFailureTransportStamp) {
  95. $io->warning('Message does not appear to have been sent to this transport after failing');
  96. } else {
  97. $failedAt = '';
  98. $errorMessage = '';
  99. $errorCode = '';
  100. $errorClass = '(unknown)';
  101. if (null !== $lastRedeliveryStamp) {
  102. $failedAt = $lastRedeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s');
  103. }
  104. if (null !== $lastErrorDetailsStamp) {
  105. $errorMessage = $lastErrorDetailsStamp->getExceptionMessage();
  106. $errorCode = $lastErrorDetailsStamp->getExceptionCode();
  107. $errorClass = $lastErrorDetailsStamp->getExceptionClass();
  108. } elseif (null !== $lastRedeliveryStampWithException) {
  109. // Try reading the errorMessage for messages that are still in the queue without the new ErrorDetailStamps.
  110. $errorMessage = $lastRedeliveryStampWithException->getExceptionMessage();
  111. if (null !== $lastRedeliveryStampWithException->getFlattenException()) {
  112. $errorClass = $lastRedeliveryStampWithException->getFlattenException()->getClass();
  113. }
  114. }
  115. $rows = array_merge($rows, [
  116. ['Failed at', $failedAt],
  117. ['Error', $errorMessage],
  118. ['Error Code', $errorCode],
  119. ['Error Class', $errorClass],
  120. ['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
  121. ]);
  122. }
  123. $io->table([], $rows);
  124. /** @var RedeliveryStamp[] $redeliveryStamps */
  125. $redeliveryStamps = $envelope->all(RedeliveryStamp::class);
  126. $io->writeln(' Message history:');
  127. foreach ($redeliveryStamps as $redeliveryStamp) {
  128. $io->writeln(sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
  129. }
  130. $io->newLine();
  131. if ($io->isVeryVerbose()) {
  132. $io->title('Message:');
  133. $dump = new Dumper($io, null, $this->createCloner());
  134. $io->writeln($dump($envelope->getMessage()));
  135. $io->title('Exception:');
  136. $flattenException = null;
  137. if (null !== $lastErrorDetailsStamp) {
  138. $flattenException = $lastErrorDetailsStamp->getFlattenException();
  139. } elseif (null !== $lastRedeliveryStampWithException) {
  140. $flattenException = $lastRedeliveryStampWithException->getFlattenException();
  141. }
  142. $io->writeln(null === $flattenException ? '(no data)' : $dump($flattenException));
  143. } else {
  144. $io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
  145. }
  146. }
  147. protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
  148. {
  149. if ($receiver instanceof MessageCountAwareInterface) {
  150. if (1 === $receiver->getMessageCount()) {
  151. $io->writeln('There is <comment>1</comment> message pending in the failure transport.');
  152. } else {
  153. $io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
  154. }
  155. }
  156. }
  157. /**
  158. * @param string|null $name
  159. */
  160. protected function getReceiver(/* string $name = null */): ReceiverInterface
  161. {
  162. if (1 > \func_num_args() && __CLASS__ !== static::class && __CLASS__ !== (new \ReflectionMethod($this, __FUNCTION__))->getDeclaringClass()->getName() && !$this instanceof \PHPUnit\Framework\MockObject\MockObject && !$this instanceof \Prophecy\Prophecy\ProphecySubjectInterface && !$this instanceof \Mockery\MockInterface) {
  163. trigger_deprecation('symfony/messenger', '5.3', 'The "%s()" method will have a new "string $name" argument in version 6.0, not defining it is deprecated.', __METHOD__);
  164. }
  165. $name = \func_num_args() > 0 ? func_get_arg(0) : null;
  166. if (null === $name = $name ?? $this->globalFailureReceiverName) {
  167. throw new InvalidArgumentException(sprintf('No default failure transport is defined. Available transports are: "%s".', implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
  168. }
  169. if (!$this->failureTransports->has($name)) {
  170. throw new InvalidArgumentException(sprintf('The "%s" failure transport was not found. Available transports are: "%s".', $name, implode('", "', array_keys($this->failureTransports->getProvidedServices()))));
  171. }
  172. return $this->failureTransports->get($name);
  173. }
  174. protected function getLastRedeliveryStampWithException(Envelope $envelope): ?RedeliveryStamp
  175. {
  176. if (null === \func_get_args()[1]) {
  177. trigger_deprecation('symfony/messenger', '5.2', sprintf('Using the "getLastRedeliveryStampWithException" method in the "%s" class is deprecated, use the "Envelope::last(%s)" instead.', self::class, ErrorDetailsStamp::class));
  178. }
  179. // Use ErrorDetailsStamp instead if it is available
  180. if (null !== $envelope->last(ErrorDetailsStamp::class)) {
  181. return null;
  182. }
  183. /** @var RedeliveryStamp $stamp */
  184. foreach (array_reverse($envelope->all(RedeliveryStamp::class)) as $stamp) {
  185. if (null !== $stamp->getExceptionMessage()) {
  186. return $stamp;
  187. }
  188. }
  189. return null;
  190. }
  191. private function createCloner(): ?ClonerInterface
  192. {
  193. if (!class_exists(VarCloner::class)) {
  194. return null;
  195. }
  196. $cloner = new VarCloner();
  197. $cloner->addCasters([FlattenException::class => function (FlattenException $flattenException, array $a, Stub $stub): array {
  198. $stub->class = $flattenException->getClass();
  199. return [
  200. Caster::PREFIX_VIRTUAL.'message' => $flattenException->getMessage(),
  201. Caster::PREFIX_VIRTUAL.'code' => $flattenException->getCode(),
  202. Caster::PREFIX_VIRTUAL.'file' => $flattenException->getFile(),
  203. Caster::PREFIX_VIRTUAL.'line' => $flattenException->getLine(),
  204. Caster::PREFIX_VIRTUAL.'trace' => new TraceStub($flattenException->getTrace()),
  205. ];
  206. }]);
  207. return $cloner;
  208. }
  209. protected function printWarningAvailableFailureTransports(SymfonyStyle $io, ?string $failureTransportName): void
  210. {
  211. $failureTransports = array_keys($this->failureTransports->getProvidedServices());
  212. $failureTransportsCount = \count($failureTransports);
  213. if ($failureTransportsCount > 1) {
  214. $io->writeln([
  215. sprintf('> Loading messages from the <comment>global</comment> failure transport <comment>%s</comment>.', $failureTransportName),
  216. '> To use a different failure transport, pass <comment>--transport=</comment>.',
  217. sprintf('> Available failure transports are: <comment>%s</comment>', implode(', ', $failureTransports)),
  218. "\n",
  219. ]);
  220. }
  221. }
  222. protected function interactiveChooseFailureTransport(SymfonyStyle $io)
  223. {
  224. $failedTransports = array_keys($this->failureTransports->getProvidedServices());
  225. $question = new ChoiceQuestion('Select failed transport:', $failedTransports, 0);
  226. $question->setMultiselect(false);
  227. return $io->askQuestion($question);
  228. }
  229. public function complete(CompletionInput $input, CompletionSuggestions $suggestions): void
  230. {
  231. if ($input->mustSuggestOptionValuesFor('transport')) {
  232. $suggestions->suggestValues(array_keys($this->failureTransports->getProvidedServices()));
  233. return;
  234. }
  235. if ($input->mustSuggestArgumentValuesFor('id')) {
  236. $transport = $input->getOption('transport');
  237. $transport = self::DEFAULT_TRANSPORT_OPTION === $transport ? $this->getGlobalFailureReceiverName() : $transport;
  238. $receiver = $this->getReceiver($transport);
  239. if (!$receiver instanceof ListableReceiverInterface) {
  240. return;
  241. }
  242. $ids = [];
  243. foreach ($receiver->all(50) as $envelope) {
  244. $ids[] = $this->getMessageId($envelope);
  245. }
  246. $suggestions->suggestValues($ids);
  247. return;
  248. }
  249. }
  250. }