MENU_CALLBACK, 'title' => 'Run background process', 'description' => 'Run background process', 'page callback' => 'background_process_service_start', 'page arguments' => array(1), 'access callback' => 'background_process_service_access', 'access arguments' => array(1, 2), ); $items['background-process/unlock/%'] = array( 'type' => MENU_CALLBACK, 'title' => 'Unlock background process', 'description' => 'Unlock background process', 'page callback' => 'background_process_service_unlock', 'page arguments' => array(2), 'access arguments' => array('administer background process'), 'file' => 'background_process.admin.inc', ); $items['background-process/check-token'] = array( 'type' => MENU_CALLBACK, 'title' => 'Check background process token', 'description' => 'Check background process token', 'page callback' => 'background_process_check_token', 'page arguments' => array(2), 'access callback' => TRUE, 'file' => 'background_process.pages.inc', ); $items['admin/config/system/background-process/settings'] = array( 'type' => MENU_DEFAULT_LOCAL_TASK, 'title' => 'Settings', 'weight' => 1, ); $items['admin/config/system/background-process'] = array( 'title' => 'Background process', 'description' => 'Administer background processes', 'page callback' => 'drupal_get_form', 'page arguments' => array('background_process_settings_form'), 'access arguments' => array('administer background process'), 'file' => 'background_process.admin.inc', ); $items['admin/config/system/background-process/overview'] = array( 'type' => MENU_LOCAL_TASK, 'title' => 'Overview', 'description' => 'Administer background processes', 'page callback' => 'background_process_overview_page', 'access arguments' => array('administer background process'), 'file' => 'background_process.admin.inc', 'weight' => 3, ); return $items; } /** * Implements hook_permission(). 
*/
function background_process_permission() {
  // Single permission used by the module's administrative menu items.
  $permissions = array();
  $permissions['administer background process'] = array(
    'title' => t('Administer background processes'),
    'description' => t('Perform administration tasks for background processes.'),
  );
  return $permissions;
}

/**
 * Implements hook_cron().
 *
 * Unlocks processes that have stayed in the locked, running or queued state
 * for longer than their configured cleanup age. The whole run is capped at
 * 120 seconds, measured from the start of the request.
 */
function background_process_cron() {
  // Don't use more than 120 seconds to unlock.
  $expire = 120;
  @set_time_limit($expire);
  $deadline = $_SERVER['REQUEST_TIME'] + $expire;

  // Cleanup old handles.
  _background_process_cron_unlock_stale(
    BACKGROUND_PROCESS_STATUS_LOCKED,
    variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE),
    t('Never started (auto unlock due to timeout)'),
    $deadline
  );

  // Cleanup stale requests.
  _background_process_cron_unlock_stale(
    BACKGROUND_PROCESS_STATUS_RUNNING,
    variable_get('background_process_cleanup_age_running', BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING),
    t('Never finished (auto unlock due to long run)'),
    $deadline
  );

  // Cleanup queued requests that were never processed.
  _background_process_cron_unlock_stale(
    BACKGROUND_PROCESS_STATUS_QUEUED,
    variable_get('background_process_cleanup_age_queue', BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE),
    t('Never picked up by cron worker (auto unlock due to timeout)'),
    $deadline
  );
}

/**
 * Unlock, in batches of 10, all processes of one status older than a max age.
 *
 * Previously this logic was repeated three times in hook_cron(), and two of
 * the copies tested an undefined variable in their loop condition, so they
 * never handled more than a single batch.
 *
 * @param $status
 *   The exec_status value to clean up.
 * @param $max_age
 *   Age in seconds after which a process of that status is unlocked.
 * @param $msg
 *   Translated reason, shown to the user and passed to the unlock call.
 * @param $deadline
 *   Unix timestamp after which no further batch is started.
 */
