<?php

/**
 * @file
 * Class for handling background processes.
 */

/**
 * BackgroundProcess class.
 *
 * Represents a single background process identified by a handle. A process is
 * first locked (registered) and then either dispatched to a service host
 * through an HTTP request or placed on a Drupal queue.
 */
class BackgroundProcess {

  /**
   * Handle identifying this process.
   */
  public $handle;

  /**
   * FALSE until dispatched; afterwards the `fp` member of the HTTP result.
   */
  public $connection;

  /**
   * Name of the service host the process is dispatched to.
   */
  public $service_host;

  /**
   * Name of the service group used to pick a service host.
   */
  public $service_group;

  /**
   * User id passed on when the process is registered.
   */
  public $uid;

  /**
   * Callback of the process, as provided by the record given to load().
   */
  public $callback;

  /**
   * Callback arguments, as provided by the record given to load().
   */
  public $args;

  /**
   * Token sent along with the handle when dispatching or queueing.
   */
  public $token;

  /**
   * Execution status (one of the BACKGROUND_PROCESS_STATUS_* values).
   */
  public $exec_status;

  /**
   * Timestamp (microtime float) taken when the handle was locked.
   */
  public $start_stamp;

  /**
   * Alias of $exec_status.
   */
  public $status;

  /**
   * Alias of $start_stamp.
   */
  public $start;

  /**
   * Progress record fetched by sendMessage(), when one exists.
   */
  public $progress_object;

  /**
   * Progress value included in messages; 0 when no progress record exists.
   */
  public $progress;

  /**
   * Progress message included in messages; '' when no record exists.
   */
  public $progress_message;

  /**
   * Builds a BackgroundProcess object from a process record.
   *
   * Fields that are missing from the record are set to NULL on the new object
   * (this includes the token and service group set up by the constructor).
   *
   * @param object $process
   *   Process record; must provide at least ->handle.
   *
   * @return BackgroundProcess
   *   The populated object.
   */
  public static function load($process) {
    $new = new self($process->handle);
    $fields = array(
      'callback',
      'args',
      'uid',
      'token',
      'service_host',
      'service_group',
      'exec_status',
      'start_stamp',
    );
    foreach ($fields as $field) {
      $new->$field = isset($process->$field) ? $process->$field : NULL;
    }
    // Keep the short aliases in sync with the canonical fields.
    $new->status = $new->exec_status;
    $new->start = $new->start_stamp;
    return $new;
  }

  /**
   * Constructor.
   *
   * @param string|null $handle
   *   Handle to use. Optional; leave out for auto-handle.
   */
  public function __construct($handle = NULL) {
    $this->handle = $handle ? $handle : background_process_generate_handle('auto');
    $this->token = background_process_generate_handle('token');
    $this->service_group = variable_get('background_process_default_service_group', 'default');
  }

  /**
   * Locks the handle of this process.
   *
   * @param int $status
   *   Status to register the process with.
   *
   * @return bool
   *   TRUE if the handle was locked, FALSE if a process with this handle
   *   already exists or the lock could not be obtained.
   */
  public function lock($status = BACKGROUND_PROCESS_STATUS_LOCKED) {
    // Preliminary select to avoid unnecessary write-attempt.
    if (background_process_get_process($this->handle)) {
      return FALSE;
    }

    // "Lock" handle.
    $this->start_stamp = $this->start = microtime(TRUE);
    if (!background_process_lock_process($this->handle, $status)) {
      // If this happens, we might have a race condition or an md5 clash.
      watchdog('bg_process', 'Could not lock handle %handle', array('%handle' => $this->handle), WATCHDOG_ERROR);
      return FALSE;
    }

    // Mirror the status the lock was actually taken with, so that a queued
    // process does not report itself as merely locked.
    $this->exec_status = $this->status = $status;
    $this->sendMessage('locked');
    return TRUE;
  }

  /**
   * Start background process.
   *
   * Locks the handle and calls the service handler through http. Be aware
   * that the callback will run in a new request.
   *
   * @param mixed $callback
   *   Function to call.
   * @param array $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return mixed
   *   TRUE on success, NULL on failure, FALSE on handle locked.
   */
  public function start($callback, $args = array()) {
    if (!$this->lock()) {
      return FALSE;
    }
    return $this->execute($callback, $args);
  }

  /**
   * Queue background process.
   *
   * Locks the handle with the queued status, registers the callback and
   * creates a queue item holding the url-encoded handle and token.
   *
   * @param mixed $callback
   *   Function to call.
   * @param array $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return mixed
   *   TRUE on success, NULL if the process could not be updated, FALSE on
   *   handle locked.
   */
  public function queue($callback, $args = array()) {
    if (!$this->lock(BACKGROUND_PROCESS_STATUS_QUEUED)) {
      return FALSE;
    }

    if (!$this->prepareExecution($callback, $args)) {
      // Could not update process.
      return NULL;
    }

    // Callbacks may be mapped to a dedicated queue; otherwise use the default.
    $queues = variable_get('background_process_queues', array());
    $queue_name = isset($queues[$callback]) ? 'bgp:' . $queues[$callback] : 'background_process';
    $queue = DrupalQueue::get($queue_name);
    $queue->createItem(array(rawurlencode($this->handle), rawurlencode($this->token)));
    _background_process_ensure_cleanup($this->handle, TRUE);
    return TRUE;
  }

