<?php

/**
 * @file
 * Queue functionality.
 */

/**
 * @defgroup queue Queue operations
 * @{
 * Queue items to allow later processing.
 *
 * The queue system allows placing items in a queue and processing them later.
 * The system tries to ensure that only one consumer can process an item.
 *
 * Before a queue can be used it needs to be created by
 * DrupalQueueInterface::createQueue().
 *
 * Items can be added to the queue by passing an arbitrary data object to
 * DrupalQueueInterface::createItem().
 *
 * To process an item, call DrupalQueueInterface::claimItem() and specify how
 * long you want to have a lease for working on that item. When finished
 * processing, the item needs to be deleted by calling
 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
 * made available again by the DrupalQueueInterface implementation once the
 * lease expires. Another consumer will then be able to receive it when calling
 * DrupalQueueInterface::claimItem(). Due to this, the processing code should
 * be aware that an item might be handed over for processing more than once.
 *
 * The $item object used by the DrupalQueueInterface can contain arbitrary
 * metadata depending on the implementation. Systems using the interface should
 * only rely on the data property which will contain the information passed to
 * DrupalQueueInterface::createItem(). The full queue item returned by
 * DrupalQueueInterface::claimItem() needs to be passed to
 * DrupalQueueInterface::deleteItem() once processing is completed.
 *
 * There are two kinds of queue backends available. The reliable kind preserves
 * the order of messages and guarantees that every item will be executed at
 * least once. The non-reliable kind only does a best effort to preserve order
 * in messages and to execute them at least once but there is a small chance
 * that some items get lost. For example, some distributed backends like
 * Amazon SQS will be managing jobs for a large set of producers and consumers
 * where a strict FIFO ordering will likely not be preserved. Another example
 * would be an in-memory queue backend which might lose items if it crashes.
 * However, such a backend would be able to deal with significantly more writes
 * than a reliable queue and for many tasks this is more important. See
 * aggregator_cron() for an example of how to effectively utilize a
 * non-reliable queue. Another example is doing Twitter statistics -- the small
 * possibility of losing a few items is insignificant next to the power of the
 * queue being able to keep up with writes. As described in the processing
 * section, regardless of the queue being reliable or not, the processing code
 * should be aware that an item might be handed over for processing more than
 * once (because the processing code might time out before it finishes).
 */

/**
 * Factory class for interacting with queues.
 */
class DrupalQueue {
  /**
   * Returns the queue object for a given name.
   *
   * The following variables can be set by variable_set or $conf overrides:
   * - queue_class_$name: the class to be used for the queue $name.
   * - queue_default_class: the class to use when queue_class_$name is not
   *   defined. Defaults to SystemQueue, a reliable backend using SQL.
   * - queue_default_reliable_class: the class to use when queue_class_$name is
   *   not defined and the queue_default_class is not reliable. Defaults to
   *   SystemQueue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   * @param $reliable
   *   TRUE if the ordering of items and guaranteeing every item executes at
   *   least once is important, FALSE if scalability is the main concern.
   *
   * @return
   *   The queue object for a given name.
   */
  public static function get($name, $reliable = FALSE) {
    static $queues;
    if (!isset($queues[$name])) {
      $class = variable_get('queue_class_' . $name, NULL);
      if (!$class) {
        $class = variable_get('queue_default_class', 'SystemQueue');
      }
      $object = new $class($name);
      if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
        $class = variable_get('queue_default_reliable_class', 'SystemQueue');
        $object = new $class($name);
      }
      $queues[$name] = $object;
    }
    return $queues[$name];
  }
}

interface DrupalQueueInterface {

  /**
   * Add a queue item and store it directly to the queue.
   *
   * @param $data
   *   Arbitrary data to be associated with the new task in the queue.
   * @return
   *   TRUE if the item was successfully created and was (best effort) added
   *   to the queue, otherwise FALSE. We don't guarantee the item was
   *   committed to disk etc, but as far as we know, the item is now in the
   *   queue.
   */
  public function createItem($data);

  /**
   * Retrieve the number of items in the queue.
   *
   * This is intended to provide a "best guess" count of the number of items in
   * the queue. Depending on the implementation and the setup, the accuracy of
   * the results of this function may vary.
   *
   * For example, on a busy system with a large number of consumers and items,
   * the result might only be valid for a fraction of a second and not provide
   * an accurate representation.
   *
   * @return
   *   An integer estimate of the number of items in the queue.
   */
  public function numberOfItems();

  /**
   * Claim an item in the queue for processing.
   *
   * @param $lease_time
   *   How long the processing is expected to take in seconds, defaults to an
   *   hour. After this lease expires, the item will be reset and another
   *   consumer can claim the item. For idempotent tasks (which can be run
   *   multiple times without side effects), shorter lease times would result
   *   in lower latency in case a consumer fails. For tasks that should not be
   *   run more than once (non-idempotent), a larger lease time will make it
   *   more rare for a given task to run multiple times in cases of failure,
   *   at the cost of higher latency.
   * @return
   *   On success we return an item object. If the queue is unable to claim an
   *   item it returns FALSE. This implies a best effort to retrieve an item
   *   and either the queue is empty or there is some other non-recoverable
   *   problem.
   */
  public function claimItem($lease_time = 3600);

  /**
   * Delete a finished item from the queue.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   */
  public function deleteItem($item);

  /**
   * Release an item that the worker could not process, so another
   * worker can come in and process it before the timeout expires.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   * @return boolean
   */
  public function releaseItem($item);

  /**
   * Create a queue.
   *
   * Called during installation and should be used to perform any necessary
   * initialization operations. This should not be confused with the
   * constructor for these objects, which is called every time an object is
   * instantiated to operate on a queue. This operation is only needed the
   * first time a given queue is going to be initialized (for example, to make
   * a new database table or directory to hold tasks for the queue -- it
   * depends on the queue implementation if this is necessary at all).
   */
  public function createQueue();

  /**
   * Delete a queue and every item in the queue.
   */
  public function deleteQueue();
}

/**
 * Reliable queue interface.
 *
 * Classes implementing this interface preserve the order of messages and
 * guarantee that every item will be executed at least once.
 */
interface DrupalReliableQueueInterface extends DrupalQueueInterface {
}

/**
 * Default queue implementation.
 */
class SystemQueue implements DrupalReliableQueueInterface {
  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  public function __construct($name) {
    $this->name = $name;
  }

  public function createItem($data) {
    // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
    // the queue table yet, so we cannot rely on drupal_write_record().
    $query = db_insert('queue')
      ->fields(array(
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ));
    return (bool) $query->execute();
  }

  public function numberOfItems() {
    return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  }

  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
      if ($item) {
        // Try to update the item. Only one thread can succeed in UPDATEing the
        // same row. We cannot rely on REQUEST_TIME because items might be
        // claimed by a single consumer which runs longer than 1 second. If we
        // continue to use REQUEST_TIME instead of the current time(), we steal
        // time from the lease, and will tend to reset items before the lease
        // should really expire.
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
      }
      else {
        // No items currently available to claim.
        return FALSE;
      }
    }
  }

  public function releaseItem($item) {
    $update = db_update('queue')
      ->fields(array(
        'expire' => 0,
      ))
      ->condition('item_id', $item->item_id);
    return $update->execute();
  }

  public function deleteItem($item) {
    db_delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  public function createQueue() {
    // All tasks are stored in a single database table (which is created when
    // Drupal is first installed) so there is nothing we need to do to create
    // a new queue.
  }

  public function deleteQueue() {
    db_delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }
}

/**
 * Static queue implementation.
 *
 * This allows "undelayed" variants of processes relying on the Queue
 * interface. The queue data resides in memory. It should only be used for
 * items that will be queued and dequeued within a given page request.
 */
class MemoryQueue implements DrupalQueueInterface {
  /**
   * The queue data.
   *
   * @var array
   */
  protected $queue;

  /**
   * Counter for item ids.
   *
   * @var int
   */
  protected $id_sequence;

  /**
   * Start working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   */
  public function __construct($name) {
    $this->queue = array();
    $this->id_sequence = 0;
  }

  public function createItem($data) {
    $item = new stdClass();
    $item->item_id = $this->id_sequence++;
    $item->data = $data;
    $item->created = time();
    $item->expire = 0;
    $this->queue[$item->item_id] = $item;
    // DrupalQueueInterface::createItem() is documented to return TRUE on
    // success.
    return TRUE;
  }

  public function numberOfItems() {
    return count($this->queue);
  }

  public function claimItem($lease_time = 30) {
    foreach ($this->queue as $key => $item) {
      if ($item->expire == 0) {
        $item->expire = time() + $lease_time;
        $this->queue[$key] = $item;
        return $item;
      }
    }
    return FALSE;
  }

  public function deleteItem($item) {
    unset($this->queue[$item->item_id]);
  }

  public function releaseItem($item) {
    if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
      $this->queue[$item->item_id]->expire = 0;
      return TRUE;
    }
    return FALSE;
  }

  public function createQueue() {
    // Nothing needed here.
  }

  public function deleteQueue() {
    $this->queue = array();
    $this->id_sequence = 0;
  }
}

/**
 * @} End of "defgroup queue".
 */
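
/**
 * Example only: drains a queue with the claim, process, delete cycle.
 *
 * This is a minimal sketch of the processing steps described in the queue
 * group documentation, not part of the queue API. The function name and the
 * $worker, $lease_time and $time_limit parameters are hypothetical. $queue is
 * assumed to be any object exposing claimItem(), deleteItem() and
 * releaseItem() as declared by DrupalQueueInterface; the type hint is left
 * out so the sketch stands alone. Because an item might be handed over more
 * than once, $worker should tolerate seeing the same data twice.
 *
 * Possible usage, where 'example' and 'example_worker' are placeholder names:
 * @code
 *   $count = example_queue_process(DrupalQueue::get('example'), 'example_worker');
 * @endcode
 *
 * @param $queue
 *   The queue object to read from.
 * @param $worker
 *   A callback that receives the data property of each claimed item.
 * @param $lease_time
 *   Lease in seconds requested for each claimed item.
 * @param $time_limit
 *   Rough upper bound in seconds on how long to keep claiming items.
 *
 * @return
 *   The number of items that were processed and deleted.
 */
function example_queue_process($queue, $worker, $lease_time = 30, $time_limit = 15) {
  $processed = 0;
  $end = time() + $time_limit;
  while (time() < $end && ($item = $queue->claimItem($lease_time))) {
    try {
      // Workers should rely only on the data property of the item.
      call_user_func($worker, $item->data);
    }
    catch (Exception $e) {
      // Hand the item back so another consumer can claim it before the lease
      // expires, then stop so this loop does not immediately reclaim it.
      $queue->releaseItem($item);
      break;
    }
    // The full item, not just its data, must be passed to deleteItem().
    $queue->deleteItem($item);
    $processed++;
  }
  return $processed;
}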