Batch.php 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. <?php
  2. namespace Guzzle\Batch;
  3. use Guzzle\Batch\Exception\BatchTransferException;
  /**
   * Default batch implementation used to convert queued items into smaller chunks of batches using a
   * {@see BatchDivisorInterface} and transfers each batch using a {@see BatchTransferInterface}.
   *
   * Any exception encountered during a flush operation will throw a {@see BatchTransferException} object containing the
   * batch that failed. After an exception is encountered, you can flush the batch again to attempt to finish transferring
   * any previously created batches or queued items.
   */
  class Batch implements BatchInterface
  {
      /** @var \SplQueue Items that were added but have not been divided into batches yet */
      protected $queue;

      /** @var array Iterators over divided batches that still have to be transferred */
      protected $dividedBatches;

      /** @var BatchTransferInterface Strategy used to transfer each batch */
      protected $transferStrategy;

      /** @var BatchDivisorInterface Strategy used to divide the queue into batches */
      protected $divisionStrategy;

      /**
       * @param BatchTransferInterface $transferStrategy Strategy used to transfer items
       * @param BatchDivisorInterface  $divisionStrategy Divisor used to create batches
       */
      public function __construct(BatchTransferInterface $transferStrategy, BatchDivisorInterface $divisionStrategy)
      {
          $this->transferStrategy = $transferStrategy;
          $this->divisionStrategy = $divisionStrategy;
          $this->queue = new \SplQueue();
          // In delete mode, elements are removed from the queue as it is iterated
          $this->queue->setIteratorMode(\SplQueue::IT_MODE_DELETE);
          $this->dividedBatches = array();
      }

      /**
       * Add an item to the queue of items waiting to be divided and transferred.
       *
       * @param mixed $item Item to enqueue
       *
       * @return self Returns this object so that calls can be chained
       */
      public function add($item)
      {
          $this->queue->enqueue($item);

          return $this;
      }

      /**
       * Divide any queued items into batches and transfer every pending batch.
       *
       * @return array Items of all batches that were transferred by this call, merged with array_merge()
       * @throws BatchTransferException when the transfer strategy throws an exception. The exception receives the
       *                                failed batch, the items transferred so far by this call, the original
       *                                exception and both strategies.
       */
      public function flush()
      {
          $this->createBatches();

          $items = array();
          foreach ($this->dividedBatches as $batchIndex => $dividedBatch) {
              while ($dividedBatch->valid()) {
                  $batch = $dividedBatch->current();
                  // Advance before transferring so that a failed batch is skipped by the next flush
                  $dividedBatch->next();
                  try {
                      $this->transferStrategy->transfer($batch);
                      $items = array_merge($items, $batch);
                  } catch (\Exception $e) {
                      // The failed batch may have been the last one of this iterator. Drop the exhausted iterator
                      // now, otherwise isEmpty() keeps reporting pending work until the next flush.
                      if (!$dividedBatch->valid()) {
                          unset($this->dividedBatches[$batchIndex]);
                      }
                      throw new BatchTransferException(
                          $batch,
                          $items,
                          $e,
                          $this->transferStrategy,
                          $this->divisionStrategy
                      );
                  }
              }
              // Keep the divided batch down to a minimum in case of a later exception
              unset($this->dividedBatches[$batchIndex]);
          }

          return $items;
      }

      /**
       * Check whether there is nothing left to transfer.
       *
       * @return bool True when the queue is empty and no divided batches are pending
       */
      public function isEmpty()
      {
          return count($this->queue) === 0 && count($this->dividedBatches) === 0;
      }

      /**
       * Create batches for any queued items
       *
       * Nothing happens when the queue is empty or when the division strategy returns an empty (falsy) result.
       */
      protected function createBatches()
      {
          if (count($this->queue)) {
              if ($batches = $this->divisionStrategy->createBatches($this->queue)) {
                  // Convert arrays into iterators so that flush() can resume where it stopped
                  if (is_array($batches)) {
                      $batches = new \ArrayIterator($batches);
                  }
                  $this->dividedBatches[] = $batches;
              }
          }
      }
  }