handle);
    @$new->callback = $process->callback;
    @$new->args = $process->args;
    @$new->uid = $process->uid;
    @$new->token = $process->token;
    @$new->service_host = $process->service_host;
    @$new->service_group = $process->service_group;
    @$new->exec_status = $process->exec_status;
    @$new->start_stamp = $process->start_stamp;
    @$new->status = $process->exec_status;
    @$new->start = $process->start_stamp;
    return $new;
  }

  /**
   * Constructor.
   *
   * Generates a fresh token and picks up the default service group from the
   * 'background_process_default_service_group' variable.
   *
   * @param string|null $handle
   *   Handle to use. Optional; leave out (or pass any falsy value) to have a
   *   handle generated automatically.
   */
  public function __construct($handle = NULL) {
    $this->handle = $handle ? $handle : background_process_generate_handle('auto');
    $this->token = background_process_generate_handle('token');
    $this->service_group = variable_get('background_process_default_service_group', 'default');
  }

  /**
   * Lock the handle of this process.
   *
   * @param $status
   *   Status to register the handle with. Defaults to
   *   BACKGROUND_PROCESS_STATUS_LOCKED.
   *
   * @return bool
   *   TRUE if the handle was locked, FALSE if a process already exists for the
   *   handle or the lock could not be acquired.
   */
  public function lock($status = BACKGROUND_PROCESS_STATUS_LOCKED) {
    // Preliminary select to avoid an unnecessary write attempt.
    if (background_process_get_process($this->handle)) {
      return FALSE;
    }

    // "Lock" the handle. The start time is recorded before the attempt.
    $this->start_stamp = $this->start = microtime(TRUE);
    if (!background_process_lock_process($this->handle, $status)) {
      // If this happens, we might have a race condition or an md5 clash.
      watchdog('bg_process', 'Could not lock handle %handle', array('%handle' => $this->handle), WATCHDOG_ERROR);
      return FALSE;
    }

    // Mirror the status that was actually passed to the lock call, so that a
    // handle locked as "queued" is not reported as merely "locked".
    $this->exec_status = $this->status = $status;
    $this->sendMessage('locked');
    return TRUE;
  }

  /**
   * Start background process.
   *
   * Calls the service handler through http passing function arguments as
   * serialized data. Be aware that the callback will run in a new request.
   *
   * @param $callback
   *   Function to call.
   * @param $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return mixed
   *   TRUE on success, NULL on failure, FALSE on handle locked.
   */
  public function start($callback, $args = array()) {
    if (!$this->lock()) {
      return FALSE;
    }

    return $this->execute($callback, $args);
  }

  /**
   * Queue the process instead of dispatching it right away.
   *
   * Locks the handle with BACKGROUND_PROCESS_STATUS_QUEUED, stores the process
   * data and creates a queue item holding the url-encoded handle and token.
   * The queue is 'bgp:<name>' when the 'background_process_queues' variable
   * maps the callback to a name, otherwise 'background_process'.
   *
   * @param $callback
   *   Function to call.
   * @param $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return mixed
   *   TRUE once the queue item has been created, NULL if the process data
   *   could not be stored, FALSE if the handle could not be locked.
   */
  public function queue($callback, $args = array()) {
    if (!$this->lock(BACKGROUND_PROCESS_STATUS_QUEUED)) {
      return FALSE;
    }

    if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
      // Could not update process.
      return NULL;
    }

    module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);

    // Initialize progress stats.
    $old_db = db_set_active('background_process');
    progress_remove_progress($this->handle);
    db_set_active($old_db);

    $queues = variable_get('background_process_queues', array());
    $queue_name = isset($queues[$callback]) ? 'bgp:' . $queues[$callback] : 'background_process';
    $queue = DrupalQueue::get($queue_name);
    $queue->createItem(array(rawurlencode($this->handle), rawurlencode($this->token)));
    _background_process_ensure_cleanup($this->handle, TRUE);

    // Report success explicitly; falling off the end would return NULL, which
    // callers could not tell apart from the failure case above.
    return TRUE;
  }

  /**
   * Determine which service host this process should be dispatched to.
   *
   * Resolution order:
   * - the explicitly selected service host, if it is a known host (an unknown
   *   one is first replaced by the default host);
   * - the host returned by the service group's 'method' callable, if that is
   *   a known host;
   * - the 'background_process_default_service_host' variable;
   * - the literal 'default'.
   *
   * The result is stored in $this->service_host.
   *
   * @return string
   *   Name of the selected service host.
   */
  public function determineServiceHost() {
    // Validate explicitly selected service host.
    $service_hosts = background_process_get_service_hosts();
    if ($this->service_host && empty($service_hosts[$this->service_host])) {
      $this->service_host = variable_get('background_process_default_service_host', 'default');
      if (empty($service_hosts[$this->service_host])) {
        $this->service_host = NULL;
      }
    }

    // Find service group if a service host is not explicitly specified.
    if (!$this->service_host) {
      if (!$this->service_group) {
        $this->service_group = variable_get('background_process_default_service_group', 'default');
      }
      if ($this->service_group) {
        $service_groups = variable_get('background_process_service_groups', array());
        if (isset($service_groups[$this->service_group])) {
          $service_group = $service_groups[$this->service_group];

          // Default method if none is provided.
          $service_group += array(
            'method' => 'background_process_service_group_round_robin'
          );
          if (is_callable($service_group['method'])) {
            $this->service_host = call_user_func($service_group['method'], $service_group);
            // Revalidate service host: the method may return an unknown one.
            if ($this->service_host && empty($service_hosts[$this->service_host])) {
              $this->service_host = NULL;
            }
          }
        }
      }
    }

    // Fallback service host.
    if (!$this->service_host || empty($service_hosts[$this->service_host])) {
      $this->service_host = variable_get('background_process_default_service_host', 'default');
      if (empty($service_hosts[$this->service_host])) {
        $this->service_host = 'default';
      }
    }

    return $this->service_host;
  }

  /**
   * Store the process data and dispatch the process to a service host.
   *
   * Does not lock the handle; start() does that before calling this method.
   *
   * @param $callback
   *   Function to call.
   * @param $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return mixed
   *   TRUE on success, NULL if the process data could not be stored or the
   *   service host could not be called.
   */
  public function execute($callback, $args = array()) {
    if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
      // Could not update process.
      return NULL;
    }

    // Remember the callback so dispatch() can name it if the request fails.
    $this->callback = $callback;

    module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);

    // Initialize progress stats.
    $old_db = db_set_active('background_process');
    progress_remove_progress($this->handle);
    db_set_active($old_db);

    $this->connection = FALSE;

    $this->determineServiceHost();

    return $this->dispatch();
  }

  /**
   * Send the start request for this process to its service host.
   *
   * Issues a POST request to 'bgp-start/<handle>/<token>' on
   * $this->service_host. On success the request's file pointer is kept in
   * $this->connection; on failure the process is removed and the error is
   * logged.
   *
   * @return mixed
   *   TRUE on success, NULL on failure.
   */
  public function dispatch() {
    $this->sendMessage('dispatch');

    $handle = rawurlencode($this->handle);
    $token = rawurlencode($this->token);
    list($url, $headers) = background_process_build_request('bgp-start/' . $handle . '/' . $token, $this->service_host);

    background_process_set_service_host($this->handle, $this->service_host);

    $options = array('method' => 'POST', 'headers' => $headers);
    $result = background_process_http_request($url, $options);

    if (empty($result->error)) {
      $this->connection = $result->fp;
      _background_process_ensure_cleanup($this->handle, TRUE);
      return TRUE;
    }

    background_process_remove_process($this->handle);
    // There is no local $callback in this method; read it from the object,
    // which may not carry one if dispatch() is called on a bare instance.
    $callback = isset($this->callback) ? $this->callback : NULL;
    watchdog('bg_process', 'Could not call service %handle for callback %callback: %error', array('%handle' => $this->handle, '%callback' => $callback, '%error' => $result->error), WATCHDOG_ERROR);
    // Throw exception here instead?
    return NULL;
  }

  /**
   * Broadcast a state change of this process through the nodejs module.
   *
   * Does nothing unless the nodejs module exists. The message carries a clone
   * of this object, the action and a timestamp, and is passed through
   * drupal_alter('background_process_message') before being sent on the
   * 'background_process' channel.
   *
   * @param $action
   *   Name of the action to report, e.g. 'locked' or 'dispatch'.
   */
  public function sendMessage($action) {
    if (!module_exists('nodejs')) {
      return;
    }

    // Load progress information until a progress record has been found; as
    // long as none exists, report zero progress and an empty message.
    if (!isset($this->progress_object)) {
      if ($progress = progress_get_progress($this->handle)) {
        $this->progress_object = $progress;
        $this->progress = $progress->progress;
        $this->progress_message = $progress->message;
      }
      else {
        $this->progress = 0;
        $this->progress_message = '';
      }
    }

    $object = clone $this;
    $message = (object) array(
      'channel' => 'background_process',
      'data' => (object) array(
        'action' => $action,
        'background_process' => $object,
        'timestamp' => microtime(TRUE),
      ),
      'callback' => 'nodejsBackgroundProcess',
    );
    drupal_alter('background_process_message', $message);
    nodejs_send_content_channel_message($message);
  }
}