system.queue.inc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. <?php
  2. /**
  3. * @file
  4. * Queue functionality.
  5. */
  6. /**
  7. * @defgroup queue Queue operations
  8. * @{
  9. * Queue items to allow later processing.
  10. *
  11. * The queue system allows placing items in a queue and processing them later.
  12. * The system tries to ensure that only one consumer can process an item.
  13. *
  14. * Before a queue can be used it needs to be created by
  15. * DrupalQueueInterface::createQueue().
  16. *
  17. * Items can be added to the queue by passing an arbitrary data object to
  18. * DrupalQueueInterface::createItem().
  19. *
  20. * To process an item, call DrupalQueueInterface::claimItem() and specify how
  21. * long you want to have a lease for working on that item. When finished
  22. * processing, the item needs to be deleted by calling
  23. * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
  24. * made available again by the DrupalQueueInterface implementation once the
  25. * lease expires. Another consumer will then be able to receive it when calling
  26. * DrupalQueueInterface::claimItem(). Due to this, the processing code should
  27. * be aware that an item might be handed over for processing more than once.
  28. *
  29. * The $item object used by the DrupalQueueInterface can contain arbitrary
  30. * metadata depending on the implementation. Systems using the interface should
  31. * only rely on the data property which will contain the information passed to
  32. * DrupalQueueInterface::createItem(). The full queue item returned by
  33. * DrupalQueueInterface::claimItem() needs to be passed to
  34. * DrupalQueueInterface::deleteItem() once processing is completed.
  35. *
  36. * There are two kinds of queue backends available. The reliable kind preserves
  37. * the order of messages and guarantees that every item will be executed at
  38. * least once. The non-reliable kind only does a best effort to preserve order
  39. * in messages and to execute them at least once but there is a small chance
  40. * that some items get lost. For example, some distributed back-ends like
  41. * Amazon SQS will be managing jobs for a large set of producers and consumers
  42. * where a strict FIFO ordering will likely not be preserved. Another example
  43. * would be an in-memory queue backend which might lose items if it crashes.
  44. * However, such a backend would be able to deal with significantly more writes
  45. * than a reliable queue and for many tasks this is more important. See
  46. * aggregator_cron() for an example of how to effectively utilize a
  47. * non-reliable queue. Another example is doing Twitter statistics -- the small
  48. * possibility of losing a few items is insignificant next to the power of the
  49. * queue being able to keep up with writes. As described in the processing
  50. * section, regardless of the queue being reliable or not, the processing code
  51. * should be aware that an item might be handed over for processing more than
  52. * once (because the processing code might time out before it finishes).
  53. */
  /**
   * Factory that hands out queue objects by queue name.
   *
   * Each queue object is built on first request and then reused for the
   * remainder of the current PHP request.
   */
  class DrupalQueue {

    /**
     * Returns the queue object for a given name.
     *
     * The class to instantiate is resolved through variable_get(), so it can be
     * controlled with variable_set() or $conf overrides:
     * - queue_class_$name: the class to be used for the queue $name.
     * - queue_default_class: the class to use when queue_class_$name is not
     *   defined. Defaults to SystemQueue.
     * - queue_default_reliable_class: the class to use when a reliable queue is
     *   requested and the class chosen above does not implement
     *   DrupalReliableQueueInterface. Defaults to SystemQueue.
     *
     * Note that instances are cached per queue name only: $reliable is taken
     * into account the first time a given name is requested, and later calls
     * with the same name receive the cached object whatever $reliable says.
     *
     * @param $name
     *   Arbitrary string. The name of the queue to work with.
     * @param $reliable
     *   TRUE if the ordering of items and guaranteeing every item executes at
     *   least once is important, FALSE if scalability is the main concern.
     *
     * @return
     *   The queue object for a given name.
     */
    public static function get($name, $reliable = FALSE) {
      static $instances = array();

      // Hand back the object built earlier in this request, if there is one.
      if (isset($instances[$name])) {
        return $instances[$name];
      }

      // A queue-specific class takes precedence over the site-wide default.
      $class_name = variable_get('queue_class_' . $name, NULL);
      if (!$class_name) {
        $class_name = variable_get('queue_default_class', 'SystemQueue');
      }
      $queue = new $class_name($name);

      // The caller asked for reliability but the selected backend does not
      // advertise it: fall back to the configured reliable backend.
      $needs_reliable_fallback = $reliable && !($queue instanceof DrupalReliableQueueInterface);
      if ($needs_reliable_fallback) {
        $class_name = variable_get('queue_default_reliable_class', 'SystemQueue');
        $queue = new $class_name($name);
      }

      $instances[$name] = $queue;
      return $queue;
    }

  }
  /**
   * Contract implemented by every queue backend.
   *
   * Callers should rely only on the "data" property of the items handed out by
   * claimItem(); any other property is backend-specific metadata.
   */
  interface DrupalQueueInterface {

    /**
     * Adds an item to the queue and stores it right away.
     *
     * @param $data
     *   Arbitrary data to be associated with the new task in the queue.
     *
     * @return
     *   TRUE if the item was created and, on a best-effort basis, added to the
     *   queue; FALSE otherwise. This is not a guarantee that the item has been
     *   committed to disk, only that, as far as the backend can tell, it is
     *   now in the queue.
     */
    public function createItem($data);

    /**
     * Retrieves the number of items in the queue.
     *
     * The value is a "best guess". Its accuracy depends on the implementation
     * and the setup: on a busy system with many consumers and items, it may
     * only be valid for a fraction of a second.
     *
     * @return
     *   An integer estimate of the number of items in the queue.
     */
    public function numberOfItems();

    /**
     * Claims an item in the queue for processing.
     *
     * @param $lease_time
     *   How long the processing is expected to take, in seconds. The interface
     *   declares a default of one hour (3600); implementations may declare a
     *   different default (SystemQueue and MemoryQueue declare 30). Once the
     *   lease expires the item is reset and another consumer can claim it.
     *   For idempotent tasks (which can run several times without side
     *   effects), a shorter lease lowers latency when a consumer fails. For
     *   non-idempotent tasks, a longer lease makes repeated execution after a
     *   failure rarer, at the cost of higher latency.
     *
     * @return
     *   An item object on success, or FALSE when no item could be claimed.
     *   FALSE means a best effort was made and either the queue is empty or
     *   some other non-recoverable problem occurred.
     */
    public function claimItem($lease_time = 3600);

    /**
     * Deletes a finished item from the queue.
     *
     * @param $item
     *   The item returned by DrupalQueueInterface::claimItem().
     */
    public function deleteItem($item);

    /**
     * Releases an item that the worker could not process.
     *
     * This lets another worker pick the item up before its lease would have
     * expired.
     *
     * @param $item
     *   The item returned by DrupalQueueInterface::claimItem().
     *
     * @return boolean
     *   Whether the item was released.
     */
    public function releaseItem($item);

    /**
     * Creates a queue.
     *
     * Called during installation to perform whatever one-time initialization
     * the backend needs, for example making a database table or a directory
     * to hold the tasks. Whether anything is needed at all depends on the
     * implementation. Not to be confused with the constructor, which runs
     * every time an object is instantiated to operate on a queue.
     */
    public function createQueue();

    /**
     * Deletes a queue and every item in the queue.
     */
    public function deleteQueue();

  }
  /**
   * Marker interface for reliable queue backends.
   *
   * It adds no methods of its own. A class that implements it declares that it
   * preserves the order of messages and guarantees that every item will be
   * executed at least once. DrupalQueue::get() checks for this interface when
   * a reliable queue is requested.
   */
  interface DrupalReliableQueueInterface extends DrupalQueueInterface {}
  /**
   * Default queue implementation.
   *
   * Items of every queue live in the single {queue} database table and are told
   * apart by the "name" column.
   */
  class SystemQueue implements DrupalReliableQueueInterface {

    /**
     * The name of the queue this instance is working with.
     *
     * @var string
     */
    protected $name;

    /**
     * Starts working with a queue.
     *
     * @param $name
     *   Arbitrary string. The name of the queue to work with.
     */
    public function __construct($name) {
      $this->name = $name;
    }

    /**
     * Stores a new item, serialized, in the {queue} table.
     *
     * @param $data
     *   Arbitrary data to be associated with the new task in the queue.
     *
     * @return
     *   TRUE if the insert reported success, FALSE otherwise.
     */
    public function createItem($data) {
      // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
      // the queue table yet, so we cannot rely on drupal_write_record().
      $query = db_insert('queue')
        ->fields(array(
          'name' => $this->name,
          'data' => serialize($data),
          // We cannot rely on REQUEST_TIME because many items might be created
          // by a single request which takes longer than 1 second.
          'created' => time(),
        ));
      return (bool) $query->execute();
    }

    /**
     * Counts the rows of this queue, claimed or not.
     *
     * @return
     *   The number of items in the queue, as an integer.
     */
    public function numberOfItems() {
      $count = db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
      // The interface promises an integer; the raw field value coming back
      // from the database layer is typically a numeric string, so cast it.
      return (int) $count;
    }

    /**
     * Claims the oldest unclaimed item of this queue.
     *
     * Only rows whose expire column is 0 are candidates. Nothing in this
     * method resets leases that have run out.
     *
     * @param $lease_time
     *   Lease duration in seconds. Defaults to 30 here.
     *
     * @return
     *   The claimed item, with its data property unserialized, or FALSE when
     *   there is no unclaimed item.
     */
    public function claimItem($lease_time = 30) {
      // Claim an item by updating its expire fields. If claim is not successful
      // another thread may have claimed the item in the meantime. Therefore loop
      // until an item is successfully claimed or we are reasonably sure there
      // are no unclaimed items left.
      while (TRUE) {
        $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
        if (!$item) {
          // No items currently available to claim.
          return FALSE;
        }
        // Try to update the item. Only one thread can succeed in UPDATEing the
        // same row. We cannot rely on REQUEST_TIME because items might be
        // claimed by a single consumer which runs longer than 1 second. If we
        // continue to use REQUEST_TIME instead of the current time(), we steal
        // time from the lease, and will tend to reset items before the lease
        // should really expire.
        $update = db_update('queue')
          ->fields(array(
            'expire' => time() + $lease_time,
          ))
          ->condition('item_id', $item->item_id)
          ->condition('expire', 0);
        // If there are affected rows, this update succeeded.
        if ($update->execute()) {
          $item->data = unserialize($item->data);
          return $item;
        }
        // Otherwise someone else won the race for this row: try the next one.
      }
    }

    /**
     * Makes a claimed item available again by resetting its expire column.
     *
     * @param $item
     *   The item returned by claimItem().
     *
     * @return boolean
     *   TRUE if the update reported at least one affected row, FALSE
     *   otherwise.
     */
    public function releaseItem($item) {
      $update = db_update('queue')
        ->fields(array(
          'expire' => 0,
        ))
        ->condition('item_id', $item->item_id);
      // Cast so the result matches the documented boolean contract, as
      // createItem() already does.
      return (bool) $update->execute();
    }

    /**
     * Removes the row of a finished item.
     *
     * @param $item
     *   The item returned by claimItem().
     */
    public function deleteItem($item) {
      db_delete('queue')
        ->condition('item_id', $item->item_id)
        ->execute();
    }

    /**
     * Does nothing: no per-queue setup is required.
     */
    public function createQueue() {
      // All tasks are stored in a single database table (which is created when
      // Drupal is first installed) so there is nothing we need to do to create
      // a new queue.
    }

    /**
     * Deletes every item that belongs to this queue.
     */
    public function deleteQueue() {
      db_delete('queue')
        ->condition('name', $this->name)
        ->execute();
    }

  }
  /**
   * Static queue implementation.
   *
   * This allows "undelayed" variants of processes relying on the Queue
   * interface. The queue data resides in memory. It should only be used for
   * items that will be queued and dequeued within a given page request.
   */
  class MemoryQueue implements DrupalQueueInterface {

    /**
     * The queue data: item objects keyed by item id.
     *
     * @var array
     */
    protected $queue;

    /**
     * Counter for item ids.
     *
     * @var int
     */
    protected $id_sequence;

    /**
     * Starts working with a queue.
     *
     * @param $name
     *   Arbitrary string. The name of the queue to work with. This
     *   implementation does not store or use it.
     */
    public function __construct($name) {
      $this->queue = array();
      $this->id_sequence = 0;
    }

    /**
     * Appends a new item to the in-memory queue.
     *
     * @param $data
     *   Arbitrary data to be associated with the new task in the queue.
     *
     * @return
     *   Always TRUE: storing the item in the array cannot fail.
     */
    public function createItem($data) {
      $item = new stdClass();
      $item->item_id = $this->id_sequence++;
      $item->data = $data;
      $item->created = time();
      $item->expire = 0;
      $this->queue[$item->item_id] = $item;
      // DrupalQueueInterface::createItem() documents a TRUE return on success;
      // without an explicit return callers would see NULL, which is falsy.
      return TRUE;
    }

    /**
     * Counts every stored item, claimed or not.
     *
     * @return
     *   The number of items currently held in memory.
     */
    public function numberOfItems() {
      return count($this->queue);
    }

    /**
     * Claims the first item, in insertion order, that is not leased.
     *
     * Only items whose expire property is 0 are candidates. Nothing in this
     * class resets a lease that has run out, so a claimed item becomes
     * claimable again only through releaseItem().
     *
     * @param $lease_time
     *   Lease duration in seconds. Defaults to 30 here.
     *
     * @return
     *   The claimed item object, or FALSE when no unclaimed item is left.
     */
    public function claimItem($lease_time = 30) {
      foreach ($this->queue as $item) {
        if ($item->expire == 0) {
          // $item is a handle to the stored object, so setting the property
          // here updates the queue entry; no write-back is needed.
          $item->expire = time() + $lease_time;
          return $item;
        }
      }
      return FALSE;
    }

    /**
     * Removes an item from the queue.
     *
     * @param $item
     *   The item returned by claimItem().
     */
    public function deleteItem($item) {
      unset($this->queue[$item->item_id]);
    }

    /**
     * Makes a claimed item available again.
     *
     * @param $item
     *   The item returned by claimItem().
     *
     * @return boolean
     *   TRUE if the item is still in the queue and was leased, FALSE if it is
     *   unknown or was not leased.
     */
    public function releaseItem($item) {
      if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
        $this->queue[$item->item_id]->expire = 0;
        return TRUE;
      }
      return FALSE;
    }

    /**
     * Does nothing: an in-memory queue needs no setup.
     */
    public function createQueue() {
      // Nothing needed here.
    }

    /**
     * Drops every item and restarts the id counter.
     */
    public function deleteQueue() {
      $this->queue = array();
      $this->id_sequence = 0;
    }

  }
  336. /**
  337. * @} End of "defgroup queue".
  338. */