<?php
/**
 * @file
 *
 * Class for handling background processes.
 */

/**
 * BackgroundProcess class.
 */
class BackgroundProcess {
  public $handle;
  public $connection;
  public $service_host;
  public $service_group;
  public $uid;

  public static function load($process) {
    $new = new BackgroundProcess($process->handle);
    @$new->callback = $process->callback;
    @$new->args = $process->args;
    @$new->uid = $process->uid;
    @$new->token = $process->token;
    @$new->service_host = $process->service_host;
    @$new->service_group = $process->service_group;
    @$new->exec_status = $process->exec_status;
    @$new->start_stamp = $process->start_stamp;
    @$new->status = $process->exec_status;
    @$new->start = $process->start_stamp;
    return $new;
  }

  /**
   * Constructor.
   *
   * @param type $handle
   *   Handle to use. Optional; leave out for auto-handle.
   */
  public function __construct($handle = NULL) {
    $this->handle = $handle ? $handle : background_process_generate_handle('auto');
    $this->token = background_process_generate_handle('token');
    $this->service_group = variable_get('background_process_default_service_group', 'default');
  }

  public function lock($status = BACKGROUND_PROCESS_STATUS_LOCKED) {
    // Preliminary select to avoid unnecessary write-attempt
    if (background_process_get_process($this->handle)) {
      // watchdog('bg_process', 'Will not attempt to lock handle %handle, already exists', array('%handle' => $this->handle), WATCHDOG_NOTICE);
      return FALSE;
    }

    // "Lock" handle
    $this->start_stamp = $this->start = microtime(TRUE);
    if (!background_process_lock_process($this->handle, $status)) {
      // If this happens, we might have a race condition or an md5 clash
      watchdog('bg_process', 'Could not lock handle %handle', array('%handle' => $this->handle), WATCHDOG_ERROR);
      return FALSE;
    }
    $this->exec_status = $this->status = BACKGROUND_PROCESS_STATUS_LOCKED;
    $this->sendMessage('locked');
    return TRUE;
  }

  /**
   * Start background process
   *
   * Calls the service handler through http passing function arguments as serialized data
   * Be aware that the callback will run in a new request
   *
   * @global string $base_url
   *   Base URL for this Drupal request
   *
   * @param $callback
   *   Function to call.
   * @param $args
   *   Array containg arguments to pass on to the callback.
   * @return mixed
   *   TRUE on success, NULL on failure, FALSE on handle locked.
   */
  public function start($callback, $args = array()) {
    if (!$this->lock()) {
      return FALSE;
    }

    return $this->execute($callback, $args);
  }

  public function queue($callback, $args = array()) {
    if (!$this->lock(BACKGROUND_PROCESS_STATUS_QUEUED)) {
      return FALSE;
    }

    if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
      // Could not update process
      return NULL;
    }

    module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);

    // Initialize progress stats
    $old_db = db_set_active('background_process');
    progress_remove_progress($this->handle);
    db_set_active($old_db);

    $queues = variable_get('background_process_queues', array());
    $queue_name = isset($queues[$callback]) ? 'bgp:' . $queues[$callback] : 'background_process';
    $queue = DrupalQueue::get($queue_name);
    $queue->createItem(array(rawurlencode($this->handle), rawurlencode($this->token)));
    _background_process_ensure_cleanup($this->handle, TRUE);
  }


  public function determineServiceHost() {
    // Validate explicitly selected service host
    $service_hosts = background_process_get_service_hosts();
    if ($this->service_host && empty($service_hosts[$this->service_host])) {
      $this->service_host = variable_get('background_process_default_service_host', 'default');
      if (empty($service_hosts[$this->service_host])) {
        $this->service_host = NULL;
      }
    }

    // Find service group if a service host is not explicitly specified.
    if (!$this->service_host) {
      if (!$this->service_group) {
        $this->service_group = variable_get('background_process_default_service_group', 'default');
      }
      if ($this->service_group) {
        $service_groups = variable_get('background_process_service_groups', array());
        if (isset($service_groups[$this->service_group])) {
          $service_group = $service_groups[$this->service_group];

          // Default method if none is provided
          $service_group += array(
            'method' => 'background_process_service_group_round_robin'
          );
          if (is_callable($service_group['method'])) {
            $this->service_host = call_user_func($service_group['method'], $service_group);
            // Revalidate service host
            if ($this->service_host && empty($service_hosts[$this->service_host])) {
              $this->service_host = NULL;
            }
          }
        }
      }
    }

    // Fallback service host
    if (!$this->service_host || empty($service_hosts[$this->service_host])) {
      $this->service_host = variable_get('background_process_default_service_host', 'default');
      if (empty($service_hosts[$this->service_host])) {
        $this->service_host = 'default';
      }
    }

    return $this->service_host;
  }

  public function execute($callback, $args = array()) {
    if (!background_process_set_process($this->handle, $callback, $this->uid, $args, $this->token)) {
      // Could not update process
      return NULL;
    }

    module_invoke_all('background_process_pre_execute', $this->handle, $callback, $args, $this->token);

    // Initialize progress stats
    $old_db = db_set_active('background_process');
    progress_remove_progress($this->handle);
    db_set_active($old_db);

    $this->connection = FALSE;

    $this->determineServiceHost();

    return $this->dispatch();
  }

  function dispatch() {
    $this->sendMessage('dispatch');

    $handle = rawurlencode($this->handle);
    $token = rawurlencode($this->token);
    list($url, $headers) = background_process_build_request('bgp-start/' . $handle . '/' . $token, $this->service_host);

    background_process_set_service_host($this->handle, $this->service_host);

    $options = array('method' => 'POST', 'headers' => $headers);
    $result = background_process_http_request($url, $options);
    if (empty($result->error)) {
      $this->connection = $result->fp;
      _background_process_ensure_cleanup($this->handle, TRUE);
      return TRUE;
    }
    else {
      background_process_remove_process($this->handle);
      watchdog('bg_process', 'Could not call service %handle for callback %callback: %error', array('%handle' => $this->handle, '%callback' => $callback, '%error' => $result->error), WATCHDOG_ERROR);
      // Throw exception here instead?
      return NULL;
    }
    return FALSE;
  }

  function sendMessage($action) {
    if (module_exists('nodejs')) {
      if (!isset($this->progress_object)) {
        if ($progress = progress_get_progress($this->handle)) {
          $this->progress_object = $progress;
          $this->progress = $progress->progress;
          $this->progress_message = $progress->message;
        }
        else {
          $this->progress = 0;
          $this->progress_message = '';
        }
      }
      $object = clone $this;
      $message = (object) array(
        'channel' => 'background_process',
        'data' => (object) array(
          'action' => $action,
          'background_process' => $object,
          'timestamp' => microtime(TRUE),
        ),
        'callback' => 'nodejsBackgroundProcess',
      );
      drupal_alter('background_process_message', $message);
      nodejs_send_content_channel_message($message);
    }
  }

}