PhpRedis.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <?php
  2. /**
  3. * @todo
  4. * Set high expire value to the hash for rotation when memory is empty
  5. * React upon cache clear all and rebuild path list?
  6. */
  7. class Redis_Queue_PhpRedis extends Redis_Queue_Base
  8. {
  9. public function createItem($data)
  10. {
  11. $client = $this->getClient();
  12. $dKey = $this->getKeyForData();
  13. $qKey = $this->getKeyForQueue();
  14. // Identifier does not not need to be in the transaction,
  15. // in case of any error we'll just skip a value in the sequence.
  16. $id = $client->hincrby($dKey, self::QUEUE_HKEY_SEQ, 1);
  17. $record = new stdClass();
  18. $record->qid = $id;
  19. $record->data = $data;
  20. $record->timestamp = time();
  21. $pipe = $client->multi(Redis::PIPELINE);
  22. // Thanks to the redis_queue standalone module maintainers for
  23. // this piece of code, very effective. Note that we added the
  24. // pipeline thought.
  25. $pipe->hsetnx($dKey, $id, serialize($record));
  26. $pipe->llen($qKey);
  27. $pipe->lpush($qKey, $id);
  28. $ret = $pipe->exec();
  29. if (!$success = ($ret[0] && $ret[1] < $ret[2])) {
  30. if ($ret[0]) {
  31. // HSETNEX worked but not the PUSH command we therefore
  32. // need to drop the inserted data. I would have prefered
  33. // a DISCARD instead but we are in pipelined transaction
  34. // we cannot actually do a DISCARD here.
  35. $client->hdel($dKey, $id);
  36. }
  37. }
  38. return $success;
  39. }
  40. public function numberOfItems()
  41. {
  42. return $this->getClient()->llen($this->getKeyForQueue());
  43. }
  44. public function claimItem($lease_time = 30)
  45. {
  46. // @todo Deal with lease
  47. $client = $this->getClient();
  48. $id = $client->rpoplpush(
  49. $this->getKeyForQueue(),
  50. $this->getKeyForClaimed()
  51. );
  52. if ($id) {
  53. if ($item = $client->hget($this->getKeyForData(), $id)) {
  54. if ($item = unserialize($item)) {
  55. return $item;
  56. }
  57. }
  58. }
  59. return false;
  60. }
  61. public function deleteItem($item)
  62. {
  63. $pipe = $this->getClient()->multi(Redis::PIPELINE);
  64. $pipe->lrem($this->getKeyForQueue(), $item->qid);
  65. $pipe->lrem($this->getKeyForClaimed(), $item->qid);
  66. $pipe->hdel($this->getKeyForData(), $item->qid);
  67. $pipe->exec();
  68. }
  69. public function releaseItem($item)
  70. {
  71. $pipe = $this->getClient()->multi(Redis::PIPELINE);
  72. $pipe->lrem($this->getKeyForClaimed(), $item->qid, -1);
  73. $pipe->lpush($this->getKeyForQueue(), $item->qid);
  74. $ret = $pipe->exec();
  75. return $ret[0] && $ret[1];
  76. }
  77. public function createQueue()
  78. {
  79. }
  80. public function deleteQueue()
  81. {
  82. $this->getClient()->del(
  83. $this->getKeyForQueue(),
  84. $this->getKeyForClaimed(),
  85. $this->getKeyForData()
  86. );
  87. }
  88. }