BatchHandlerTrait.php 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\Handler;
  11. use Symfony\Component\Messenger\Exception\LogicException;
  /**
   * Buffering logic shared by handlers that process messages in batches.
   *
   * Messages given to handle() are queued together with their acknowledger and
   * handed to process() as one batch, either when enough of them are pending or
   * when flush() is called.
   *
   * @author Nicolas Grekas <p@tchwork.com>
   */
  trait BatchHandlerTrait
  {
      /**
       * @var list<array{0: object, 1: Acknowledger}> Pending message/acknowledger pairs
       */
      private $jobs = [];

      /**
       * {@inheritdoc}
       */
      public function flush(bool $force): void
      {
          // $force is not read here: pending jobs are processed whatever its value.
          // The queue is emptied *before* process() runs, so jobs queued while
          // process() executes end up in a fresh batch instead of being lost.
          if ($jobs = $this->jobs) {
              $this->jobs = [];
              $this->process($jobs);
          }
      }

      /**
       * Queues a message and flushes the batch when required.
       *
       * @param Acknowledger|null $ack The function to call to ack/nack the $message.
       *                               The message should be handled synchronously when null.
       *
       * @return mixed The number of pending messages in the batch if $ack is not null
       *               (0 when the batch has just been flushed),
       *               the result from handling the message otherwise
       */
      private function handle(object $message, ?Acknowledger $ack)
      {
          if (null === $ack) {
              // Synchronous handling: create a local acknowledger, process the
              // whole pending batch right away and return what it collected.
              $ack = new Acknowledger(get_debug_type($this));
              $this->jobs[] = [$message, $ack];
              $this->flush(true);

              return $ack->getResult();
          }

          $this->jobs[] = [$message, $ack];
          if (!$this->shouldFlush()) {
              return \count($this->jobs);
          }

          $this->flush(true);

          return 0;
      }

      /**
       * Tells whether enough jobs are pending for handle() to flush the batch.
       */
      private function shouldFlush(): bool
      {
          return $this->getBatchSize() <= \count($this->jobs);
      }

      /**
       * Returns the number of pending jobs at which handle() flushes the batch.
       *
       * Redefine this method in the class using the trait to change the batch size.
       */
      private function getBatchSize(): int
      {
          return 10;
      }

      /**
       * Completes the jobs in the list.
       *
       * This default implementation always throws: the class using the trait
       * is expected to provide its own process().
       *
       * @param list<array{0: object, 1: Acknowledger}> $jobs A list of pairs of messages and their corresponding acknowledgers
       *
       * @throws LogicException When the method is not redefined by the class using the trait
       */
      private function process(array $jobs): void
      {
          throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
      }
  }