  /**
   * Determines which service host to use for this process.
   *
   * Order of precedence: the explicitly selected host (when known), the host
   * returned by the service group's method, the configured default host and
   * finally the literal 'default'.
   *
   * @return string
   *   Name of the selected service host; also stored in $this->service_host.
   */
  public function determineServiceHost() {
    // Validate explicitly selected service host.
    $service_hosts = background_process_get_service_hosts();
    if ($this->service_host && empty($service_hosts[$this->service_host])) {
      $this->service_host = variable_get('background_process_default_service_host', 'default');
      if (empty($service_hosts[$this->service_host])) {
        $this->service_host = NULL;
      }
    }

    // Find service group if a service host is not explicitly specified.
    if (!$this->service_host) {
      if (!$this->service_group) {
        $this->service_group = variable_get('background_process_default_service_group', 'default');
      }
      if ($this->service_group) {
        $service_groups = variable_get('background_process_service_groups', array());
        if (isset($service_groups[$this->service_group])) {
          $service_group = $service_groups[$this->service_group];

          // Default method if none is provided.
          $service_group += array(
            'method' => 'background_process_service_group_round_robin',
          );
          if (is_callable($service_group['method'])) {
            $this->service_host = call_user_func($service_group['method'], $service_group);

            // Revalidate service host.
            if ($this->service_host && empty($service_hosts[$this->service_host])) {
              $this->service_host = NULL;
            }
          }
        }
      }
    }

    // Fallback service host.
    if (!$this->service_host || empty($service_hosts[$this->service_host])) {
      $this->service_host = variable_get('background_process_default_service_host', 'default');
      if (empty($service_hosts[$this->service_host])) {
        $this->service_host = 'default';
      }
    }

    return $this->service_host;
  }

  /**
   * Registers the callback and dispatches the (already locked) process.
   *
   * @param mixed $callback
   *   Function to call.
   * @param array $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return mixed
   *   TRUE on success, NULL if the process could not be updated or the
   *   service could not be called.
   */
  public function execute($callback, $args = array()) {
    if (!$this->prepareExecution($callback, $args)) {
      // Could not update process.
      return NULL;
    }

    $this->connection = FALSE;
    $this->determineServiceHost();

    return $this->dispatch($callback);
  }

  /**
   * Sends the start request for this process to its service host.
   *
   * On failure the process is removed again and the error is logged.
   *
   * @param mixed $callback
   *   (optional) Callback being dispatched; only used for the error log
   *   entry. Defaults to $this->callback.
   *
   * @return mixed
   *   TRUE on success, NULL if the service could not be called.
   */
  public function dispatch($callback = NULL) {
    $this->sendMessage('dispatch');

    $handle = rawurlencode($this->handle);
    $token = rawurlencode($this->token);
    list($url, $headers) = background_process_build_request('bgp-start/' . $handle . '/' . $token, $this->service_host);

    background_process_set_service_host($this->handle, $this->service_host);

    $options = array('method' => 'POST', 'headers' => $headers);
    $result = background_process_http_request($url, $options);
    if (empty($result->error)) {
      $this->connection = $result->fp;
      _background_process_ensure_cleanup($this->handle, TRUE);
      return TRUE;
    }

    background_process_remove_process($this->handle);
    if ($callback === NULL) {
      $callback = $this->callback;
    }
    watchdog('bg_process', 'Could not call service %handle for callback %callback: %error', array('%handle' => $this->handle, '%callback' => self::callbackName($callback), '%error' => $result->error), WATCHDOG_ERROR);
    // Throw exception here instead?
    return NULL;
  }

  /**
   * Sends a status message about this process through the nodejs module.
   *
   * Does nothing unless the nodejs module is enabled. The message carries a
   * clone of this object, including its current progress and progress
   * message, and can be altered via hook_background_process_message_alter().
   *
   * @param string $action
   *   Name of the action being reported, e.g. 'locked' or 'dispatch'.
   */
  public function sendMessage($action) {
    if (module_exists('nodejs')) {
      if (!isset($this->progress_object)) {
        if ($progress = progress_get_progress($this->handle)) {
          $this->progress_object = $progress;
          $this->progress = $progress->progress;
          $this->progress_message = $progress->message;
        }
        else {
          $this->progress = 0;
          $this->progress_message = '';
        }
      }
      $object = clone $this;
      $message = (object) array(
        'channel' => 'background_process',
        'data' => (object) array(
          'action' => $action,
          'background_process' => $object,
          'timestamp' => microtime(TRUE),
        ),
        'callback' => 'nodejsBackgroundProcess',
      );
      drupal_alter('background_process_message', $message);
      nodejs_send_content_channel_message($message);
    }
  }

  /**
   * Registers callback and arguments and resets progress for the process.
   *
   * Shared first step of execute() and queue().
   *
   * @param mixed $callback
   *   Function to call.
   * @param array $args
   *   Array containing arguments to pass on to the callback.
   *
   * @return bool
   *   TRUE on success, FALSE if the process could not be updated.
   */
  private function prepareExecution($callback, $args) {
    if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
      return FALSE;
    }

    module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);

    // Initialize progress stats.
    $old_db = db_set_active('background_process');
    progress_remove_progress($this->handle);
    db_set_active($old_db);

    return TRUE;
  }

  /**
   * Returns a printable name for a callback, for use in log messages.
   *
   * @param mixed $callback
   *   Function name or array(class-or-object, method).
   *
   * @return string
   *   The function name, 'Class::method', or '(unknown)'.
   */
  private static function callbackName($callback) {
    if (is_string($callback)) {
      return $callback;
    }
    if (is_array($callback) && count($callback) === 2) {
      list($target, $method) = array_values($callback);
      if (is_object($target)) {
        $target = get_class($target);
      }
      if (is_string($target) && is_string($method)) {
        return $target . '::' . $method;
      }
    }
    return '(unknown)';
  }

}