system.queue.inc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. <?php
  2. /**
  3. * @file
  4. * Queue functionality.
  5. */
  6. /**
  7. * @defgroup queue Queue operations
  8. * @{
  9. * Queue items to allow later processing.
  10. *
  11. * The queue system allows placing items in a queue and processing them later.
  12. * The system tries to ensure that only one consumer can process an item.
  13. *
  14. * Before a queue can be used it needs to be created by
  15. * DrupalQueueInterface::createQueue().
  16. *
  17. * Items can be added to the queue by passing an arbitrary data object to
  18. * DrupalQueueInterface::createItem().
  19. *
  20. * To process an item, call DrupalQueueInterface::claimItem() and specify how
  21. * long you want to have a lease for working on that item. When finished
  22. * processing, the item needs to be deleted by calling
  23. * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
  24. * made available again by the DrupalQueueInterface implementation once the
  25. * lease expires. Another consumer will then be able to receive it when calling
  26. * DrupalQueueInterface::claimItem(). Due to this, the processing code should
  27. * be aware that an item might be handed over for processing more than once.
  28. *
  29. * The $item object used by the DrupalQueueInterface can contain arbitrary
  30. * metadata depending on the implementation. Systems using the interface should
  31. * only rely on the data property which will contain the information passed to
  32. * DrupalQueueInterface::createItem(). The full queue item returned by
  33. * DrupalQueueInterface::claimItem() needs to be passed to
  34. * DrupalQueueInterface::deleteItem() once processing is completed.
  35. *
  36. * There are two kinds of queue backends available: reliable, which preserves
  37. * the order of messages and guarantees that every item will be executed at
  38. * least once. The non-reliable kind only does a best effort to preserve order
  39. * in messages and to execute them at least once but there is a small chance
  40. * that some items get lost. For example, some distributed back-ends like
  41. * Amazon SQS will be managing jobs for a large set of producers and consumers
  42. * where a strict FIFO ordering will likely not be preserved. Another example
  43. * would be an in-memory queue backend which might lose items if it crashes.
  44. * However, such a backend would be able to deal with significantly more writes
  45. * than a reliable queue and for many tasks this is more important. See
  46. * aggregator_cron() for an example of how to effectively utilize a
  47. * non-reliable queue. Another example is doing Twitter statistics -- the small
  48. * possibility of losing a few items is insignificant next to power of the
  49. * queue being able to keep up with writes. As described in the processing
  50. * section, regardless of the queue being reliable or not, the processing code
  51. * should be aware that an item might be handed over for processing more than
  52. * once (because the processing code might time out before it finishes).
  53. */
  54. /**
  55. * Factory class for interacting with queues.
  56. */
  57. class DrupalQueue {
  58. /**
  59. * Returns the queue object for a given name.
  60. *
  61. * The following variables can be set by variable_set or $conf overrides:
  62. * - queue_class_$name: the class to be used for the queue $name.
  63. * - queue_default_class: the class to use when queue_class_$name is not
  64. * defined. Defaults to SystemQueue, a reliable backend using SQL.
  65. * - queue_default_reliable_class: the class to use when queue_class_$name is
  66. * not defined and the queue_default_class is not reliable. Defaults to
  67. * SystemQueue.
  68. *
  69. * @param $name
  70. * Arbitrary string. The name of the queue to work with.
  71. * @param $reliable
  72. * TRUE if the ordering of items and guaranteeing every item executes at
  73. * least once is important, FALSE if scalability is the main concern.
  74. *
  75. * @return
  76. * The queue object for a given name.
  77. */
  78. public static function get($name, $reliable = FALSE) {
  79. static $queues;
  80. if (!isset($queues[$name])) {
  81. $class = variable_get('queue_class_' . $name, NULL);
  82. if (!$class) {
  83. $class = variable_get('queue_default_class', 'SystemQueue');
  84. }
  85. $object = new $class($name);
  86. if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
  87. $class = variable_get('queue_default_reliable_class', 'SystemQueue');
  88. $object = new $class($name);
  89. }
  90. $queues[$name] = $object;
  91. }
  92. return $queues[$name];
  93. }
  94. }
  95. interface DrupalQueueInterface {
  96. /**
  97. * Start working with a queue.
  98. *
  99. * @param $name
  100. * Arbitrary string. The name of the queue to work with.
  101. */
  102. public function __construct($name);
  103. /**
  104. * Add a queue item and store it directly to the queue.
  105. *
  106. * @param $data
  107. * Arbitrary data to be associated with the new task in the queue.
  108. * @return
  109. * TRUE if the item was successfully created and was (best effort) added
  110. * to the queue, otherwise FALSE. We don't guarantee the item was
  111. * committed to disk etc, but as far as we know, the item is now in the
  112. * queue.
  113. */
  114. public function createItem($data);
  115. /**
  116. * Retrieve the number of items in the queue.
  117. *
  118. * This is intended to provide a "best guess" count of the number of items in
  119. * the queue. Depending on the implementation and the setup, the accuracy of
  120. * the results of this function may vary.
  121. *
  122. * e.g. On a busy system with a large number of consumers and items, the
  123. * result might only be valid for a fraction of a second and not provide an
  124. * accurate representation.
  125. *
  126. * @return
  127. * An integer estimate of the number of items in the queue.
  128. */
  129. public function numberOfItems();
  130. /**
  131. * Claim an item in the queue for processing.
  132. *
  133. * @param $lease_time
  134. * How long the processing is expected to take in seconds, defaults to an
  135. * hour. After this lease expires, the item will be reset and another
  136. * consumer can claim the item. For idempotent tasks (which can be run
  137. * multiple times without side effects), shorter lease times would result
  138. * in lower latency in case a consumer fails. For tasks that should not be
  139. * run more than once (non-idempotent), a larger lease time will make it
  140. * more rare for a given task to run multiple times in cases of failure,
  141. * at the cost of higher latency.
  142. * @return
  143. * On success we return an item object. If the queue is unable to claim an
  144. * item it returns false. This implies a best effort to retrieve an item
  145. * and either the queue is empty or there is some other non-recoverable
  146. * problem.
  147. */
  148. public function claimItem($lease_time = 3600);
  149. /**
  150. * Delete a finished item from the queue.
  151. *
  152. * @param $item
  153. * The item returned by DrupalQueueInterface::claimItem().
  154. */
  155. public function deleteItem($item);
  156. /**
  157. * Release an item that the worker could not process, so another
  158. * worker can come in and process it before the timeout expires.
  159. *
  160. * @param $item
  161. * @return boolean
  162. */
  163. public function releaseItem($item);
  164. /**
  165. * Create a queue.
  166. *
  167. * Called during installation and should be used to perform any necessary
  168. * initialization operations. This should not be confused with the
  169. * constructor for these objects, which is called every time an object is
  170. * instantiated to operate on a queue. This operation is only needed the
  171. * first time a given queue is going to be initialized (for example, to make
  172. * a new database table or directory to hold tasks for the queue -- it
  173. * depends on the queue implementation if this is necessary at all).
  174. */
  175. public function createQueue();
  176. /**
  177. * Delete a queue and every item in the queue.
  178. */
  179. public function deleteQueue();
  180. }
  181. /**
  182. * Reliable queue interface.
  183. *
  184. * Classes implementing this interface preserve the order of messages and
  185. * guarantee that every item will be executed at least once.
  186. */
  187. interface DrupalReliableQueueInterface extends DrupalQueueInterface {
  188. }
  189. /**
  190. * Default queue implementation.
  191. */
  192. class SystemQueue implements DrupalReliableQueueInterface {
  193. /**
  194. * The name of the queue this instance is working with.
  195. *
  196. * @var string
  197. */
  198. protected $name;
  199. public function __construct($name) {
  200. $this->name = $name;
  201. }
  202. public function createItem($data) {
  203. // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
  204. // the queue table yet, so we cannot rely on drupal_write_record().
  205. $query = db_insert('queue')
  206. ->fields(array(
  207. 'name' => $this->name,
  208. 'data' => serialize($data),
  209. // We cannot rely on REQUEST_TIME because many items might be created
  210. // by a single request which takes longer than 1 second.
  211. 'created' => time(),
  212. ));
  213. return (bool) $query->execute();
  214. }
  215. public function numberOfItems() {
  216. return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  217. }
  218. public function claimItem($lease_time = 30) {
  219. // Claim an item by updating its expire fields. If claim is not successful
  220. // another thread may have claimed the item in the meantime. Therefore loop
  221. // until an item is successfully claimed or we are reasonably sure there
  222. // are no unclaimed items left.
  223. while (TRUE) {
  224. $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
  225. if ($item) {
  226. // Try to update the item. Only one thread can succeed in UPDATEing the
  227. // same row. We cannot rely on REQUEST_TIME because items might be
  228. // claimed by a single consumer which runs longer than 1 second. If we
  229. // continue to use REQUEST_TIME instead of the current time(), we steal
  230. // time from the lease, and will tend to reset items before the lease
  231. // should really expire.
  232. $update = db_update('queue')
  233. ->fields(array(
  234. 'expire' => time() + $lease_time,
  235. ))
  236. ->condition('item_id', $item->item_id)
  237. ->condition('expire', 0);
  238. // If there are affected rows, this update succeeded.
  239. if ($update->execute()) {
  240. $item->data = unserialize($item->data);
  241. return $item;
  242. }
  243. }
  244. else {
  245. // No items currently available to claim.
  246. return FALSE;
  247. }
  248. }
  249. }
  250. public function releaseItem($item) {
  251. $update = db_update('queue')
  252. ->fields(array(
  253. 'expire' => 0,
  254. ))
  255. ->condition('item_id', $item->item_id);
  256. return $update->execute();
  257. }
  258. public function deleteItem($item) {
  259. db_delete('queue')
  260. ->condition('item_id', $item->item_id)
  261. ->execute();
  262. }
  263. public function createQueue() {
  264. // All tasks are stored in a single database table (which is created when
  265. // Drupal is first installed) so there is nothing we need to do to create
  266. // a new queue.
  267. }
  268. public function deleteQueue() {
  269. db_delete('queue')
  270. ->condition('name', $this->name)
  271. ->execute();
  272. }
  273. }
  274. /**
  275. * Static queue implementation.
  276. *
  277. * This allows "undelayed" variants of processes relying on the Queue
  278. * interface. The queue data resides in memory. It should only be used for
  279. * items that will be queued and dequeued within a given page request.
  280. */
  281. class MemoryQueue implements DrupalQueueInterface {
  282. /**
  283. * The queue data.
  284. *
  285. * @var array
  286. */
  287. protected $queue;
  288. /**
  289. * Counter for item ids.
  290. *
  291. * @var int
  292. */
  293. protected $id_sequence;
  294. public function __construct($name) {
  295. $this->queue = array();
  296. $this->id_sequence = 0;
  297. }
  298. public function createItem($data) {
  299. $item = new stdClass();
  300. $item->item_id = $this->id_sequence++;
  301. $item->data = $data;
  302. $item->created = time();
  303. $item->expire = 0;
  304. $this->queue[$item->item_id] = $item;
  305. }
  306. public function numberOfItems() {
  307. return count($this->queue);
  308. }
  309. public function claimItem($lease_time = 30) {
  310. foreach ($this->queue as $key => $item) {
  311. if ($item->expire == 0) {
  312. $item->expire = time() + $lease_time;
  313. $this->queue[$key] = $item;
  314. return $item;
  315. }
  316. }
  317. return FALSE;
  318. }
  319. public function deleteItem($item) {
  320. unset($this->queue[$item->item_id]);
  321. }
  322. public function releaseItem($item) {
  323. if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
  324. $this->queue[$item->item_id]->expire = 0;
  325. return TRUE;
  326. }
  327. return FALSE;
  328. }
  329. public function createQueue() {
  330. // Nothing needed here.
  331. }
  332. public function deleteQueue() {
  333. $this->queue = array();
  334. $this->id_sequence = 0;
  335. }
  336. }
  337. /**
  338. * @} End of "defgroup queue".
  339. */