JobScheduler.inc 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. <?php
  2. /**
  3. * @file
  4. * JobScheduler class.
  5. */
  6. /**
  7. * Use to make Job Scheduler exceptions identifiable by type.
  8. */
  9. class JobSchedulerException extends Exception {}
  10. /**
  11. * Manage scheduled jobs.
  12. */
  13. class JobScheduler {
  14. /**
  15. * The name of this scheduler.
  16. */
  17. protected $name;
  18. /**
  19. * Produces a single instance of JobScheduler for a schedule name.
  20. */
  21. public static function get($name) {
  22. static $schedulers;
  23. // Instantiante a new scheduler for $name if we haven't done so yet.
  24. if (!isset($schedulers[$name])) {
  25. $class = variable_get('job_scheduler_class_' . $name, 'JobScheduler');
  26. $schedulers[$name] = new $class($name);
  27. }
  28. return $schedulers[$name];
  29. }
  30. /**
  31. * Creates a JobScheduler object.
  32. */
  33. protected function __construct($name) {
  34. $this->name = $name;
  35. }
  36. /**
  37. * Returns scheduler info.
  38. *
  39. * @see hook_cron_job_scheduler_info()
  40. *
  41. * @throws JobSchedulerException.
  42. */
  43. public function info() {
  44. if ($info = job_scheduler_info($this->name)) {
  45. return $info;
  46. }
  47. throw new JobSchedulerException('Could not find Job Scheduler cron information for ' . check_plain($this->name));
  48. }
  49. /**
  50. * Add a job to the schedule, replace any existing job.
  51. *
  52. * A job is uniquely identified by $job = array(type, id).
  53. *
  54. * @param $job
  55. * An array that must contain the following keys:
  56. * 'type' - A string identifier of the type of job.
  57. * 'id' - A numeric identifier of the job.
  58. * 'period' - The time when the task should be executed.
  59. * 'periodic' - True if the task should be repeated periodically.
  60. *
  61. * @code
  62. * function worker_callback($job) {
  63. * // Work off job.
  64. * // Set next time to be called. If this portion of the code is not
  65. * // reached for some reason, the scheduler will keep periodically invoking
  66. * // the callback() with the period value initially specified.
  67. * $scheduler->set($job);
  68. * }
  69. * @endcode
  70. */
  71. public function set($job) {
  72. $job['name'] = $this->name;
  73. $job['last'] = REQUEST_TIME;
  74. if (!empty($job['crontab'])) {
  75. $crontab = new JobSchedulerCronTab($job['crontab']);
  76. $job['next'] = $crontab->nextTime(REQUEST_TIME);
  77. }
  78. else {
  79. $job['next'] = REQUEST_TIME + $job['period'];
  80. }
  81. $job['scheduled'] = 0;
  82. $this->remove($job);
  83. drupal_write_record('job_schedule', $job);
  84. }
  85. /**
  86. * Reserve a job.
  87. */
  88. protected function reserve($job) {
  89. $job['name'] = $this->name;
  90. $job['scheduled'] =
  91. $job['last'] = REQUEST_TIME;
  92. $job['next'] = $job['period'] + REQUEST_TIME;
  93. drupal_write_record('job_schedule', $job, array('name', 'type', 'id'));
  94. }
  95. /**
  96. * Remove a job from the schedule, replace any existing job.
  97. *
  98. * A job is uniquely identified by $job = array(type, id).
  99. */
  100. public function remove($job) {
  101. db_delete('job_schedule')
  102. ->condition('name', $this->name)
  103. ->condition('type', $job['type'])
  104. ->condition('id', isset($job['id']) ? $job['id'] : 0)
  105. ->execute();
  106. }
  107. /**
  108. * Remove all jobs for a given type.
  109. */
  110. public function removeAll($type) {
  111. db_delete('job_schedule')
  112. ->condition('name', $this->name)
  113. ->condition('type', $type)
  114. ->execute();
  115. }
  116. /**
  117. * Dispatches a job.
  118. *
  119. * Executes a worker callback or if schedule declares a queue name, queues a
  120. * job for execution.
  121. *
  122. * @param $job
  123. * A $job array as passed into set() or read from job_schedule table.
  124. *
  125. * @throws Exception
  126. * Exceptions thrown by code called by this method are passed on.
  127. */
  128. public function dispatch($job) {
  129. $info = $this->info();
  130. if (!$job['periodic']) {
  131. $this->remove($job);
  132. }
  133. if (!empty($info['queue name'])) {
  134. if (DrupalQueue::get($info['queue name'])->createItem($job)) {
  135. $this->reserve($job);
  136. }
  137. }
  138. else {
  139. $this->execute($job);
  140. }
  141. }
  142. /**
  143. * Executes a job that
  144. *
  145. * @param $job
  146. * A $job array as passed into set() or read from job_schedule table.
  147. *
  148. * @throws Exception
  149. * Exceptions thrown by code called by this method are passed on.
  150. */
  151. public function execute($job) {
  152. $info = $this->info();
  153. // If the job is periodic, re-schedule it before calling the worker, just in case
  154. if ($job['periodic']) {
  155. $job['last'] = REQUEST_TIME;
  156. $this->reschedule($job);
  157. }
  158. if (function_exists($info['worker callback'])) {
  159. call_user_func($info['worker callback'], $job);
  160. }
  161. else {
  162. // @todo If worker doesn't exist anymore we should do something about it, remove and throw exception?
  163. $this->remove($job);
  164. throw new JobSchedulerException('Could not find worker callback function: ' . $info['worker callback']);
  165. }
  166. }
  167. /**
  168. * Re-schedule a job if intended to run again
  169. *
  170. * (If cannot determine the next time, drop the job)
  171. */
  172. public function reschedule($job) {
  173. $job['scheduled'] = 0;
  174. if (!empty($job['crontab'])) {
  175. $crontab = new JobSchedulerCronTab($job['crontab']);
  176. $job['next'] = $crontab->nextTime($job['last']);
  177. }
  178. else {
  179. $job['next'] = $job['last'] + $job['period'];
  180. }
  181. if ($job['next']) {
  182. drupal_write_record('job_schedule', $job, array('item_id'));
  183. }
  184. else {
  185. // If no next time, it may mean it wont run again the next year (crontab)
  186. $this->remove($job);
  187. }
  188. }
  189. /**
  190. * Check whether a job exists in the queue and update its parameters if so
  191. */
  192. public function check($job) {
  193. $job += array('id' => 0, 'period' => 0, 'crontab' => '');
  194. $existing = db_select('job_schedule')
  195. ->fields('job_schedule')
  196. ->condition('name', $this->name)
  197. ->condition('type', $job['type'])
  198. ->condition('id', $job['id'])
  199. ->execute()
  200. ->fetchAssoc();
  201. // If existing, and changed period or crontab, we need to reschedule the job
  202. if ($existing) {
  203. if ($job['period'] != $existing['period'] || $job['crontab'] != $existing['crontab']) {
  204. $existing['period'] = $job['period'];
  205. $existing['crontab'] = $job['crontab'];
  206. $this->reschedule($existing);
  207. }
  208. return $existing;
  209. }
  210. }
  211. }