function _background_process_cron_unlock_stale($status, $max_age, $msg, $deadline) {
  // Freeze the cut-off so that every batch uses the same threshold.
  $threshold = time() - $max_age;
  $unlocked = 0;

  do {
    if (time() >= $deadline) {
      break;
    }

    $result = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, array(
      ':start' => $threshold,
      ':status' => $status,
    ));
    $handles = $result->fetchAllAssoc('handle', PDO::FETCH_ASSOC);

    $unlocked = 0;
    foreach ($handles as $handle => $process) {
      // Unlock the process.
      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {
        drupal_set_message(t("%handle unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)));
        $unlocked++;
      }
      else {
        drupal_set_message(t("%handle could not be unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)), 'error');
      }
    }
    // Continue only while the last batch made progress. Handles that could
    // not be unlocked stay in the table and would be selected again, so a
    // batch without a single success would otherwise repeat until the
    // deadline.
  } while ($unlocked > 0);
}

/**
 * Implements hook_cron_alter().
 */
function background_process_cron_alter(&$items) {
  $items['background_process_cron']['override_congestion_protection'] = TRUE;

  // Unlock background if too old.
  // @todo Move to some access handler or pre-execute?
  if ($process = background_process_get_process('uc:background_process_cron')) {
    // The cleanup job's own lock is considered stale after 30 seconds.
    if ($process->start + 30 < time()) {
      background_process_unlock($process->handle, t('Self unlocking stale lock'), $process->start);
    }
  }
}

/**
 * Implements hook_cronapi().
*/
function background_process_cronapi($op, $job = NULL) {
  switch ($op) {
    case 'list':
      return array('background_process_cron' => t('Cleanup old process handles'));

    case 'rule':
      // Run the cleanup job every minute.
      return '* * * * *';

    case 'configure':
      return 'admin/config/system/background-process';
  }
}

/**
 * Implements hook_service_group().
 *
 * Registers the two load balancing methods implemented below.
 */
function background_process_service_group() {
  $info = array();
  $info['methods']['background_process_service_group_random'] = t('Random');
  $info['methods']['background_process_service_group_round_robin'] = t('Pseudo round-robin');
  return $info;
}

/**
 * Load balancing based on random pick.
 *
 * @param $service_group
 *   Service group definition; its 'hosts' entry is the list to pick from.
 * @return
 *   One randomly chosen entry of $service_group['hosts'].
 */
function background_process_service_group_random($service_group) {
  return $service_group['hosts'][rand(0, count($service_group['hosts']) - 1)];
}

/**
 * Pseudo round-robin load balancing.
 *
 * The first call in a request starts at a random host; every further call
 * advances to the next host, wrapping around at the end of the list. The
 * position is kept in a single static variable, so it is shared by all
 * service groups handled during the request.
 *
 * @param $service_group
 *   Service group definition; its 'hosts' entry is the list to pick from.
 * @return
 *   The selected entry of $service_group['hosts'].
 */
function background_process_service_group_round_robin($service_group) {
  static $idx = NULL;
  if (isset($idx)) {
    $idx = ($idx + 1) % count($service_group['hosts']);
  }
  else {
    $idx = rand(0, count($service_group['hosts']) - 1);
  }
  return $service_group['hosts'][$idx];
}

/**
 * Access handler for service call.
 *
 * Besides checking access this disables session saving, drops $_SESSION and,
 * on success, switches the global $user to the account stored with the
 * process.
 *
 * @param $handle
 *   Raw URL encoded handle of the process.
 * @param $token
 *   Raw URL encoded token that must match the token stored with the process.
 * @return boolean
 *   TRUE if the process exists, the token matches and the owning account
 *   could be loaded; FALSE otherwise.
 */
function background_process_service_access($handle, $token) {
  // Declared up front: a "global" statement only takes effect once it has
  // been executed, so declaring it inside one branch leaves the other branch
  // assigning a local variable.
  global $user;

  // Setup service
  ignore_user_abort(TRUE);

  // Damn those slashes!
  $handle = rawurldecode($handle);
  $token = rawurldecode($token);

  // Ensure no session!
  drupal_save_session(FALSE);
  unset($_SESSION);

  $process = background_process_get_process($handle);

  if (!$process) {
    watchdog('bg_process', 'Unknown process: %handle', array('%handle' => $handle));
    return FALSE;
  }

  if ($token !== $process->token) {
    watchdog('bg_process', 'Invalid token: %token for handle: %handle', array('%token' => $token, '%handle' => $handle));
    return FALSE;
  }

  // Login as the user that requested the call
  if ($process->uid) {
    // Load into a temporary first so that a failed load does not replace the
    // global user with FALSE.
    $account = user_load($process->uid);
    if (!$account) {
      // Invalid user!
      return FALSE;
    }
    $user = $account;
  }
  else {
    $user = drupal_anonymous_user();
  }

  return TRUE;
}

/**
 * Implements hook_init().
*/
function background_process_init() {
  // Nothing to do unless the session carries the request to determine the
  // default service host.
  $requested = !empty($_SESSION['background_process_determine_default_service_host']);

  // Never determine on the check-token page (that would loop forever), and
  // only once the site installation has finished.
  if (!$requested || $_GET['q'] == 'background-process/check-token' || variable_get('install_task', FALSE) != 'done') {
    return;
  }

  // Determine the default service host and clear the request flag.
  background_process_determine_and_save_default_service_host();
  unset($_SESSION['background_process_determine_default_service_host']);
}

/**
 * Implements hook_cron_queue_info().
 *
 * Declares the default queue plus one "bgp:"-prefixed queue per name stored
 * in the 'background_process_queues' variable; all share the same worker.
 */
function background_process_cron_queue_info() {
  $definition = array(
    'worker callback' => '_background_process_queue',
  );

  $queues = array('background_process' => $definition);
  foreach (variable_get('background_process_queues', array()) as $queue_name) {
    $queues['bgp:' . $queue_name] = $definition;
  }
  return $queues;
}

// ---------- HELPER FUNCTIONS ----------

/**
 * Worker callback for processing a queued function call.
 *
 * @param $item
 *   Queue item: array(handle, token).
 *
 * @throws Exception
 *   Rethrows whatever the executed process throws, after putting the process
 *   back into the queued state.
 */
function _background_process_queue($item) {
  $previous_handle = background_process_current_handle();
  list($handle, $token) = $item;

  if (!background_process_service_access($handle, $token)) {
    return;
  }

  try {
    background_process_service_execute(rawurldecode($handle), TRUE);
  }
  catch (Exception $e) {
    background_process_current_handle($previous_handle);
    background_process_update_status(rawurldecode($handle), BACKGROUND_PROCESS_STATUS_QUEUED);
    throw $e;
  }
  background_process_current_handle($previous_handle);
}

/**
 * Get/set current handle.
 *
 * @param $handle
 *   Handle to remember as the current one; omit (or pass NULL) to only read.
 * @return string
 *   Current handle.
 */
function background_process_current_handle($handle = NULL) {
  static $active_handle = NULL;
  if ($handle !== NULL) {
    $active_handle = $handle;
  }
  return $active_handle;
}

/**
 * Get a unique handle based on a callback.
 *
 * @param $callback
 *   Function to generate handle from.
* @return string
 *   Handle.
 */
function background_process_generate_handle($callback) {
  // Combine the serialized callback with the current time and a random
  // number, then hash the result.
  $seed = serialize($callback) . ':' . microtime(TRUE);
  $seed .= ':' . rand(1, 5000);
  return md5($seed);
}

/**
 * Start background process
 *
 * Calls the service handler through http passing function arguments as serialized data
 * Be aware that the callback will run in a new request
 *
 * @global string $base_url
 *   Base URL for this Drupal request
 *
 * @param $callback
 *   Function to call
 * @param $var [, $... ]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Handle on success, FALSE on failure
 */
function background_process_start($callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess();

  // Everything after $callback is forwarded to the callback.
  $args = func_get_args();
  $args = array_slice($args, 1);

  $started = $process->start($callback, $args);
  if (!$started) {
    return $started;
  }
  return $process->handle;
}

/**
 * Start locked background process
 *
 * Calls the service handler through http passing function arguments as serialized data
 * Be aware that the callback will run in a new request
 *
 * @global string $base_url
 *   Base URL for this Drupal request
 *
 * @param $handle
 *   Handle to give background process
 * @param $callback
 *   Function to call
 * @param $var [, $... ]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Handle on success, FALSE on failure
 */
function background_process_start_locked($handle, $callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess($handle);

  // Everything after $handle and $callback is forwarded to the callback.
  $args = func_get_args();
  $args = array_slice($args, 2);

  $started = $process->start($callback, $args);
  if (!$started) {
    return $started;
  }
  return $process->handle;
}

/**
 * Queue background process
 *
 * Queue the function call passing function arguments as serialized data
 * Be aware that the callback will run in a new request
 *
 * @global string $base_url
 *   Base URL for this Drupal request
 *
 * @param $callback
 *   Function to call
 * @param $var [, $...
]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Handle on success, FALSE on failure
 */
function background_process_queue($callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess();
  $args = func_get_args();
  // Drop $callback; the remainder are the callback's arguments.
  array_splice($args, 0, 1);
  return $process->queue($callback, $args);
}

/**
 * Queue locked background process
 *
 * Queue the function call passing function arguments as serialized data
 * Be aware that the callback will run in a new request
 *
 * @global string $base_url
 *   Base URL for this Drupal request
 *
 * @param $handle
 *   Handle to give background process
 * @param $callback
 *   Function to call
 * @param $var [, $... ]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Handle on success, FALSE on failure
 */
function background_process_queue_locked($handle, $callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess($handle);
  $args = func_get_args();
  // Drop $handle and $callback; the remainder are the callback's arguments.
  array_splice($args, 0, 2);
  return $process->queue($callback, $args);
}

/**
 * Cleanup cache menu and ensure all locks are released (again).
 *
 * @param $cid
 *   Cache ID to clear from the cache_menu bin.
 */
function _background_process_cleanup_menu($cid) {
  cache_clear_all($cid, 'cache_menu');
  // Release locks in case cache_clear_all() sets a lock and lock_release_all()
  // has already been run.
  lock_release_all();
}

/**
 * Call the function requested by the service call
 *
 * @param $handle
 *   Raw URL encoded handle of process
 * @param $return
 *   Whether or not the function should return or exit.
 */
function background_process_service_start($handle, $return = FALSE) {
  drupal_add_http_header('Content-Type', 'text/plain');

  // Let's clean up the mess the menu-router system leaves behind.
  $cid = 'menu_item:' . hash('sha256', $_GET['q']);
  drupal_register_shutdown_function('_background_process_cleanup_menu', $cid);

  // Setup service
  ignore_user_abort(TRUE);
  @set_time_limit(variable_get('background_process_service_timeout', BACKGROUND_PROCESS_SERVICE_TIMEOUT));

  // Damn those slashes!
  $handle = rawurldecode($handle);
  return background_process_service_execute($handle, $return);
}

/**
 * Execute the service
 *
 * Claims the process (locked/queued -> running), runs its callback and
 * reports progress. When $return is FALSE the request ends with exit and the
 * process is removed by a shutdown function; when $return is TRUE the process
 * is removed here after a successful run and control returns to the caller.
 *
 * @param $handle
 *   Handle of process
 * @param $return
 *   Whether or not the function should return or exit.
 *
 * @throws Exception
 *   Rethrows exceptions from claiming the process or from the callback.
 */
function background_process_service_execute($handle, $return = FALSE) {
  // @todo Add static caching? We've already loaded this previously in the access handler
  $process = background_process_get_process($handle);
  if (!$process) {
    watchdog('bg_process', 'Process not found for handle: %handle', array('%handle' => $handle), WATCHDOG_ERROR);
    if ($return) {
      return;
    }
    else {
      exit;
    }
  }

  $process->start_stamp = microtime(TRUE);

  // Defined before the try block so the catch block never reads an undefined
  // variable.
  $old_db = NULL;
  try {
    $old_db = db_set_active('background_process');
    // Only a process that is still locked or queued can be claimed; the
    // update matches no row if it was claimed already.
    $claimed = db_update('background_process')
      ->fields(array(
        'start_stamp' => sprintf("%.06f", $process->start_stamp),
        'exec_status' => BACKGROUND_PROCESS_STATUS_RUNNING,
      ))
      ->condition('handle', $handle)
      ->condition('exec_status', array(BACKGROUND_PROCESS_STATUS_LOCKED, BACKGROUND_PROCESS_STATUS_QUEUED), 'IN')
      ->execute();
    db_set_active($old_db);
    if ($claimed) {
      $process->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
      $process = BackgroundProcess::load($process);
      $process->sendMessage('claimed');
      background_process_current_handle($handle);
    }
    else {
      if ($return) {
        return;
      }
      else {
        exit;
      }
    }
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }

  // Make sure the process is removed when we're done
  if (!$return) {
    drupal_register_shutdown_function('background_process_remove_process', $process->handle, $process->start_stamp);
  }

  if (is_callable($process->callback)) {
    $old_db = NULL;
    try {
      if (!$return) {
        drupal_register_shutdown_function('module_invoke_all', 'background_process_shutdown', $process);
      }
      $callback = _background_process_callback_name($process->callback);
      $old_db = db_set_active('background_process');
      progress_initialize_progress($handle, "Background process '$callback' initialized");
      db_set_active($old_db);

      call_user_func_array($process->callback, $process->args);

      $old_db = db_set_active('background_process');
      progress_end_progress($handle, "Background process '$callback' finished");
      db_set_active($old_db);
      if ($return) {
        background_process_remove_process($process->handle, $process->start_stamp);
        module_invoke_all('background_process_shutdown', $process);
      }
    }
    catch (Exception $e) {
      // Exception occurred, switch back to proper db if necessary
      // and inform shutdown handlers.
      if ($old_db) {
        db_set_active($old_db);
      }
      if (!$return) {
        module_invoke_all('background_process_shutdown', $process, (string) $e);
      }
      throw $e;
    }
  }
  else {
    // Function not found. The callback may be an array (class/object and
    // method), so log its printable name, as the success path above does.
    watchdog('bg_process', 'Callback: %callback not found', array('%callback' => _background_process_callback_name($process->callback)), WATCHDOG_ERROR);
  }

  if ($return) {
    return;
  }
  else {
    exit;
  }
}

/**
 * Restart the current background process
 *
 * Arguments given to this function are handed on to
 * background_process_keepalive().
 *
 * @return
 *   Exception on fail, otherwise exit
 */
function background_process_restart() {
  $args = func_get_args();
  call_user_func_array('background_process_keepalive', $args);
  exit;
}

/**
 * Keep the current background process alive
 * (meaning restart it when it exits)
 *
 * Registers _background_process_restart() as a shutdown function, passing it
 * the current process and the arguments given to this function.
 *
 * @throws Exception
 *   If there is no current handle or its process cannot be loaded.
 */
function background_process_keepalive() {
  $args = func_get_args();
  $handle = background_process_current_handle();
  if (!$handle) {
    throw new Exception(t('Background process handle %handle not found', array('%handle' => $handle)));
  }
  $process = background_process_get_process($handle);
  if (!$process) {
    throw new Exception(t('Background process %handle not found', array('%handle' => $handle)));
  }
  drupal_register_shutdown_function('_background_process_restart', $process, $args);
}

/**
 * Check if the background process has started
 *
 * @param $handle
 *   Handle of opened background process
 * @return boolean
 *   TRUE if started, FALSE if not
 */
function background_process_is_started($handle) {
  $old_db = db_set_active('background_process');
  try {
    $progress = progress_get_progress($handle);
  }
  catch (Exception $e) {
    // Never leave the request on the background_process connection.
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
  return !empty($progress);
}

/**
 * Check if the background process has finished
 *
 * @param $handle
*   Handle of opened background process
 * @return boolean
 *   TRUE if finished, FALSE if not
 */
function background_process_is_finished($handle) {
  $old_db = db_set_active('background_process');
  try {
    $progress = progress_get_progress($handle);
  }
  catch (Exception $e) {
    // Never leave the request on the background_process connection.
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
  // A process without any progress record counts as finished.
  return (empty($progress) || $progress->end);
}

/**
 * Set background process
 *
 * Stores callback, arguments, owner and token on the row of an existing
 * handle.
 *
 * @global object $user
 *   Current logged in user
 *
 * @param $handle
 *   Handle of background process
 * @param $callback
 *   Function of background process
 * @param $uid
 *   User ID to run the process as; NULL means the current user.
 * @param $args
 *   Arguments for the callback; stored serialized.
 * @param $token
 *   Token stored with the process.
 * @return
 *   Result of the update query.
 *
 * @throws Exception
 *   Rethrows any exception from the update query.
 */
function background_process_set_process($handle, $callback, $uid, $args, $token) {
  // Setup parameters
  $args = serialize($args);
  $callback = serialize($callback);

  // Get user
  if (!isset($uid)) {
    global $user;
    $uid = $user->uid;
  }

  try {
    $old_db = db_set_active('background_process');
    $result = db_update('background_process')
      ->fields(array(
        'callback' => $callback,
        'args' => $args,
        'uid' => $uid,
        'token' => $token
      ))
      ->condition('handle', $handle)
      ->execute();
    db_set_active($old_db);
    return $result;
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
}

/**
 * Lock process
 *
 * Inserts a row for the handle. Any exception raised while doing so is
 * treated as "lock not obtained".
 *
 * @param $handle
 *   Handle of background process
 * @param $status
 *   Initial exec_status of the new row.
 * @return boolean
 *   TRUE if locked, FALSE if lock could not be obtained
 */
function background_process_lock_process($handle, $status = BACKGROUND_PROCESS_STATUS_LOCKED) {
  try {
    $old_db = db_set_active('background_process');
    db_insert('background_process')
      ->fields(array(
        'handle' => $handle,
        'start_stamp' => sprintf("%.06f", microtime(TRUE)),
        'exec_status' => $status,
      ))
      ->execute();
    db_set_active($old_db);
    _background_process_ensure_cleanup($handle);
    return TRUE;
  }
  catch (Exception $e) {
    db_set_active($old_db);
    return FALSE;
  }
}

/**
 * Set status for background process
 *
 * Runs on the 'background_process' connection, like every other function in
 * this file that writes to the table.
 *
 * @param $handle
 *   Handle of background process
 * @param $status
 *   New exec_status value.
 *
 * @throws Exception
 *   Rethrows any exception from the update query.
 */
function background_process_update_status($handle, $status) {
  $old_db = db_set_active('background_process');
  try {
    db_update('background_process')
      ->fields(array('exec_status' => $status))
      ->condition('handle', $handle)
      ->execute();
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
}

/**
 * Get background process
 *
 * @param $handle
 *   Handle of
*   background process
 * @return object
 *   Background process, or FALSE if no process exists for the handle.
 *
 * @throws Exception
 *   Rethrows any exception from the select query.
 */
function background_process_get_process($handle) {
  try {
    $old_db = db_set_active('background_process');
    $result = db_select('background_process', 'bp')
      ->fields('bp', array('handle', 'callback', 'args', 'uid', 'token', 'service_host', 'start_stamp', 'exec_status'))
      ->condition('handle', $handle)
      ->execute()
      ->fetchObject();
    db_set_active($old_db);
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }

  if ($result) {
    $result->args = unserialize($result->args);
    $result->callback = unserialize($result->callback);
    // Short aliases for the start_stamp and exec_status columns.
    $result->start = $result->start_stamp;
    $result->status = $result->exec_status;
    return $result;
  }
  return FALSE;
}

/**
 * Get background processes
 *
 * @param $status
 *   (optional) Only return processes with this exec_status.
 * @return array
 *   List of background process objects, shaped like the result of
 *   background_process_get_process().
 *
 * @throws Exception
 *   Rethrows any exception from the select query.
 */
function background_process_get_processes($status = NULL) {
  $old_db = db_set_active('background_process');
  try {
    $query = db_select('background_process', 'bp')
      ->fields('bp', array('handle', 'callback', 'args', 'uid', 'token', 'service_host', 'start_stamp', 'exec_status'));
    if (isset($status)) {
      // The column is exec_status; "status" only exists as a property alias
      // on the loaded objects.
      $query->condition('bp.exec_status', $status);
    }
    $result = $query->execute();

    $processes = array();
    while ($process = $result->fetchObject()) {
      $process->args = unserialize($process->args);
      $process->callback = unserialize($process->callback);
      $process->start = $process->start_stamp;
      $process->status = $process->exec_status;
      $processes[] = $process;
    }
  }
  catch (Exception $e) {
    // Never leave the request on the background_process connection.
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
  return $processes;
}

/**
 * Remove a background process
 *
 * @param $handle
 *   Handle of background process
 * @param $start
 *   (optional) Only remove the process if its start_stamp matches this value.
 * @return mixed
 *   Result of the delete query.
*/
function background_process_remove_process($handle, $start = NULL) {
  $old_db = db_set_active('background_process');
  try {
    $query = db_delete('background_process')
      ->condition('handle', $handle);
    if (isset($start)) {
      // Match the stored start stamp, which is written with the same format.
      $query->condition('start_stamp', sprintf("%.06f", $start), '=');
    }
    $result = $query->execute();
  }
  catch (Exception $e) {
    // This function also runs as a shutdown function; never leave the
    // request on the background_process connection.
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
  return $result;
}

/**
 * Unlock background process.
 *
 * Removes the process and invokes hook_background_process_shutdown() with
 * the given message.
 *
 * @param $handle
 *   Handle of process to unlock
 * @param $msg
 *   (optional) Message passed to the shutdown hook. Defaults to a "manually
 *   unlocked by" message naming the current user.
 * @param $start
 *   (optional) Only unlock if the process' start stamp is identical (===) to
 *   this value.
 * @return boolean
 *   TRUE if the process was removed, FALSE otherwise.
 */
function background_process_unlock($handle, $msg = NULL, $start = NULL) {
  $process = background_process_get_process($handle);
  if ($process && (!isset($start) || $start === $process->start)) {
    // Unlock the process
    if (background_process_remove_process($process->handle, $process->start)) {
      global $user;
      module_invoke_all('background_process_shutdown', $process, $msg ? $msg : t('Manually unlocked by !name', array('!name' => $user->name)));
      return TRUE;
    }
  }
  return FALSE;
}

/**
 * Set a service host for a background process.
 *
 * @param $handle
 *   Background process handle
 * @param $service_host
 *   Name of service host; an empty value is stored as an empty string.
 * @return
 *   Result of the update query.
 *
 * @throws Exception
 *   Rethrows any exception from the update query.
 */
function background_process_set_service_host($handle, $service_host) {
  try {
    $old_db = db_set_active('background_process');
    $result = db_update('background_process')
      ->fields(array(
        'service_host' => $service_host ? $service_host : '',
      ))
      ->condition('handle', $handle)
      ->execute();
    db_set_active($old_db);
    return $result;
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
}

/**
 * Get service hosts defined in the system.
 *
 * @return array
 *   Configured service hosts keyed by name. Entries from the derived default
 *   host variable (by default a single 'default' host using $base_url) are
 *   added for names that are not configured explicitly.
 */
function background_process_get_service_hosts() {
  global $base_url;
  $service_hosts = variable_get('background_process_service_hosts', array());
  $service_hosts += variable_get('background_process_derived_default_host', array(
    'default' => array(
      'base_url' => $base_url
    )
  ));
  return $service_hosts;
}

/**
 * Get service groups defined in the system.
*/
function background_process_get_service_groups() {
  $service_groups = variable_get('background_process_service_groups', array());
  // Always provide a 'default' group containing the default service host.
  $service_groups += array(
    'default' => array(
      'hosts' => array(variable_get('background_process_default_service_host', 'default')),
    ),
  );
  foreach ($service_groups as &$service_group) {
    $service_group += array(
      'method' => 'background_process_service_group_round_robin'
    );
  }
  // Break the reference left behind by the loop.
  unset($service_group);
  return $service_groups;
}

/**
 * Determine host for current installation.
 *
 * A fresh token is stored in the 'background_process_token' variable and the
 * check-token page is requested on each candidate; the first candidate whose
 * response starts with the token wins. Candidates are tried in this order:
 * - $base_url
 * - SERVER_NAME and SERVER_PORT
 * - 127.0.0.1 and SERVER_PORT
 * - SERVER_ADDR (LOCAL_ADDR if SERVER_ADDR is missing) and SERVER_PORT
 * - the previous three again, prefixed with the HTTP authentication
 *   credentials of the current request, if any
 *
 * @return array
 *   Array of service host parameters for the default service host. FALSE if none could be determined.
 */
function background_process_determine_default_service_host() {
  $token = md5(session_id() . md5(uniqid(mt_rand(), TRUE)) . md5(uniqid(mt_rand(), TRUE)));
  variable_set('background_process_token', $token);

  global $conf;
  $auth = isset($_SERVER['PHP_AUTH_USER']) ? $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW'] . '@' : '';
  $scheme = isset($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://';

  global $base_url;
  $url = parse_url($base_url);
  $path = empty($url['path']) ? '' : $url['path'];

  $server_addr = !array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR'];
  $port = $_SERVER['SERVER_PORT'];

  $candidates = array(
    array('base_url' => $base_url),
  );
  // First without, then with credentials. When there are no credentials the
  // second round produces duplicate URLs, which are skipped below.
  foreach (array('', $auth) as $credentials) {
    foreach (array($_SERVER['SERVER_NAME'], '127.0.0.1', $server_addr) as $host) {
      $candidates[] = array(
        'base_url' => $scheme . $credentials . $host . ':' . $port . $path,
        'http_host' => $_SERVER['HTTP_HOST'],
      );
    }
  }

  // Postponed requests, keyed by URL so that each URL is only tested once.
  $results = array();
  foreach ($candidates as $i => $candidate) {
    // Expose the candidate as temporary service host '__test' so the request
    // builder can resolve it.
    $conf['background_process_service_hosts']['__test'] = $candidate;
    list($url, $headers) = background_process_build_request('background-process/check-token', '__test');
    if (empty($results[$url])) {
      $results[$url] = background_process_http_request($url, array('headers' => $headers, 'postpone' => TRUE, 'candidate' => $i, 'method' => 'POST'));
    }
  }

  background_process_http_request_process($results);
  foreach ($results as $result) {
    if ($result->code == 200) {
      if ($token === substr($result->data, 0, strlen($token))) {
        return $candidates[$result->options['candidate']];
      }
    }
  }
  return FALSE;
}

/**
 * Build url and headers for http request
 * @param $url
 *   Relative url for the request
 * @param $service_hostname
 *   Name of service host, e.g. 'default'
 * @param $options
 *   (optional) Options passed on to url(); 'absolute' and 'base_url' are
 *   filled in unless given.
 * @return array
 *   array(url, headers)
 */
function background_process_build_request($url, $service_hostname = NULL, $options = array()) {
  $service_hosts = background_process_get_service_hosts();
  if (!$service_hostname || empty($service_hosts[$service_hostname])) {
    $service_hostname = 'default';
  }
  $service_host = $service_hosts[$service_hostname];

  $options += array(
    'absolute' => TRUE,
    'base_url' => $service_host['base_url'],
  );

  $url = url($url, $options);
  $parsed = parse_url($url);
  // An explicit http_host on the service host wins over the host in the URL.
  $host = !empty($service_host['http_host']) ? $service_host['http_host'] : (isset($parsed['host']) ? $parsed['host'] : NULL);

  $headers = _background_process_request_headers();
  $headers = _background_process_filter_headers($headers);
  $headers['Host'] = $host;
  $headers['Connection'] = 'Close';

  if (isset($parsed['user'])) {
    // The URL may carry a user without a password.
    $pass = isset($parsed['pass']) ? $parsed['pass'] : '';
    $headers['Authorization'] = 'Basic ' . base64_encode($parsed['user'] . ':' . $pass);
  }
  return array($url, $headers);
}

/**
 * Transform header array from key/value to strings.
 * @param $headers
 *   array of headers in key/value style
 * @return array
 *   array of strings.
 */
function background_process_build_headers($headers) {
  $header = array();
  foreach ($headers as $key => $value) {
    $header[] = "$key: $value";
  }
  return $header;
}

/**
 * Perform an http request.
 *
 * Validates the URL and prepares a request object. Unless the 'postpone'
 * option is set, the request is initiated right away.
 *
 * @param $url
 *   Absolute URL to request.
 * @param $options
 *   (optional) Request options. Defaults are filled in for: headers, method,
 *   data, max_redirects, timeout, context, blocking and postpone.
 * @return object
 *   Request/result object.
 *
 * @see drupal_http_request()
 */
function background_process_http_request($url, array $options = array()) {
  // Parse the URL and make sure we can handle the schema.
  $result = new stdClass();
  $result->url = $url;
  $result->options = $options;
  $result->code = NULL;

  $uri = @parse_url($url);
  $result->uri = $uri;

  if ($uri == FALSE) {
    $result->error = 'unable to parse URL';
    $result->code = -1001;
    return _background_process_http_request_result($result);
  }

  if (!isset($uri['scheme'])) {
    $result->error = 'missing schema';
    $result->code = -1002;
    return _background_process_http_request_result($result);
  }

  // Merge the default options.
  $options += array(
    'headers' => array(),
    'method' => 'GET',
    'data' => NULL,
    'max_redirects' => 3,
    'timeout' => variable_get('background_process_connection_timeout', BACKGROUND_PROCESS_CONNECTION_TIMEOUT),
    'context' => NULL,
    'blocking' => FALSE,
    'postpone' => FALSE,
  );
  // stream_socket_client() requires timeout to be a float.
  $options['timeout'] = (float) $options['timeout'];

  $host = NULL;
  switch ($uri['scheme']) {
    case 'http':
    case 'feed':
      $port = isset($uri['port']) ? $uri['port'] : 80;
      $socket = 'tcp://' . $uri['host'] . ':' . $port;
      // RFC 2616: "non-standard ports MUST, default ports MAY be included".
      // We don't add the standard port to prevent from breaking rewrite rules
      // checking the host that do not take into account the port number.
      $host = $uri['host'] . ($port != 80 ? ':' . $port : '');
      break;

    case 'https':
      // Note: Only works when PHP is compiled with OpenSSL support.
      $port = isset($uri['port']) ? $uri['port'] : 443;
      $socket = 'ssl://' . $uri['host'] . ':' . $port;
      $host = $uri['host'] . ($port != 443 ? ':' . $port : '');
      break;

    default:
      $result->error = 'invalid schema ' . $uri['scheme'];
      $result->code = -1003;
      return _background_process_http_request_result($result);
  }

  // A Host header supplied by the caller takes precedence.
  if (!empty($host) && empty($options['headers']['Host'])) {
    $options['headers']['Host'] = $host;
  }

  $result->options = $options;
  $result->socket = $socket;
  $result->postponed = $options['postpone'];

  if ($result->postponed) {
    return $result;
  }
  else {
    return background_process_http_request_initiate($result);
  }
}

/**
 * Initiate the http request.
 *
 * Opens the socket and writes the request. The response is only read here
 * when the 'blocking' option is set.
 *
 * @param $result
 *   Request object prepared by background_process_http_request().
 * @return object
 *   The request object.
 */
function background_process_http_request_initiate(&$result) {
  timer_start(__FUNCTION__);
  $options = $result->options;
  $socket = $result->socket;
  $uri = $result->uri;

  $result->start = microtime(TRUE);
  $result->data_ready = TRUE;

  if (empty($options['context'])) {
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout']);
  }
  else {
    // Create a stream with context. Allows verification of a SSL certificate.
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout'], STREAM_CLIENT_CONNECT, $options['context']);
  }

  // Make sure the socket opened properly.
  if (!$fp) {
    // When a network error occurs, we use a negative number so it does not
    // clash with the HTTP status codes.
    $result->code = -$errno;
    $result->error = trim($errstr) ? trim($errstr) : t('Error opening socket @socket', array('@socket' => $socket));

    // Mark that this request failed. This will trigger a check of the web
    // server's ability to make outgoing HTTP requests the next time that
    // requirements checking is performed.
    // See system_requirements()
    // @fixme Disabled for Background Process
    // variable_set('drupal_http_request_fails', TRUE);

    return _background_process_http_request_result($result);
  }

  $result->fp = $fp;

  // Construct the path to act on.
  $path = isset($uri['path']) ? $uri['path'] : '/';
  if (isset($uri['query'])) {
    $path .= '?' . $uri['query'];
  }

  // Merge the default headers.
  $options['headers'] += array(
    'User-Agent' => 'Drupal (+http://drupal.org/)',
  );

  // Only add Content-Length if we actually have any content or if it is a POST
  // or PUT request. Some non-standard servers get confused by Content-Length in
  // at least HEAD/GET requests, and Squid always requires Content-Length in
  // POST/PUT requests.
  // 'data' defaults to NULL; cast it because passing NULL to strlen() is
  // deprecated as of PHP 8.1.
  $content_length = strlen((string) $options['data']);
  if ($content_length > 0 || $options['method'] == 'POST' || $options['method'] == 'PUT') {
    $options['headers']['Content-Length'] = $content_length;
  }

  // If the server URL has a user then attempt to use basic authentication.
  if (isset($uri['user'])) {
    $options['headers']['Authorization'] = 'Basic ' . base64_encode($uri['user'] . (isset($uri['pass']) ? ':' . $uri['pass'] : ''));
  }

  // If the database prefix is being used by SimpleTest to run the tests in a copied
  // database then set the user-agent header to the database prefix so that any
  // calls to other Drupal pages will run the SimpleTest prefixed database. The
  // user-agent is used to ensure that multiple testing sessions running at the
  // same time won't interfere with each other as they would if the database
  // prefix were stored statically in a file or database variable.
  $test_info = &$GLOBALS['drupal_test_info'];
  if (!empty($test_info['test_run_id'])) {
    $options['headers']['User-Agent'] = drupal_generate_test_ua($test_info['test_run_id']);
  }

  $request = $options['method'] . ' ' . $path . " HTTP/1.0\r\n";
  foreach ($options['headers'] as $name => $value) {
    $request .= $name . ': ' . trim($value) . "\r\n";
  }
  $request .= "\r\n" . $options['data'];
  $result->request = $request;

  // Calculate how much time is left of the original timeout value.
  $timeout = $options['timeout'] - timer_read(__FUNCTION__) / 1000;
  if ($timeout > 0) {
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    fwrite($fp, $request);
    // Switch to non-blocking mode so that reading the response later does
    // not stall the caller.
    stream_set_blocking($fp, 0);
  }

  if (!empty($options['blocking'])) {
    return background_process_http_request_get_response($result);
  }

  return $result;
}

/**
 * Get response for an http request
 */
function background_process_http_request_get_response(&$result) {
  if ($result->postponed) {
    $result->postponed = FALSE;
    return background_process_http_request_initiate($result);
  }
  if (isset($result->code)) {
    return $result;
  }

  $fp = $result->fp;
  $options = $result->options;

  timer_start(__FUNCTION__);
  if (!empty($options['blocking'])) {
    stream_set_blocking($fp, 1);
  }

  // Fetch response. Due to PHP bugs like http://bugs.php.net/bug.php?id=43782
  // and http://bugs.php.net/bug.php?id=46049 we can't rely on feof(), but
  // instead must invoke stream_get_meta_data() each iteration.
  $info = stream_get_meta_data($fp);
  $alive = !$info['eof'] && !$info['timed_out'];

  while ($alive) {
    // Calculate how much time is left of the original timeout value.
    $timeout = $options['timeout'] - timer_read(__FUNCTION__) / 1000;
    if ($timeout <= 0) {
      $info['timed_out'] = TRUE;
      break;
    }
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    $chunk = fread($fp, 1024);
    $result->response .= $chunk;
    $result->data_ready = empty($chunk) ? FALSE : TRUE;
    $info = stream_get_meta_data($fp);
    $alive = !$info['eof'] && !$info['timed_out'];
    if (empty($options['blocking'])) {
      break;
    }
  }

  if ($alive) {
    return $result;
  }

  fclose($fp);

  if ($info['timed_out']) {
    $result->code = HTTP_REQUEST_TIMEOUT;
    $result->error = 'request timed out';
    return _background_process_http_request_result($result);
  }

  // Parse response headers from the response body.
  // Be tolerant of malformed HTTP responses that separate header and body with
  // \n\n or \r\r instead of \r\n\r\n.
list($response, $result->data) = preg_split("/\r\n\r\n|\n\n|\r\r/", $result->response, 2); $response = preg_split("/\r\n|\n|\r/", $response); // Parse the response status line. list($protocol, $code, $status_message) = explode(' ', trim(array_shift($response)), 3); $result->protocol = $protocol; $result->status_message = $status_message; $result->headers = array(); // Parse the response headers. while ($line = trim(array_shift($response))) { list($name, $value) = explode(':', $line, 2); $name = strtolower($name); if (isset($result->headers[$name]) && $name == 'set-cookie') { // RFC 2109: the Set-Cookie response header comprises the token Set- // Cookie:, followed by a comma-separated list of one or more cookies. $result->headers[$name] .= ',' . trim($value); } else { $result->headers[$name] = trim($value); } } $responses = array( 100 => 'Continue', 101 => 'Switching Protocols', 200 => 'OK', 201 => 'Created', 202 => 'Accepted', 203 => 'Non-Authoritative Information', 204 => 'No Content', 205 => 'Reset Content', 206 => 'Partial Content', 300 => 'Multiple Choices', 301 => 'Moved Permanently', 302 => 'Found', 303 => 'See Other', 304 => 'Not Modified', 305 => 'Use Proxy', 307 => 'Temporary Redirect', 400 => 'Bad Request', 401 => 'Unauthorized', 402 => 'Payment Required', 403 => 'Forbidden', 404 => 'Not Found', 405 => 'Method Not Allowed', 406 => 'Not Acceptable', 407 => 'Proxy Authentication Required', 408 => 'Request Time-out', 409 => 'Conflict', 410 => 'Gone', 411 => 'Length Required', 412 => 'Precondition Failed', 413 => 'Request Entity Too Large', 414 => 'Request-URI Too Large', 415 => 'Unsupported Media Type', 416 => 'Requested range not satisfiable', 417 => 'Expectation Failed', 500 => 'Internal Server Error', 501 => 'Not Implemented', 502 => 'Bad Gateway', 503 => 'Service Unavailable', 504 => 'Gateway Time-out', 505 => 'HTTP Version not supported', ); // RFC 2616 states that all unknown HTTP codes must be treated the same as the // base code in their class. 
if (!isset($responses[$code])) { $code = floor($code / 100) * 100; } $result->code = $code; switch ($code) { case 200: // OK case 304: // Not modified break; case 301: // Moved permanently case 302: // Moved temporarily case 307: // Moved temporarily $location = $result->headers['location']; $options['timeout'] -= timer_read(__FUNCTION__) / 1000; if ($options['timeout'] <= 0) { $result->code = -1; $result->error = 'request timed out'; } elseif ($options['max_redirects']) { // Redirect to the new location. $options['max_redirects']--; $result = background_process_http_request($location, $options); if (empty($result->error)) { background_process_http_request_get_response($result); } $result->redirect_code = $code; } if (!isset($result->redirect_url)) { $result->redirect_url = $location; } break; default: $result->error = $status_message; } return _background_process_http_request_result($result); } function _background_process_http_request_result($result) { if (isset($result->code)) { if (empty($result->end)) { $result->end = microtime(TRUE); } if (!empty($result->options['callback']) && is_callable($result->options['callback'])) { call_user_func($result->options['callback'], $result); } } return $result; } /** * Process multiple http requests. */ function background_process_http_request_process(&$results, $options = array()) { $options += array( 'timeout' => 30, 'interval' => 0.01, 'limit' => 0, ); $interval = $options['interval'] * 1000000; $expire = time() + $options['timeout']; while ($results && time() < $expire) { $cnt = 0; $data_ready = FALSE; foreach ($results as $i => &$result) { if (isset($result->code)) { continue; } background_process_http_request_get_response($result); $data_ready = ($data_ready || $result->data_ready) ? TRUE : FALSE; $cnt++; if ($options['limit'] && $cnt >= $options['limit']) { break; } } if (!$cnt) { break; } if (!$data_ready) { usleep($interval); } } } /** * Determines the default service host and stores it in the variable storage. 
*/
function background_process_determine_and_save_default_service_host() {
  $host = background_process_determine_default_service_host();

  // Nothing was detected: tell the user and give up.
  if (!$host) {
    drupal_set_message(t('Could not determine default service host. Please configure background process in your settings.php'), 'error');
    return FALSE;
  }

  global $base_url;
  $detected_url = $host['base_url'];
  drupal_set_message(t('Default service host determined at %base_url', array('%base_url' => _background_process_secure_url($detected_url))));

  if ($detected_url !== $base_url) {
    // The detected host differs from the site's base url: store it and warn.
    variable_set('background_process_derived_default_host', array('default' => $host));
    drupal_set_message(t('Default service host differs from base url (%base_url). If migrating database to other sites or environments, you will need to either run "Determine default service host" again, or configure the default service host manually through settings.php', array('%base_url' => $base_url)), 'warning');
  }
  else {
    // Same as the base url: no derived host needs to be stored.
    variable_del('background_process_derived_default_host');
  }

  return TRUE;
}

// ---------- INTERNAL FUNCTIONS ----------

/**
 * Keep track of a handle whose process must be removed when the request ends.
 *
 * The first call registers _background_process_cleanup_locks() as a shutdown
 * function.
 *
 * @param $handle
 *   Handle of background process
 * @param $remove
 *   If TRUE, stop tracking the handle so that it is not removed when
 *   shutting down
 */
function _background_process_ensure_cleanup($handle, $remove = FALSE) {
  $locked = &drupal_static('background_process_handles_locked', NULL);

  if (is_null($locked)) {
    $locked = array();
    drupal_register_shutdown_function('_background_process_cleanup_locks');
  }

  if (!$remove) {
    $locked[$handle] = $handle;
    return;
  }

  unset($locked[$handle]);
}

/**
 * Shutdown handler: removes the process of every handle still being tracked.
 */
function _background_process_cleanup_locks() {
  $locked = &drupal_static('background_process_handles_locked', NULL);

  if (empty($locked)) {
    return;
  }

  foreach ($locked as $handle) {
    background_process_remove_process($handle);
  }
}

/**
 * Get string name of callback.
 *
 * @param $callback
 *   Callback can be either a string or an array.
 * @return string
 *   The name of the callback, e.g.
'myfunction', 'myclass::mystaticmethod' or 'myclass->mymethod'.
 */
function _background_process_callback_name($callback) {
  if (is_array($callback)) {
    if (is_object($callback[0])) {
      // Object method: class name of the instance, then the method.
      $callback = get_class($callback[0]) . '->' . $callback[1];
    }
    else {
      // Static method: class name as given, then the method.
      $callback = $callback[0] . '::' . $callback[1];
    }
  }
  return $callback;
}

/**
 * Get request headers
 *
 * Rebuilds the header names from the HTTP_* entries of $_SERVER, e.g.
 * HTTP_ACCEPT_LANGUAGE becomes Accept-Language.
 *
 * @return array
 *   Headers keyed by header name; an empty array if $_SERVER holds no HTTP_*
 *   entries.
 */
function _background_process_request_headers() {
  // Start from an empty array so that a request without any HTTP_* entries
  // yields an array instead of an undefined variable.
  $headers = array();
  foreach ($_SERVER as $key => $value) {
    if (substr($key, 0, 5) == 'HTTP_') {
      $key = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5)))));
      if (empty($headers[$key])) {
        $headers[$key] = $value;
      }
      else {
        // Several entries mapping to the same header name are joined.
        $headers[$key] .= "; $value";
      }
    }
  }
  return $headers;
}

/**
 * Remove headers we do not wish to pass on to the next request.
 *
 * Header names are matched case-insensitively.
 *
 * @param $headers
 *   Headers to filter
 * @return array
 *   Filtered headers
 */
function _background_process_filter_headers($headers) {
  $result = array();
  if (empty($headers)) {
    return $result;
  }

  foreach ($headers as $key => $value) {
    if (!preg_match('/^(Connection|Keep-Alive|Proxy-Authenticate|Proxy-Authorization|TE|Trailers|Transfer-Encoding|Upgrade|Set-Cookie|Content-Length|Host|Accept-Encoding)$/i', $key)) {
      $result[$key] = $value;
    }
  }

  return $result;
}

/**
 * Secure a URL by obfuscating the password if present.
 *
 * @param $url
 *   URL to secure
 * @return string
 *   URL with a non-empty password replaced by 'XXXXXXXX'
 */
function _background_process_secure_url($url) {
  $url = parse_url($url);
  if (!empty($url['pass'])) {
    $url['pass'] = 'XXXXXXXX';
  }
  return _background_process_unparse_url($url);
}

/**
 * Reverse logic of parse_url().
 *
 * @param $parsed_url
 *   Array from parse_url()
 * @return string
 *   URL
 */
function _background_process_unparse_url($parsed_url) {
  $scheme = isset($parsed_url['scheme']) ? $parsed_url['scheme'] . '://' : '';
  $host = isset($parsed_url['host']) ? $parsed_url['host'] : '';
  $port = isset($parsed_url['port']) ? ':' . $parsed_url['port'] : '';
  $user = isset($parsed_url['user']) ? $parsed_url['user'] : '';
  $pass = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
  // The "@" separator is needed as soon as either credential part is present.
  $pass = ($user || $pass) ? "$pass@" : '';
  $path = isset($parsed_url['path']) ? $parsed_url['path'] : '';
  $query = isset($parsed_url['query']) ? '?' . $parsed_url['query'] : '';
  $fragment = isset($parsed_url['fragment']) ? '#' . $parsed_url['fragment'] : '';
  return "$scheme$user$pass$host$port$path$query$fragment";
}

/**
 * Shutdown handler for restarting background process.
 *
 * @param $process
 *   Background process object
 * @param $args
 *   Array of arguments for callback; when empty, the arguments stored on
 *   $process are used
 */
function _background_process_restart($process, $args = array()) {
  $args = empty($args) ? $process->args : $args;
  $new = BackgroundProcess::load($process);
  $new->start($process->callback, $args);
}