HandleMessageMiddleware.php 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Middleware;
  11. use Psr\Log\LoggerAwareTrait;
  12. use Psr\Log\NullLogger;
  13. use Symfony\Component\Messenger\Envelope;
  14. use Symfony\Component\Messenger\Exception\HandlerFailedException;
  15. use Symfony\Component\Messenger\Exception\LogicException;
  16. use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
  17. use Symfony\Component\Messenger\Handler\Acknowledger;
  18. use Symfony\Component\Messenger\Handler\HandlerDescriptor;
  19. use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
  20. use Symfony\Component\Messenger\Stamp\AckStamp;
  21. use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
  22. use Symfony\Component\Messenger\Stamp\HandledStamp;
  23. use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
  24. /**
  25. * @author Samuel Roze <samuel.roze@gmail.com>
  26. */
  27. class HandleMessageMiddleware implements MiddlewareInterface
  28. {
  29. use LoggerAwareTrait;
  30. private $handlersLocator;
  31. private $allowNoHandlers;
  32. public function __construct(HandlersLocatorInterface $handlersLocator, bool $allowNoHandlers = false)
  33. {
  34. $this->handlersLocator = $handlersLocator;
  35. $this->allowNoHandlers = $allowNoHandlers;
  36. $this->logger = new NullLogger();
  37. }
  38. /**
  39. * {@inheritdoc}
  40. *
  41. * @throws NoHandlerForMessageException When no handler is found and $allowNoHandlers is false
  42. */
  43. public function handle(Envelope $envelope, StackInterface $stack): Envelope
  44. {
  45. $handler = null;
  46. $message = $envelope->getMessage();
  47. $context = [
  48. 'class' => \get_class($message),
  49. ];
  50. $exceptions = [];
  51. foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
  52. if ($this->messageHasAlreadyBeenHandled($envelope, $handlerDescriptor)) {
  53. continue;
  54. }
  55. try {
  56. $handler = $handlerDescriptor->getHandler();
  57. $batchHandler = $handlerDescriptor->getBatchHandler();
  58. /** @var AckStamp $ackStamp */
  59. if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
  60. $ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
  61. if (null !== $e) {
  62. $e = new HandlerFailedException($envelope, [$e]);
  63. } else {
  64. $envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
  65. }
  66. $ackStamp->ack($envelope, $e);
  67. });
  68. $result = $handler($message, $ack);
  69. if (!\is_int($result) || 0 > $result) {
  70. throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
  71. }
  72. if (!$ack->isAcknowledged()) {
  73. $envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
  74. } elseif ($ack->getError()) {
  75. throw $ack->getError();
  76. } else {
  77. $result = $ack->getResult();
  78. }
  79. } else {
  80. $result = $handler($message);
  81. }
  82. $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $result);
  83. $envelope = $envelope->with($handledStamp);
  84. $this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
  85. } catch (\Throwable $e) {
  86. $exceptions[] = $e;
  87. }
  88. }
  89. /** @var FlushBatchHandlersStamp $flushStamp */
  90. if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) {
  91. /** @var NoAutoAckStamp $stamp */
  92. foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
  93. try {
  94. $handler = $stamp->getHandlerDescriptor()->getBatchHandler();
  95. $handler->flush($flushStamp->force());
  96. } catch (\Throwable $e) {
  97. $exceptions[] = $e;
  98. }
  99. }
  100. }
  101. if (null === $handler) {
  102. if (!$this->allowNoHandlers) {
  103. throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
  104. }
  105. $this->logger->info('No handler for message {class}', $context);
  106. }
  107. if (\count($exceptions)) {
  108. throw new HandlerFailedException($envelope, $exceptions);
  109. }
  110. return $stack->next()->handle($envelope, $stack);
  111. }
  112. private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
  113. {
  114. /** @var HandledStamp $stamp */
  115. foreach ($envelope->all(HandledStamp::class) as $stamp) {
  116. if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
  117. return true;
  118. }
  119. }
  120. return false;
  121. }
  122. }