123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- <?php
/**
 * Factory for queue objects.
 */
class DrupalQueue {

  /**
   * Returns the queue object for a given name.
   *
   * The class is looked up in the 'queue_class_' . $name variable, falling
   * back to the 'queue_default_class' variable (default 'SystemQueue'). When
   * a reliable queue is requested and the resulting object does not implement
   * DrupalReliableQueueInterface, it is discarded and replaced by an instance
   * of the class named in 'queue_default_reliable_class' (default
   * 'SystemQueue').
   *
   * Instances are cached per name in a function-level static. Because the
   * cache key is the name only, $reliable is honoured only on the first call
   * for a given name; later calls return the cached object as-is.
   *
   * @param $name
   *   The name of the queue; passed to the queue class constructor.
   * @param $reliable
   *   TRUE if the caller requires a DrupalReliableQueueInterface object.
   *
   * @return
   *   The queue object for $name.
   */
  public static function get($name, $reliable = FALSE) {
    static $queues;

    // Serve repeat requests from the per-request cache.
    if (isset($queues[$name])) {
      return $queues[$name];
    }

    // A per-queue class setting wins over the site-wide default.
    $class = variable_get('queue_class_' . $name, NULL);
    if (!$class) {
      $class = variable_get('queue_default_class', 'SystemQueue');
    }
    $queue = new $class($name);

    // Swap in the default reliable implementation when the configured class
    // cannot satisfy a reliability request.
    $needs_reliable = $reliable && !($queue instanceof DrupalReliableQueueInterface);
    if ($needs_reliable) {
      $reliable_class = variable_get('queue_default_reliable_class', 'SystemQueue');
      $queue = new $reliable_class($name);
    }

    $queues[$name] = $queue;
    return $queue;
  }

}
/**
 * Contract every queue implementation must fulfil.
 *
 * Descriptions of return values below reflect the implementations visible in
 * this file (SystemQueue and MemoryQueue); other implementations should be
 * checked individually.
 */
interface DrupalQueueInterface {

  /**
   * Adds an item to the queue.
   *
   * @param $data
   *   Arbitrary payload to store with the item.
   *
   * @return
   *   TRUE on success. SystemQueue returns the boolean-cast result of the
   *   insert; MemoryQueue always returns TRUE.
   */
  public function createItem($data);

  /**
   * Counts the items currently held in the queue.
   *
   * @return
   *   The number of items, claimed or not.
   */
  public function numberOfItems();

  /**
   * Claims one item for processing.
   *
   * @param $lease_time
   *   Number of seconds added to the current time to produce the item's
   *   'expire' value. Defaults to 3600 here; note that both implementations
   *   in this file declare a default of 30 instead.
   *
   * @return
   *   An object carrying at least 'item_id' and 'data', or FALSE when no
   *   unclaimed item is available.
   */
  public function claimItem($lease_time = 3600);

  /**
   * Removes a finished item from the queue.
   *
   * @param $item
   *   An item object as returned by claimItem(); identified by its 'item_id'.
   */
  public function deleteItem($item);

  /**
   * Returns a claimed item to the queue so that it can be claimed again.
   *
   * @param $item
   *   An item object as returned by claimItem(); identified by its 'item_id'.
   *
   * @return
   *   Implementation-specific success indicator: SystemQueue returns the
   *   result of the update query, MemoryQueue returns TRUE or FALSE.
   */
  public function releaseItem($item);

  /**
   * Performs any set-up the queue needs before use.
   *
   * Both implementations in this file leave this empty.
   */
  public function createQueue();

  /**
   * Deletes the queue and every item in it.
   */
  public function deleteQueue();

}
/**
 * Marker interface for reliable queues.
 *
 * Declares no methods beyond those of DrupalQueueInterface. DrupalQueue::get()
 * tests for it with instanceof to decide whether a queue object satisfies a
 * request for a reliable queue.
 */
interface DrupalReliableQueueInterface extends DrupalQueueInterface {

}
/**
 * Queue implementation backed by the {queue} database table.
 *
 * Rows are scoped to a queue by their 'name' column. An 'expire' value of 0
 * marks a row as unclaimed; claiming a row stores the lease expiry timestamp
 * there.
 */
class SystemQueue implements DrupalReliableQueueInterface {

  /**
   * The name of the queue this instance works on.
   *
   * @var string
   */
  protected $name;

  /**
   * Constructs a queue object bound to one queue name.
   *
   * @param $name
   *   The queue name; used as the 'name' column value in every query.
   */
  public function __construct($name) {
    $this->name = $name;
  }

  /**
   * Stores a new item in the {queue} table.
   *
   * @param $data
   *   Arbitrary payload; it is passed through serialize() before storage.
   *
   * @return
   *   The boolean-cast result of executing the insert query.
   */
  public function createItem($data) {
    $query = db_insert('queue')
      ->fields(array(
        'name' => $this->name,
        'data' => serialize($data),
        // Creation timestamp; claimItem() hands items out in this order.
        'created' => time(),
      ));
    return (bool) $query->execute();
  }

