BackgroundProcess.class.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. <?php
  2. /**
  3. * @file
  4. *
  5. * Class for handling background processes.
  6. */
  7. /**
  8. * BackgroundProcess class.
  9. */
  10. class BackgroundProcess {
  11. public $handle;
  12. public $connection;
  13. public $service_host;
  14. public $service_group;
  15. public $uid;
  16. public static function load($process) {
  17. $new = new BackgroundProcess($process->handle);
  18. @$new->callback = $process->callback;
  19. @$new->args = $process->args;
  20. @$new->uid = $process->uid;
  21. @$new->token = $process->token;
  22. @$new->service_host = $process->service_host;
  23. @$new->service_group = $process->service_group;
  24. @$new->exec_status = $process->exec_status;
  25. @$new->start_stamp = $process->start_stamp;
  26. @$new->status = $process->exec_status;
  27. @$new->start = $process->start_stamp;
  28. return $new;
  29. }
  30. /**
  31. * Constructor.
  32. *
  33. * @param type $handle
  34. * Handle to use. Optional; leave out for auto-handle.
  35. */
  36. public function __construct($handle = NULL) {
  37. $this->handle = $handle ? $handle : background_process_generate_handle('auto');
  38. $this->token = background_process_generate_handle('token');
  39. $this->service_group = variable_get('background_process_default_service_group', 'default');
  40. }
  41. public function lock($status = BACKGROUND_PROCESS_STATUS_LOCKED) {
  42. // Preliminary select to avoid unnecessary write-attempt
  43. if (background_process_get_process($this->handle)) {
  44. // watchdog('bg_process', 'Will not attempt to lock handle %handle, already exists', array('%handle' => $this->handle), WATCHDOG_NOTICE);
  45. return FALSE;
  46. }
  47. // "Lock" handle
  48. $this->start_stamp = $this->start = microtime(TRUE);
  49. if (!background_process_lock_process($this->handle, $status)) {
  50. // If this happens, we might have a race condition or an md5 clash
  51. watchdog('bg_process', 'Could not lock handle %handle', array('%handle' => $this->handle), WATCHDOG_ERROR);
  52. return FALSE;
  53. }
  54. $this->exec_status = $this->status = BACKGROUND_PROCESS_STATUS_LOCKED;
  55. $this->sendMessage('locked');
  56. return TRUE;
  57. }
  58. /**
  59. * Start background process
  60. *
  61. * Calls the service handler through http passing function arguments as serialized data
  62. * Be aware that the callback will run in a new request
  63. *
  64. * @global string $base_url
  65. * Base URL for this Drupal request
  66. *
  67. * @param $callback
  68. * Function to call.
  69. * @param $args
  70. * Array containg arguments to pass on to the callback.
  71. * @return mixed
  72. * TRUE on success, NULL on failure, FALSE on handle locked.
  73. */
  74. public function start($callback, $args = array()) {
  75. if (!$this->lock()) {
  76. return FALSE;
  77. }
  78. return $this->execute($callback, $args);
  79. }
  80. public function queue($callback, $args = array()) {
  81. if (!$this->lock(BACKGROUND_PROCESS_STATUS_QUEUED)) {
  82. return FALSE;
  83. }
  84. if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
  85. // Could not update process
  86. return NULL;
  87. }
  88. module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);
  89. // Initialize progress stats
  90. $old_db = db_set_active('background_process');
  91. progress_remove_progress($this->handle);
  92. db_set_active($old_db);
  93. $queues = variable_get('background_process_queues', array());
  94. $queue_name = isset($queues[$callback]) ? 'bgp:' . $queues[$callback] : 'background_process';
  95. $queue = DrupalQueue::get($queue_name);
  96. $queue->createItem(array(rawurlencode($this->handle), rawurlencode($this->token)));
  97. _background_process_ensure_cleanup($this->handle, TRUE);
  98. }
  99. public function determineServiceHost() {
  100. // Validate explicitly selected service host
  101. $service_hosts = background_process_get_service_hosts();
  102. if ($this->service_host && empty($service_hosts[$this->service_host])) {
  103. $this->service_host = variable_get('background_process_default_service_host', 'default');
  104. if (empty($service_hosts[$this->service_host])) {
  105. $this->service_host = NULL;
  106. }
  107. }
  108. // Find service group if a service host is not explicitly specified.
  109. if (!$this->service_host) {
  110. if (!$this->service_group) {
  111. $this->service_group = variable_get('background_process_default_service_group', 'default');
  112. }
  113. if ($this->service_group) {
  114. $service_groups = variable_get('background_process_service_groups', array());
  115. if (isset($service_groups[$this->service_group])) {
  116. $service_group = $service_groups[$this->service_group];
  117. // Default method if none is provided
  118. $service_group += array(
  119. 'method' => 'background_process_service_group_round_robin'
  120. );
  121. if (is_callable($service_group['method'])) {
  122. $this->service_host = call_user_func($service_group['method'], $service_group);
  123. // Revalidate service host
  124. if ($this->service_host && empty($service_hosts[$this->service_host])) {
  125. $this->service_host = NULL;
  126. }
  127. }
  128. }
  129. }
  130. }
  131. // Fallback service host
  132. if (!$this->service_host || empty($service_hosts[$this->service_host])) {
  133. $this->service_host = variable_get('background_process_default_service_host', 'default');
  134. if (empty($service_hosts[$this->service_host])) {
  135. $this->service_host = 'default';
  136. }
  137. }
  138. return $this->service_host;
  139. }
  140. public function execute($callback, $args = array()) {
  141. if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
  142. // Could not update process
  143. return NULL;
  144. }
  145. module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);
  146. // Initialize progress stats
  147. $old_db = db_set_active('background_process');
  148. progress_remove_progress($this->handle);
  149. db_set_active($old_db);
  150. $this->connection = FALSE;
  151. $this->determineServiceHost();
  152. return $this->dispatch();
  153. }
  154. function dispatch() {
  155. $this->sendMessage('dispatch');
  156. $handle = rawurlencode($this->handle);
  157. $token = rawurlencode($this->token);
  158. list($url, $headers) = background_process_build_request('bgp-start/' . $handle . '/' . $token, $this->service_host);
  159. background_process_set_service_host($this->handle, $this->service_host);
  160. $options = array('method' => 'POST', 'headers' => $headers);
  161. $result = background_process_http_request($url, $options);
  162. if (empty($result->error)) {
  163. $this->connection = $result->fp;
  164. _background_process_ensure_cleanup($this->handle, TRUE);
  165. return TRUE;
  166. }
  167. else {
  168. background_process_remove_process($this->handle);
  169. watchdog('bg_process', 'Could not call service %handle for callback %callback: %error', array('%handle' => $this->handle, '%callback' => $callback, '%error' => $result->error), WATCHDOG_ERROR);
  170. // Throw exception here instead?
  171. return NULL;
  172. }
  173. return FALSE;
  174. }
  175. function sendMessage($action) {
  176. if (module_exists('nodejs')) {
  177. if (!isset($this->progress_object)) {
  178. if ($progress = progress_get_progress($this->handle)) {
  179. $this->progress_object = $progress;
  180. $this->progress = $progress->progress;
  181. $this->progress_message = $progress->message;
  182. }
  183. else {
  184. $this->progress = 0;
  185. $this->progress_message = '';
  186. }
  187. }
  188. $object = clone $this;
  189. $message = (object) array(
  190. 'channel' => 'background_process',
  191. 'data' => (object) array(
  192. 'action' => $action,
  193. 'background_process' => $object,
  194. 'timestamp' => microtime(TRUE),
  195. ),
  196. 'callback' => 'nodejsBackgroundProcess',
  197. );
  198. drupal_alter('background_process_message', $message);
  199. nodejs_send_content_channel_message($message);
  200. }
  201. }
  202. }