  /**
   * Counts all rows belonging to this queue, claimed or not.
   *
   * @return
   *   The number of items as an integer.
   */
  public function numberOfItems() {
    // Database drivers typically hand back column values as strings; cast so
    // callers get a real integer, matching MemoryQueue::numberOfItems().
    return (int) db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  }

  /**
   * Claims the oldest unclaimed item of this queue.
   *
   * @param $lease_time
   *   Seconds added to the current time to form the stored 'expire' value.
   *   Defaults to 30 here, whereas DrupalQueueInterface declares 3600.
   *
   * @return
   *   An object with 'item_id' and the unserialized 'data', or FALSE when no
   *   row with expire = 0 exists for this queue.
   */
  public function claimItem($lease_time = 30) {
    // Retry until a claim succeeds or no unclaimed row is left.
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
      if (!$item) {
        // Nothing left to claim.
        return FALSE;
      }

      // The extra expire = 0 condition makes the claim conditional: if the
      // row was claimed between the SELECT above and this UPDATE, no row
      // matches, execute() reports nothing updated, and the loop selects the
      // next candidate instead.
      $update = db_update('queue')
        ->fields(array(
          'expire' => time() + $lease_time,
        ))
        ->condition('item_id', $item->item_id)
        ->condition('expire', 0);

      if ($update->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
    }
  }

  /**
   * Marks an item as unclaimed again by resetting its 'expire' to 0.
   *
   * @param $item
   *   An item object; only its 'item_id' is used.
   *
   * @return
   *   The result of executing the update query.
   */
  public function releaseItem($item) {
    $update = db_update('queue')
      ->fields(array(
        'expire' => 0,
      ))
      ->condition('item_id', $item->item_id);
    return $update->execute();
  }

  /**
   * Deletes a single item's row from the {queue} table.
   *
   * @param $item
   *   An item object; only its 'item_id' is used.
   */
  public function deleteItem($item) {
    db_delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  /**
   * No-op in this implementation.
   */
  public function createQueue() {
  }

  /**
   * Deletes every row of the {queue} table that belongs to this queue name.
   */
  public function deleteQueue() {
    db_delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }

}
/**
 * Queue implementation that keeps its items in a PHP array.
 *
 * Items live only in this object's $queue property, so they exist only for
 * as long as the object does.
 */
class MemoryQueue implements DrupalQueueInterface {

  /**
   * The queued items, keyed by item ID.
   *
   * @var array
   */
  protected $queue;

  /**
   * The ID that the next created item will receive.
   *
   * @var int
   */
  protected $id_sequence;

  /**
   * Starts with an empty queue and the ID counter at 0.
   *
   * @param $name
   *   The queue name. Accepted for signature compatibility; not used here.
   */
  public function __construct($name) {
    $this->id_sequence = 0;
    $this->queue = array();
  }

  /**
   * Appends a new, unclaimed item to the queue.
   *
   * @param $data
   *   Arbitrary payload stored on the item as-is.
   *
   * @return
   *   Always TRUE.
   */
  public function createItem($data) {
    // Take the current ID for this item, then advance the counter.
    $id = $this->id_sequence++;
    $this->queue[$id] = (object) array(
      'item_id' => $id,
      'data' => $data,
      'created' => time(),
      // 0 marks the item as unclaimed.
      'expire' => 0,
    );
    return TRUE;
  }

  /**
   * Counts all items in the queue, claimed or not.
   *
   * @return
   *   The number of items.
   */
  public function numberOfItems() {
    return count($this->queue);
  }

  /**
   * Claims the first item whose 'expire' is 0, in array order.
   *
   * @param $lease_time
   *   Seconds added to the current time to form the item's 'expire' value.
   *
   * @return
   *   The claimed item object (the same object that is held in the queue),
   *   or FALSE if every item is already claimed or the queue is empty.
   */
  public function claimItem($lease_time = 30) {
    foreach ($this->queue as $id => $candidate) {
      // Skip anything that already carries a lease.
      if ($candidate->expire != 0) {
        continue;
      }
      $candidate->expire = time() + $lease_time;
      $this->queue[$id] = $candidate;
      return $candidate;
    }
    return FALSE;
  }

  /**
   * Removes an item from the queue.
   *
   * @param $item
   *   An item object; only its 'item_id' is used.
   */
  public function deleteItem($item) {
    unset($this->queue[$item->item_id]);
  }

  /**
   * Clears the lease on a claimed item.
   *
   * @param $item
   *   An item object; only its 'item_id' is used.
   *
   * @return
   *   TRUE if the item exists and was claimed, FALSE otherwise.
   */
  public function releaseItem($item) {
    // Unknown items and items that are not currently claimed cannot be
    // released.
    if (!isset($this->queue[$item->item_id]) || $this->queue[$item->item_id]->expire == 0) {
      return FALSE;
    }
    $this->queue[$item->item_id]->expire = 0;
    return TRUE;
  }

  /**
   * No-op in this implementation.
   */
  public function createQueue() {
  }

  /**
   * Drops all items and resets the ID counter to 0.
   */
  public function deleteQueue() {
    $this->id_sequence = 0;
    $this->queue = array();
  }

}
|