<?php

/**
 * @file
 * This module implements a framework for calling functions in the background.
 *
 * @code
 * $handle = background_process_start('mymodule_dosomething', $myvar1, $myvar2);
 * $handle = background_process_start(array('myclass', 'mystaticfunction'), $myvar1, $myvar2);
 * $handle = background_process_start(array($myobject, 'mymethod'), $myvar1, $myvar2);
 * $handle = background_process_start_locked('dontrunconcurrently', 'mymodule_dosomething', $myvar1, $myvar2);
 * @endcode
 *
 * For implementation of load balancing functionality:
 * @see hook_service_group()
 *
 * @todo Add functionality for killing keep alive requests?
*/include 'BackgroundProcess.class.php';define('BACKGROUND_PROCESS_SERVICE_TIMEOUT', 0);define('BACKGROUND_PROCESS_CONNECTION_TIMEOUT', 2);define('BACKGROUND_PROCESS_STREAM_TIMEOUT', 2);define('BACKGROUND_PROCESS_CLEANUP_AGE', 120);define('BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING', 8 * 3600);define('BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE', 86400);define('BACKGROUND_PROCESS_REDISPATCH_THRESHOLD', 10);define('BACKGROUND_PROCESS_STATUS_NONE', 0);define('BACKGROUND_PROCESS_STATUS_LOCKED', 1);define('BACKGROUND_PROCESS_STATUS_RUNNING', 2);define('BACKGROUND_PROCESS_STATUS_QUEUED', 3);// ---------- HOOKS ----------/** * Implements hook_menu(). */function background_process_menu() {  $items = array();  $items['bgp-start/%/%'] = array(    'type' => MENU_CALLBACK,    'title' => 'Run background process',    'description' => 'Run background process',    'page callback' => 'background_process_service_start',    'page arguments' => array(1),    'access callback' => 'background_process_service_access',    'access arguments' => array(1, 2),  );  $items['background-process/unlock/%'] = array(    'type' => MENU_CALLBACK,    'title' => 'Unlock background process',    'description' => 'Unlock background process',    'page callback' => 'background_process_service_unlock',    'page arguments' => array(2),    'access arguments' => array('administer background process'),    'file' => 'background_process.admin.inc',  );  $items['background-process/check-token'] = array(    'type' => MENU_CALLBACK,    'title' => 'Check background process token',    'description' => 'Check background process token',    'page callback' => 'background_process_check_token',    'page arguments' => array(2),    'access callback' => TRUE,    'file' => 'background_process.pages.inc',  );  $items['admin/config/system/background-process/settings'] = array(    'type' => MENU_DEFAULT_LOCAL_TASK,    'title' => 'Settings',    'weight' => 1,  );  $items['admin/config/system/background-process'] = array(    'title' => 
'Background process',    'description' => 'Administer background processes',    'page callback' => 'drupal_get_form',    'page arguments' => array('background_process_settings_form'),    'access arguments' => array('administer background process'),    'file' => 'background_process.admin.inc',  );  $items['admin/config/system/background-process/overview'] = array(    'type' => MENU_LOCAL_TASK,    'title' => 'Overview',    'description' => 'Administer background processes',    'page callback' => 'background_process_overview_page',    'access arguments' => array('administer background process'),    'file' => 'background_process.admin.inc',    'weight' => 3,  );  return $items;}/** * Implements hook_permission(). */function background_process_permission() {  return array(    'administer background process' => array(      'title' => t('Administer background processes'),      'description' => t('Perform administration tasks for background processes.'),    ),  );}/** * Implements hook_cron(). */function background_process_cron() {  // Don't use more than 120 seconds to unlock  $expire = 120;  @set_time_limit($expire);  // Cleanup old handles  $time = time();  $msg = t('Never started (auto unlock due to timeout)');  do {    if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {      break;    }    $result = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, array(      ':start' => $time - variable_get('background_process_cleanup_age', BACKGROUND_PROCESS_CLEANUP_AGE),      ':status' => BACKGROUND_PROCESS_STATUS_LOCKED    ));    $handles = $result->fetchAllAssoc('handle', PDO::FETCH_ASSOC);    foreach ($handles as $handle => $process) {      // Unlock the process      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {        drupal_set_message(t("%handle unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)));      }      else {        drupal_set_message(t("%handle could 
not be unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)), 'error');      }    }  } while (!empty($handles));  // Cleanup stale requests  $time = time();  $msg = t('Never finished (auto unlock due to long run)');  do {    if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {      break;    }    $result = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, array(       ':start' => $time - variable_get('background_process_cleanup_age_running', BACKGROUND_PROCESS_CLEANUP_AGE_RUNNING),      ':status' => BACKGROUND_PROCESS_STATUS_RUNNING    ));    $handles = $result->fetchAllAssoc('handle', PDO::FETCH_ASSOC);    foreach ($handles as $handle => $process) {      // Unlock the process      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {        drupal_set_message(t("%handle unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)));      }      else {        drupal_set_message(t("%handle could not be unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)), 'error');      }    }  } while (!empty($results));  // Cleanup queued requests that were never processed   $time = time();  $msg = t('Never picked up by cron worker (auto unlock due to timeout)');  do {    if (time() >= $_SERVER['REQUEST_TIME'] + $expire) {      break;    }    $result = db_query_range("SELECT handle, start_stamp FROM {background_process} WHERE start_stamp < :start AND exec_status = :status", 0, 10, array(      ':start' => $time - variable_get('background_process_cleanup_age_queue', BACKGROUND_PROCESS_CLEANUP_AGE_QUEUE),      ':status' => BACKGROUND_PROCESS_STATUS_QUEUED    ));    $handles = $result->fetchAllAssoc('handle', PDO::FETCH_ASSOC);    foreach ($handles as $handle => $process) {      // Unlock the process      if (background_process_unlock($handle, $msg, $process['start_stamp'])) {        drupal_set_message(t("%handle unlocked: !msg", array('%handle' => $handle, '!msg' => 
$msg)));      }      else {        drupal_set_message(t("%handle could not be unlocked: !msg", array('%handle' => $handle, '!msg' => $msg)), 'error');      }    }  } while (!empty($results));}/** * Implements hook_cron_alter(). */function background_process_cron_alter(&$items) {  $items['background_process_cron']['override_congestion_protection'] = TRUE;  // Unlock background if too old.  // @todo Move to some access handler or pre-execute?  if ($process = background_process_get_process('uc:background_process_cron')) {    if ($process->start + 30 < time()) {      background_process_unlock($process->handle, t('Self unlocking stale lock'), $process->start);    }  }}/** * Implements hook_cronapi(). */function background_process_cronapi($op, $job = NULL) {  switch ($op) {    case 'list':      return array('background_process_cron' => t('Cleanup old process handles'));    case 'rule':      return '* * * * *';    case 'configure':      return 'admin/config/system/background-process';  }}/** * Implements hook_service_group(). * * Default load balancing using random. */function background_process_service_group() {  $info = array();  $info['methods']['background_process_service_group_random'] = t('Random');  $info['methods']['background_process_service_group_round_robin'] = t('Pseudo round-robin');  return $info;}/** * Load balancing based on random pick. */function background_process_service_group_random($service_group) {  return $service_group['hosts'][rand(0, count($service_group['hosts']) - 1)];}/** * Round-robin load balancing based on random pick. 
*/function background_process_service_group_round_robin($service_group) {  static $idx = NULL;  if (isset($idx)) {    $idx = ($idx + 1) % count($service_group['hosts']);  }  else {    $idx = rand(0, count($service_group['hosts']) - 1);  }  return $service_group['hosts'][$idx];}/** * Access handler for service call */function background_process_service_access($handle, $token) {  // Setup service  ignore_user_abort(TRUE);  // Damn those slashes!  $handle = rawurldecode($handle);  $token = rawurldecode($token);  // Ensure no session!  drupal_save_session(FALSE);  unset($_SESSION);  $process = background_process_get_process($handle);  if (!$process) {    watchdog('bg_process', 'Unknown process: %handle', array('%handle' => $handle));    return FALSE;  }  if ($token !== $process->token) {    watchdog('bg_process', 'Invalid token: %token for handle: %handle', array('%token' => $token, '%handle' => $handle));    return FALSE;  }  // Login as the user that requested the call  if ($process->uid) {    global $user;    $user = user_load($process->uid);    if (!$user) {      // Invalid user!      return FALSE;    }  }  else {    $user = drupal_anonymous_user();  }  return TRUE;}/** * Implements hook_init(). */function background_process_init() {  // Only determine if we're told to do so  if (empty($_SESSION['background_process_determine_default_service_host'])) {    return;  }  // Don't determine on check-token page, to avoid infinite loop  if ($_GET['q'] == 'background-process/check-token') {    return;  }  // Only determine when installation of site is finished  if (variable_get('install_task', FALSE) != 'done') {    return;  }  // Determine the default service host  background_process_determine_and_save_default_service_host();  unset($_SESSION['background_process_determine_default_service_host']);}/** * Implements hook_cron_queue_info(). 
*/function background_process_cron_queue_info() {  $queues['background_process'] = array(    'worker callback' => '_background_process_queue',  );  $background_process_queues = variable_get('background_process_queues', array());  foreach ($background_process_queues as $queue_name) {    $queues['bgp:' . $queue_name] = array(      'worker callback' => '_background_process_queue',    );  }  return $queues;}// ---------- HELPER FUNCTIONS ----------/** * Worker callback for processing queued function call */function _background_process_queue($item) {  $oldhandle = background_process_current_handle();  list ($handle, $token) = $item;  if (background_process_service_access($handle, $token)) {    try {      background_process_service_execute(rawurldecode($handle), TRUE);      background_process_current_handle($oldhandle);    }    catch (Exception $e) {      background_process_current_handle($oldhandle);      background_process_update_status(rawurldecode($handle), BACKGROUND_PROCESS_STATUS_QUEUED);      throw $e;    }   }}/** * Get/set current handle. * * @staticvar $current_handle * @param $handle * @return string *   Current handle. */function background_process_current_handle($handle = NULL) {  static $current_handle = NULL;  if (isset($handle)) {    $current_handle = $handle;  }  return $current_handle;}/** * Get a unique handle based on a callback. * * @param $callback *   Function to generate handle from. * @return string *   Handle. */function background_process_generate_handle($callback) {  return md5(serialize($callback) . ':' . microtime(TRUE) . ':' . rand(1, 5000));}/** * Start background process * * Calls the service handler through http passing function arguments as serialized data * Be aware that the callback will run in a new request * * @global string $base_url *   Base URL for this Drupal request * * @param $callback *   Function to call * @param $var [, $... 
]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Handle on success, FALSE on failure
 */
function background_process_start($callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess();
  // Everything after $callback is passed on to the callback.
  $args = func_get_args();
  array_shift($args);
  $result = $process->start($callback, $args);
  return $result ? $process->handle : $result;
}

/**
 * Start locked background process
 *
 * Same as background_process_start(), but the caller chooses the handle.
 *
 * @param $handle
 *   Handle to give background process
 * @param $callback
 *   Function to call
 * @param $var [, $... ]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Handle on success, FALSE on failure
 */
function background_process_start_locked($handle, $callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess($handle);
  // Everything after $handle and $callback is passed on to the callback.
  $args = func_get_args();
  $args = array_slice($args, 2);
  $result = $process->start($callback, $args);
  return $result ? $process->handle : $result;
}

/**
 * Queue background process
 *
 * Queues the function call instead of dispatching it right away.
 *
 * @param $callback
 *   Function to call
 * @param $var [, $... ]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Whatever BackgroundProcess::queue() returns.
 */
function background_process_queue($callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess();
  $args = func_get_args();
  array_shift($args);
  return $process->queue($callback, $args);
}

/**
 * Queue locked background process
 *
 * Same as background_process_queue(), but the caller chooses the handle.
 *
 * @param $handle
 *   Handle to give background process
 * @param $callback
 *   Function to call
 * @param $var [, $... ]]
 *   Arbitrary number of variables to pass on to the callback
 * @return mixed
 *   Whatever BackgroundProcess::queue() returns.
 */
function background_process_queue_locked($handle, $callback /* [, $var [, $... ]] */) {
  $process = new BackgroundProcess($handle);
  $args = func_get_args();
  $args = array_slice($args, 2);
  return $process->queue($callback, $args);
}

/**
 * Cleanup cache menu and ensure all locks are released (again).
 *
 * @param $cid
 *   Cache id to clear from the cache_menu bin.
 */
function _background_process_cleanup_menu($cid) {
  cache_clear_all($cid, 'cache_menu');
  // Release locks in case cache_clear_all() sets a lock and lock_release_all()
  // has already been run.
  lock_release_all();
}

/**
 * Call the function requested by the service call
 *
 * @param $handle
 *   Raw URL encoded handle of process
 * @param $return
 *   Whether or not the function should return or exit.
 */
function background_process_service_start($handle, $return = FALSE) {
  drupal_add_http_header('Content-Type', 'text/plain');

  // Let's clean up the mess the menu-router system leaves behind.
  $cid = 'menu_item:' . hash('sha256', $_GET['q']);
  drupal_register_shutdown_function('_background_process_cleanup_menu', $cid);

  // Setup service
  ignore_user_abort(TRUE);
  @set_time_limit(variable_get('background_process_service_timeout', BACKGROUND_PROCESS_SERVICE_TIMEOUT));

  // Damn those slashes!
  $handle = rawurldecode($handle);
  return background_process_service_execute($handle, $return);
}

/**
 * Execute the service
 *
 * Claims the process (locked/queued -> running), runs its callback and
 * removes the process afterwards. With $return = FALSE the request is ended
 * with exit and the cleanup is done by shutdown functions; with
 * $return = TRUE the cleanup is done inline and control returns to the caller.
 *
 * @param $handle
 *   Handle of process
 * @param $return
 *   Whether or not the function should return or exit.
 *
 * @throws Exception
 *   Re-throws exceptions from the claim query and from the callback.
 */
function background_process_service_execute($handle, $return = FALSE) {
  // @todo Add static caching? We've already loaded this previously in the access handler
  $process = background_process_get_process($handle);
  if (!$process) {
    watchdog('bg_process', 'Process not found for handle: %handle', array('%handle' => $handle), WATCHDOG_ERROR);
    if ($return) {
      return;
    }
    else {
      exit;
    }
  }

  $process->start_stamp = microtime(TRUE);
  try {
    $old_db = db_set_active('background_process');
    // Only a process that is still locked or queued may be claimed; the
    // number of updated rows tells whether this request won the claim.
    $claimed = db_update('background_process')
      ->fields(array(
        'start_stamp' => sprintf("%.06f", $process->start_stamp),
        'exec_status' => BACKGROUND_PROCESS_STATUS_RUNNING,
      ))
      ->condition('handle', $handle)
      ->condition('exec_status', array(BACKGROUND_PROCESS_STATUS_LOCKED, BACKGROUND_PROCESS_STATUS_QUEUED), 'IN')
      ->execute();
    db_set_active($old_db);
    if ($claimed) {
      $process->exec_status = BACKGROUND_PROCESS_STATUS_RUNNING;
      $process = BackgroundProcess::load($process);
      $process->sendMessage('claimed');
      background_process_current_handle($handle);
    }
    else {
      if ($return) {
        return;
      }
      else {
        exit;
      }
    }
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }

  // Make sure the process is removed when we're done
  if (!$return) {
    drupal_register_shutdown_function('background_process_remove_process', $process->handle, $process->start_stamp);
  }

  if (is_callable($process->callback)) {
    $old_db = NULL;
    try {
      if (!$return) {
        drupal_register_shutdown_function('module_invoke_all', 'background_process_shutdown', $process);
      }
      $callback = _background_process_callback_name($process->callback);
      $old_db = db_set_active('background_process');
      progress_initialize_progress($handle, "Background process '$callback' initialized");
      db_set_active($old_db);

      call_user_func_array($process->callback, $process->args);

      $old_db = db_set_active('background_process');
      progress_end_progress($handle, "Background process '$callback' finished");
      db_set_active($old_db);
      if ($return) {
        background_process_remove_process($process->handle, $process->start_stamp);
        module_invoke_all('background_process_shutdown', $process);
      }
    }
    catch (Exception $e) {
      // Exception occurred, switch back to proper db if necessary
      // and inform shutdown handlers.
      if ($old_db) {
        db_set_active($old_db);
      }
      if (!$return) {
        module_invoke_all('background_process_shutdown', $process, (string) $e);
      }
      throw $e;
    }
  }
  else {
    // Function not found. The callback may be an array (class/object +
    // method), so log its printable name rather than the raw value.
    watchdog('bg_process', 'Callback: %callback not found', array('%callback' => _background_process_callback_name($process->callback)), WATCHDOG_ERROR);
    if ($return) {
      // No shutdown function was registered in this mode, so remove the
      // claimed row here instead of leaving it in the running state.
      background_process_remove_process($process->handle, $process->start_stamp);
    }
  }

  if ($return) {
    return;
  }
  else {
    exit;
  }
}

/**
 * Restart the current background process
 *
 * Arguments are passed on to background_process_keepalive().
 *
 * @return
 *   Exception on fail, otherwise exit
 */
function background_process_restart() {
  $args = func_get_args();
  call_user_func_array('background_process_keepalive', $args);
  exit;
}

/**
 * Keep the current background process alive
 * (meaning restart it when it exits)
 *
 * Arguments are handed to _background_process_restart() at shutdown.
 *
 * @throws Exception
 *   If there is no current handle or its process cannot be loaded.
 */
function background_process_keepalive() {
  $args = func_get_args();
  $handle = background_process_current_handle();
  if (!$handle) {
    throw new Exception(t('Background process handle %handle not found', array('%handle' => $handle)));
  }
  $process = background_process_get_process($handle);
  if (!$process) {
    throw new Exception(t('Background process %handle not found', array('%handle' => $handle)));
  }
  drupal_register_shutdown_function('_background_process_restart', $process, $args);
}

/**
 * Check if the
background process has started
 *
 * @param $handle
 *   Handle of opened background process
 * @return boolean
 *   TRUE if started, FALSE if not
 */
function background_process_is_started($handle) {
  $old_db = db_set_active('background_process');
  $progress = progress_get_progress($handle);
  db_set_active($old_db);
  return !empty($progress);
}

/**
 * Check if the background process has finished
 *
 * @param $handle
 *   Handle of opened background process
 * @return boolean
 *   TRUE if finished (or no progress record exists), FALSE if not
 */
function background_process_is_finished($handle) {
  $old_db = db_set_active('background_process');
  $progress = progress_get_progress($handle);
  db_set_active($old_db);
  return (empty($progress) || $progress->end);
}

/**
 * Set background process
 *
 * Stores callback, arguments, user and token on an existing process row.
 *
 * @param $handle
 *   Handle of background process
 * @param $callback
 *   Function of background process; stored serialized
 * @param $uid
 *   User id to run as; when not set, the current global user's uid is used
 * @param $args
 *   Arguments for the callback; stored serialized
 * @param $token
 *   Token stored with the process
 * @return
 *   Result of the update query.
 *
 * @throws Exception
 *   Re-throws database exceptions after restoring the active connection.
 */
function background_process_set_process($handle, $callback, $uid, $args, $token) {
  // Setup parameters
  $args = serialize($args);
  $callback = serialize($callback);

  // Get user
  if (!isset($uid)) {
    global $user;
    $uid = $user->uid;
  }

  try {
    $old_db = db_set_active('background_process');
    $result = db_update('background_process')
      ->fields(array(
        'callback' => $callback,
        'args' => $args,
        'uid' => $uid,
        'token' => $token,
      ))
      ->condition('handle', $handle)
      ->execute();
    db_set_active($old_db);
    return $result;
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
}

/**
 * Lock process
 *
 * Inserts a row for the handle; any exception raised while doing so is
 * treated as "lock could not be obtained".
 *
 * @param $handle
 *   Handle of background process
 * @param $status
 *   Initial exec_status of the row
 * @return boolean
 *   TRUE if locked, FALSE if lock could not be obtained
 */
function background_process_lock_process($handle, $status = BACKGROUND_PROCESS_STATUS_LOCKED) {
  try {
    $old_db = db_set_active('background_process');
    db_insert('background_process')
      ->fields(array(
        'handle' => $handle,
        'start_stamp' => sprintf("%.06f", microtime(TRUE)),
        'exec_status' => $status,
      ))
      ->execute();
    db_set_active($old_db);
    _background_process_ensure_cleanup($handle);
    return TRUE;
  }
  catch (Exception $e) {
    db_set_active($old_db);
    return FALSE;
  }
}

/**
 * Set status for background process
 *
 * Runs against the 'background_process' connection like every other
 * accessor of the {background_process} table in this file.
 *
 * @param $handle
 *   Handle of background process
 * @param $status
 *   New exec_status value
 *
 * @throws Exception
 *   Re-throws database exceptions after restoring the active connection.
 */
function background_process_update_status($handle, $status) {
  $old_db = db_set_active('background_process');
  try {
    db_update('background_process')
      ->fields(array('exec_status' => $status))
      ->condition('handle', $handle)
      ->execute();
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
}

/**
 * Get background process
 *
 * @param $handle
 *   Handle of background process
 * @return object
 *   Background process with unserialized 'args' and 'callback' and the
 *   aliases 'start' (start_stamp) and 'status' (exec_status); FALSE if no
 *   row exists for the handle.
 */
function background_process_get_process($handle) {
  try {
    $old_db = db_set_active('background_process');
    $result = db_select('background_process', 'bp')
      ->fields('bp', array('handle', 'callback', 'args', 'uid', 'token', 'service_host', 'start_stamp', 'exec_status'))
      ->condition('handle', $handle)
      ->execute()
      ->fetchObject();
    db_set_active($old_db);
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }

  if ($result) {
    $result->args = unserialize($result->args);
    $result->callback = unserialize($result->callback);
    $result->start = $result->start_stamp;
    $result->status = $result->exec_status;
    return $result;
  }
  return FALSE;
}

/**
 * Get background processes
 *
 * @param $status
 *   (optional) Only return processes with this exec_status.
 * @return array
 *   List of process objects, shaped like the result of
 *   background_process_get_process().
 *
 * @throws Exception
 *   Re-throws database exceptions after restoring the active connection.
 */
function background_process_get_processes($status = NULL) {
  $old_db = db_set_active('background_process');
  try {
    $query = db_select('background_process', 'bp')
      ->fields('bp', array('handle', 'callback', 'args', 'uid', 'token', 'service_host', 'start_stamp', 'exec_status'));
    if (isset($status)) {
      // The column is exec_status; 'status' is only an alias added below.
      $query->condition('bp.exec_status', $status);
    }
    $result = $query->execute();

    $processes = array();
    while ($process = $result->fetchObject()) {
      $process->args = unserialize($process->args);
      $process->callback = unserialize($process->callback);
      $process->start = $process->start_stamp;
      $process->status = $process->exec_status;
      $processes[] = $process;
    }
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
  db_set_active($old_db);
  return $processes;
}

/**
 * Remove a background process
 *
 * @param $handle
 *   Handle of background process
 * @param $start
 *   (optional) Only remove the row if its start_stamp matches this value.
 * @return mixed
 *   Number of handles deleted on success, FALSE on failure.
 */
function background_process_remove_process($handle, $start = NULL) {
  $old_db = db_set_active('background_process');
  $query = db_delete('background_process')
    ->condition('handle', $handle);
  if (isset($start)) {
    $query->condition('start_stamp', sprintf("%.06f", $start), '=');
  }
  $result = $query->execute();
  db_set_active($old_db);
  return $result;
}

/**
 * Unlock background process.
 *
 * @param $handle
 *   Handle of process to unlock
 * @param $msg
 *   (optional) Reason passed to hook_background_process_shutdown(); defaults
 *   to a "manually unlocked" message naming the current user.
 * @param $start
 *   (optional) Only unlock if the process start value is identical (===).
 * @return boolean
 *   TRUE if the process was removed, FALSE otherwise.
 */
function background_process_unlock($handle, $msg = NULL, $start = NULL) {
  $process = background_process_get_process($handle);
  if (!$process || (isset($start) && $start !== $process->start)) {
    return FALSE;
  }

  // Unlock the process
  if (background_process_remove_process($process->handle, $process->start)) {
    global $user;
    module_invoke_all('background_process_shutdown', $process, $msg ? $msg : t('Manually unlocked by !name', array('!name' => $user->name)));
    return TRUE;
  }
  return FALSE;
}

/**
 * Set a service host for a background process.
 *
 * @param $handle
 *   Background process handle
 * @param $service_host
 *   Name of service host; a falsy value is stored as an empty string
 * @return
 *   Result of the update query.
 *
 * @throws Exception
 *   Re-throws database exceptions after restoring the active connection.
 */
function background_process_set_service_host($handle, $service_host) {
  try {
    $old_db = db_set_active('background_process');
    $result = db_update('background_process')
      ->fields(array(
        'service_host' => $service_host ? $service_host : '',
      ))
      ->condition('handle', $handle)
      ->execute();
    db_set_active($old_db);
    return $result;
  }
  catch (Exception $e) {
    db_set_active($old_db);
    throw $e;
  }
}

/**
 * Get service hosts defined in the system.
 *
 * @return array
 *   Configured service hosts, completed with the derived default host (or a
 *   'default' host on $base_url) for names that are not configured.
 */
function background_process_get_service_hosts() {
  global $base_url;
  $service_hosts = variable_get('background_process_service_hosts', array());
  $service_hosts += variable_get('background_process_derived_default_host', array(
    'default' => array(
      'base_url' => $base_url,
    ),
  ));
  return $service_hosts;
}

/**
 * Get service groups defined in the system.
 *
 * @return array
 *   Configured service groups plus a 'default' group; every group is given
 *   the round-robin method unless it names one itself.
 */
function background_process_get_service_groups() {
  $service_groups = variable_get('background_process_service_groups', array());
  $service_groups += array(
    'default' => array(
      'hosts' => array(variable_get('background_process_default_service_host', 'default')),
    ),
  );
  foreach ($service_groups as &$service_group) {
    $service_group += array(
      'method' => 'background_process_service_group_round_robin',
    );
  }
  // Break the reference so later writes to the name cannot alter the array.
  unset($service_group);
  return $service_groups;
}

/**
 * Determine host for current installation.
 * Host is determined in the following order:
 *   <base url>
 *   <server name>
 *   <localhost>
 *   <server ip>
 * first without, then with, the HTTP basic auth credentials of the request.
 *
 * @return array
 *   Array of service host parameters for the default service host. FALSE if none could be determined.
 */
function background_process_determine_default_service_host() {
  global $conf;
  global $base_url;

  // Token that the check-token page is expected to echo back.
  $token = md5(session_id() . md5(uniqid(mt_rand(), TRUE)) . md5(uniqid(mt_rand(), TRUE)));
  variable_set('background_process_token', $token);

  $auth = isset($_SERVER['PHP_AUTH_USER']) ? $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW'] . '@' : '';
  $scheme = isset($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://';

  $url = parse_url($base_url);
  $path = empty($url['path']) ? '' : $url['path'];

  $server_addr = !array_key_exists('SERVER_ADDR', $_SERVER) ? $_SERVER['LOCAL_ADDR'] : $_SERVER['SERVER_ADDR'];
  $hosts = array($_SERVER['SERVER_NAME'], '127.0.0.1', $server_addr);

  $candidates = array(
    array('base_url' => $base_url),
  );
  foreach (array('', $auth) as $credentials) {
    foreach ($hosts as $host) {
      $candidates[] = array(
        'base_url' => $scheme . $credentials . $host . ':' . $_SERVER['SERVER_PORT'] . $path,
        'http_host' => $_SERVER['HTTP_HOST'],
      );
    }
  }

  // Queue one postponed request per distinct URL.
  $results = array();
  foreach ($candidates as $i => $candidate) {
    $conf['background_process_service_hosts']['__test'] = $candidate;
    list($url, $headers) = background_process_build_request('background-process/check-token', '__test');
    if (empty($results[$url])) {
      $results[$url] = background_process_http_request($url, array('headers' => $headers, 'postpone' => TRUE, 'candidate' => $i, 'method' => 'POST'));
    }
  }

  background_process_http_request_process($results);

  // The first candidate that answered 200 with our token wins.
  foreach ($results as $result) {
    if ($result->code == 200 && $token === substr($result->data, 0, strlen($token))) {
      return $candidates[$result->options['candidate']];
    }
  }
  return FALSE;
}

/**
 * Build url and headers for http request
 * @param $url
 *   Relative url for the request
 * @param $service_hostname
 *   Name of service host, e.g.
'default'
 * @param $options
 *   (optional) Options passed to url(). 'absolute' (TRUE) and 'base_url' (the
 *   base url of the service host) are added when not provided.
 * @return array
 *   array(url, headers)
 */
function background_process_build_request($url, $service_hostname = NULL, $options = array()) {
  $service_hosts = background_process_get_service_hosts();
  // Unknown or missing host names fall back to the 'default' service host.
  if (!$service_hostname || empty($service_hosts[$service_hostname])) {
    $service_hostname = 'default';
  }
  $service_host = $service_hosts[$service_hostname];

  $options += array(
    'absolute' => TRUE,
    'base_url' => $service_host['base_url'],
  );

  $url = url($url, $options);
  $parsed = parse_url($url);
  // An explicit 'http_host' on the service host overrides the host in the URL.
  $host = !empty($service_host['http_host']) ? $service_host['http_host'] : (isset($parsed['host']) ? $parsed['host'] : NULL);

  // Forward the headers of the current request, minus the ones that must not
  // be passed on.
  $headers = _background_process_request_headers();
  $headers = _background_process_filter_headers($headers);

  $headers['Host'] = $host;
  $headers['Connection'] = 'Close';

  if (isset($parsed['user'])) {
    // parse_url() omits 'pass' for URLs with a user but no password.
    $pass = isset($parsed['pass']) ? $parsed['pass'] : '';
    $headers['Authorization'] = 'Basic ' . base64_encode($parsed['user'] . ':' . $pass);
  }

  return array($url, $headers);
}

/**
 * Transform header array from key/value to strings.
 * @param $headers
 *   array of headers in key/value style
 * @return array
 *   array of strings.
 */
function background_process_build_headers($headers) {
  $header = array();
  foreach ($headers as $key => $value) {
    $header[] = "$key: $value";
  }
  return $header;
}

/**
 * Perform an http request.
 *
 * @param $url
 *   Absolute URL. Supported schemes are http, feed and https.
 * @param $options
 *   (optional) Request options: 'headers', 'method', 'data', 'max_redirects',
 *   'timeout', 'context', 'blocking' and 'postpone'. When 'postpone' is TRUE
 *   the request is only prepared and no connection is opened.
 * @return object
 *   Request/result object. On invalid input ->code is -1001 (unparsable URL),
 *   -1002 (missing scheme) or -1003 (unsupported scheme) and ->error is set.
 *
 * @see drupal_http_request()
 */
function background_process_http_request($url, array $options = array()) {
  // Parse the URL and make sure we can handle the schema.
  $result = new stdClass();
  $result->url = $url;
  $result->options = $options;
  $result->code = NULL;

  $uri = @parse_url($url);
  $result->uri = $uri;

  if ($uri == FALSE) {
    $result->error = 'unable to parse URL';
    $result->code = -1001;
    return _background_process_http_request_result($result);
  }

  if (!isset($uri['scheme'])) {
    $result->error = 'missing schema';
    $result->code = -1002;
    return _background_process_http_request_result($result);
  }

  // Merge the default options.
  $options += array(
    'headers' => array(),
    'method' => 'GET',
    'data' => NULL,
    'max_redirects' => 3,
    'timeout' => variable_get('background_process_connection_timeout', BACKGROUND_PROCESS_CONNECTION_TIMEOUT),
    'context' => NULL,
    'blocking' => FALSE,
    'postpone' => FALSE,
  );

  // stream_socket_client() requires timeout to be a float.
  $options['timeout'] = (float) $options['timeout'];

  $host = NULL;
  switch ($uri['scheme']) {
    case 'http':
    case 'feed':
      $port = isset($uri['port']) ? $uri['port'] : 80;
      $socket = 'tcp://' . $uri['host'] . ':' . $port;
      // RFC 2616: "non-standard ports MUST, default ports MAY be included".
      // We don't add the standard port to prevent from breaking rewrite rules
      // checking the host that do not take into account the port number.
      $host = $uri['host'] . ($port != 80 ? ':' . $port : '');
      break;

    case 'https':
      // Note: Only works when PHP is compiled with OpenSSL support.
      $port = isset($uri['port']) ? $uri['port'] : 443;
      $socket = 'ssl://' . $uri['host'] . ':' . $port;
      $host = $uri['host'] . ($port != 443 ? ':' . $port : '');
      break;

    default:
      $result->error = 'invalid schema ' . $uri['scheme'];
      $result->code = -1003;
      return _background_process_http_request_result($result);
  }

  // A Host header supplied by the caller takes precedence over the URL host.
  if (!empty($host) && empty($options['headers']['Host'])) {
    $options['headers']['Host'] = $host;
  }

  $result->options = $options;
  $result->socket = $socket;
  $result->postponed = $options['postpone'];

  if ($result->postponed) {
    return $result;
  }
  else {
    return background_process_http_request_initiate($result);
  }
}

/**
 * Initiate the http request.
*/
/**
 * Open the connection for a prepared http request and send the request.
 *
 * @param $result
 *   Request object prepared by background_process_http_request(). Reads
 *   ->options, ->socket and ->uri; sets ->start, ->data_ready, ->fp and
 *   ->request.
 * @return object
 *   The request object. If the socket cannot be opened, ->code is the negated
 *   socket error number and ->error is set. For blocking requests the
 *   response is read before returning.
 */
function background_process_http_request_initiate(&$result) {
  timer_start(__FUNCTION__);
  $options = $result->options;
  $socket = $result->socket;
  $uri = $result->uri;

  $result->start = microtime(TRUE);
  $result->data_ready = TRUE;

  if (empty($options['context'])) {
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout']);
  }
  else {
    // Create a stream with context. Allows verification of a SSL certificate.
    $fp = @stream_socket_client($socket, $errno, $errstr, $options['timeout'], STREAM_CLIENT_CONNECT, $options['context']);
  }

  // Make sure the socket opened properly.
  if (!$fp) {
    // When a network error occurs, we use a negative number so it does not
    // clash with the HTTP status codes.
    $result->code = -$errno;
    $result->error = trim($errstr) ? trim($errstr) : t('Error opening socket @socket', array('@socket' => $socket));

    // Mark that this request failed. This will trigger a check of the web
    // server's ability to make outgoing HTTP requests the next time that
    // requirements checking is performed.
    // See system_requirements()
    // @fixme Disabled for Background Process
    // variable_set('drupal_http_request_fails', TRUE);

    return _background_process_http_request_result($result);
  }

  $result->fp = $fp;

  // Construct the path to act on.
  $path = isset($uri['path']) ? $uri['path'] : '/';
  if (isset($uri['query'])) {
    $path .= '?' . $uri['query'];
  }

  // Merge the default headers.
  $options['headers'] += array(
    'User-Agent' => 'Drupal (+http://drupal.org/)',
  );

  // Only add Content-Length if we actually have any content or if it is a POST
  // or PUT request. Some non-standard servers get confused by Content-Length in
  // at least HEAD/GET requests, and Squid always requires Content-Length in
  // POST/PUT requests.
  // 'data' defaults to NULL; cast so strlen() never receives NULL.
  $content_length = strlen((string) $options['data']);
  if ($content_length > 0 || $options['method'] == 'POST' || $options['method'] == 'PUT') {
    $options['headers']['Content-Length'] = $content_length;
  }

  // If the server URL has a user then attempt to use basic authentication.
  if (isset($uri['user'])) {
    $options['headers']['Authorization'] = 'Basic ' . base64_encode($uri['user'] . (isset($uri['pass']) ? ':' . $uri['pass'] : ''));
  }

  // If the database prefix is being used by SimpleTest to run the tests in a copied
  // database then set the user-agent header to the database prefix so that any
  // calls to other Drupal pages will run the SimpleTest prefixed database. The
  // user-agent is used to ensure that multiple testing sessions running at the
  // same time won't interfere with each other as they would if the database
  // prefix were stored statically in a file or database variable.
  $test_info = &$GLOBALS['drupal_test_info'];
  if (!empty($test_info['test_run_id'])) {
    $options['headers']['User-Agent'] = drupal_generate_test_ua($test_info['test_run_id']);
  }

  $request = $options['method'] . ' ' . $path . " HTTP/1.0\r\n";
  foreach ($options['headers'] as $name => $value) {
    $request .= $name . ': ' . trim($value) . "\r\n";
  }
  $request .= "\r\n" . $options['data'];
  $result->request = $request;

  // Calculate how much time is left of the original timeout value.
  // NOTE(review): when no time is left the request is never written, yet the
  // open socket is still returned — confirm this is intended.
  $timeout = $options['timeout'] - timer_read(__FUNCTION__) / 1000;
  if ($timeout > 0) {
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    fwrite($fp, $request);
    stream_set_blocking($fp, 0);
  }

  if (!empty($options['blocking'])) {
    return background_process_http_request_get_response($result);
  }

  return $result;
}

/**
 * Get response for an http request
 *
 * A postponed request is initiated instead of read. A request that already
 * has a code is returned unchanged. Otherwise data is read from the socket:
 * blocking requests are read to the end, non-blocking requests read one chunk
 * per call and are returned without a code while the connection is alive.
 *
 * @param $result
 *   Request object from background_process_http_request().
 * @return object
 *   The request object. Once the response is complete ->code, ->protocol,
 *   ->status_message, ->headers and ->data are set; ->error is set for
 *   timeouts and for codes other than 200, 301, 302, 304 and 307.
 */
function background_process_http_request_get_response(&$result) {
  if ($result->postponed) {
    $result->postponed = FALSE;
    return background_process_http_request_initiate($result);
  }
  if (isset($result->code)) {
    return $result;
  }

  $fp = $result->fp;
  $options = $result->options;

  timer_start(__FUNCTION__);

  if (!empty($options['blocking'])) {
    stream_set_blocking($fp, 1);
  }

  // The response buffer is filled across calls; create it on first use.
  if (!isset($result->response)) {
    $result->response = '';
  }

  // Fetch response. Due to PHP bugs like http://bugs.php.net/bug.php?id=43782
  // and http://bugs.php.net/bug.php?id=46049 we can't rely on feof(), but
  // instead must invoke stream_get_meta_data() each iteration.
  $info = stream_get_meta_data($fp);
  $alive = !$info['eof'] && !$info['timed_out'];

  while ($alive) {
    // Calculate how much time is left of the original timeout value.
    $timeout = $options['timeout'] - timer_read(__FUNCTION__) / 1000;
    if ($timeout <= 0) {
      // Out of time: the connection must no longer count as alive, otherwise
      // the timeout is never reported and the socket is never closed.
      $info['timed_out'] = TRUE;
      $alive = FALSE;
      break;
    }
    stream_set_timeout($fp, floor($timeout), floor(1000000 * fmod($timeout, 1)));
    $chunk = fread($fp, 1024);
    // fread() returns FALSE on failure; a chunk of "0" is real data.
    $has_data = is_string($chunk) && $chunk !== '';
    if ($has_data) {
      $result->response .= $chunk;
    }
    $result->data_ready = $has_data;
    $info = stream_get_meta_data($fp);
    $alive = !$info['eof'] && !$info['timed_out'];
    if (empty($options['blocking'])) {
      break;
    }
  }

  if ($alive) {
    return $result;
  }

  fclose($fp);

  if ($info['timed_out']) {
    $result->code = HTTP_REQUEST_TIMEOUT;
    $result->error = 'request timed out';
    return _background_process_http_request_result($result);
  }

  // Parse response headers from the response body.
  // Be tolerant of malformed HTTP responses that separate header and body with
  // \n\n or \r\r instead of \r\n\r\n.
  $parts = preg_split("/\r\n\r\n|\n\n|\r\r/", $result->response, 2);
  $response = isset($parts[0]) ? $parts[0] : '';
  // A response without separator has no body.
  $result->data = isset($parts[1]) ? $parts[1] : NULL;
  $response = preg_split("/\r\n|\n|\r/", $response);

  // Parse the response status line. Missing elements become NULL.
  list($protocol, $code, $status_message) = array_pad(explode(' ', trim((string) array_shift($response)), 3), 3, NULL);
  $result->protocol = $protocol;
  $result->status_message = $status_message;

  $result->headers = array();

  // Parse the response headers.
  while ($line = trim((string) array_shift($response))) {
    // A header line without colon gets an empty value.
    list($name, $value) = array_pad(explode(':', $line, 2), 2, '');
    $name = strtolower($name);
    if (isset($result->headers[$name]) && $name == 'set-cookie') {
      // RFC 2109: the Set-Cookie response header comprises the token Set-
      // Cookie:, followed by a comma-separated list of one or more cookies.
      $result->headers[$name] .= ',' . trim($value);
    }
    else {
      $result->headers[$name] = trim($value);
    }
  }

  $responses = array(
    100 => 'Continue',
    101 => 'Switching Protocols',
    200 => 'OK',
    201 => 'Created',
    202 => 'Accepted',
    203 => 'Non-Authoritative Information',
    204 => 'No Content',
    205 => 'Reset Content',
    206 => 'Partial Content',
    300 => 'Multiple Choices',
    301 => 'Moved Permanently',
    302 => 'Found',
    303 => 'See Other',
    304 => 'Not Modified',
    305 => 'Use Proxy',
    307 => 'Temporary Redirect',
    400 => 'Bad Request',
    401 => 'Unauthorized',
    402 => 'Payment Required',
    403 => 'Forbidden',
    404 => 'Not Found',
    405 => 'Method Not Allowed',
    406 => 'Not Acceptable',
    407 => 'Proxy Authentication Required',
    408 => 'Request Time-out',
    409 => 'Conflict',
    410 => 'Gone',
    411 => 'Length Required',
    412 => 'Precondition Failed',
    413 => 'Request Entity Too Large',
    414 => 'Request-URI Too Large',
    415 => 'Unsupported Media Type',
    416 => 'Requested range not satisfiable',
    417 => 'Expectation Failed',
    500 => 'Internal Server Error',
    501 => 'Not Implemented',
    502 => 'Bad Gateway',
    503 => 'Service Unavailable',
    504 => 'Gateway Time-out',
    505 => 'HTTP Version not supported',
  );
  // RFC 2616 states that all unknown HTTP codes must be treated the same as the
  // base code in their class.
  if (!isset($responses[$code])) {
    // Cast first: a non-numeric status code must not break the division.
    $code = floor((int) $code / 100) * 100;
  }
  $result->code = $code;

  switch ($code) {
    case 200: // OK
    case 304: // Not modified
      break;

    case 301: // Moved permanently
    case 302: // Moved temporarily
    case 307: // Moved temporarily
      // A redirect without Location header yields a NULL location.
      $location = isset($result->headers['location']) ? $result->headers['location'] : NULL;
      $options['timeout'] -= timer_read(__FUNCTION__) / 1000;
      if ($options['timeout'] <= 0) {
        $result->code = -1;
        $result->error = 'request timed out';
      }
      elseif ($options['max_redirects']) {
        // Redirect to the new location.
        $options['max_redirects']--;
        $result = background_process_http_request($location, $options);
        if (empty($result->error)) {
          background_process_http_request_get_response($result);
        }
        $result->redirect_code = $code;
      }
      if (!isset($result->redirect_url)) {
        $result->redirect_url = $location;
      }
      break;

    default:
      $result->error = $status_message;
  }

  return _background_process_http_request_result($result);
}

/**
 * Finalize a request object that has received a code.
 *
 * Sets ->end to the current time if not already set, and invokes the
 * 'callback' option with the request object when it is callable. Objects
 * without a code are returned untouched.
 *
 * @param $result
 *   Request object.
 * @return object
 *   The same request object.
 */
function _background_process_http_request_result($result) {
  if (isset($result->code)) {
    if (empty($result->end)) {
      $result->end = microtime(TRUE);
    }
    if (!empty($result->options['callback']) && is_callable($result->options['callback'])) {
      call_user_func($result->options['callback'], $result);
    }
  }
  return $result;
}

/**
 * Process multiple http requests.
*/
/**
 * Poll a set of http requests until all have a code or the timeout expires.
 *
 * @param $results
 *   Array of request objects from background_process_http_request(), updated
 *   in place.
 * @param $options
 *   (optional) Array with:
 *   - timeout: Maximum number of seconds to keep polling. Defaults to 30.
 *   - interval: Seconds to sleep after a pass in which no request had data
 *     ready. Defaults to 0.01.
 *   - limit: Maximum number of unfinished requests to poll per pass, 0 for
 *     no limit. Defaults to 0.
 */
function background_process_http_request_process(&$results, $options = array()) {
  $options += array(
    'timeout' => 30,
    'interval' => 0.01,
    'limit' => 0,
  );
  // usleep() takes whole microseconds.
  $interval = (int) ($options['interval'] * 1000000);
  $expire = time() + $options['timeout'];
  while ($results && time() < $expire) {
    $cnt = 0;
    $data_ready = FALSE;
    foreach ($results as &$result) {
      // Requests with a code are finished.
      if (isset($result->code)) {
        continue;
      }
      background_process_http_request_get_response($result);
      $data_ready = ($data_ready || $result->data_ready) ? TRUE : FALSE;
      $cnt++;
      if ($options['limit'] && $cnt >= $options['limit']) {
        break;
      }
    }
    // Break the reference so it cannot leak into later code.
    unset($result);
    // Nothing was polled: every request is finished.
    if (!$cnt) {
      break;
    }
    if (!$data_ready) {
      usleep($interval);
    }
  }
}

/**
 * Determines the default service host and stores it in the variable storage.
 *
 * @return bool
 *   TRUE if a host was determined, FALSE otherwise. A message is set in both
 *   cases.
 */
function background_process_determine_and_save_default_service_host() {
  $host = background_process_determine_default_service_host();
  if ($host) {
    global $base_url;
    drupal_set_message(t('Default service host determined at %base_url', array('%base_url' => _background_process_secure_url($host['base_url']))));
    if ($host['base_url'] === $base_url) {
      // The base url itself works, so no derived host needs to be stored.
      variable_del('background_process_derived_default_host');
    }
    else {
      variable_set('background_process_derived_default_host', array('default' => $host));
      drupal_set_message(t('Default service host differs from base url (%base_url). If migrating database to other sites or environments, you will need to either run "Determine default service host" again, or configure the default service host manually through settings.php', array('%base_url' => $base_url)), 'warning');
    }
    return TRUE;
  }
  else {
    drupal_set_message(t('Could not determine default service host. Please configure background process in your settings.php'), 'error');
    return FALSE;
  }
}

// ---------- INTERNAL FUNCTIONS ----------

/**
 * Ensure lock is removed at end of request
 *
 * The shutdown function is registered on the first call only.
 *
 * @param $handle
 *   Handle of background process
 * @param $remove
 *   If TRUE, don't remove when shutting down
 */
function _background_process_ensure_cleanup($handle, $remove = FALSE) {
  $handles = &drupal_static('background_process_handles_locked', NULL);
  if (!isset($handles)) {
    $handles = array();
    drupal_register_shutdown_function('_background_process_cleanup_locks');
  }

  if ($remove) {
    unset($handles[$handle]);
  }
  else {
    $handles[$handle] = $handle;
  }
}

/**
 * Shutdown handler for removing locks
 */
function _background_process_cleanup_locks() {
  $handles = &drupal_static('background_process_handles_locked', NULL);
  if (!empty($handles)) {
    foreach ($handles as $handle) {
      background_process_remove_process($handle);
    }
  }
}

/**
 * Get string name of callback.
 *
 * @param $callback
 *   Callback can be either a string or an array.
 * @return string
 *   The name of the callback, e.g. 'myfunction', 'myclass::mystaticmethod' or 'myclass->mymethod'.
 */
function _background_process_callback_name($callback) {
  if (is_array($callback)) {
    if (is_object($callback[0])) {
      $callback = get_class($callback[0]) . '->' . $callback[1];
    }
    else {
      $callback = $callback[0] . '::' . $callback[1];
    }
  }
  return $callback;
}

/**
 * Get request headers
 *
 * Builds the headers from the HTTP_* entries of $_SERVER, e.g.
 * HTTP_USER_AGENT becomes 'User-Agent'. Values of entries that map to the
 * same header name are joined with "; ".
 *
 * @return array headers
 *   Empty array when $_SERVER holds no HTTP_* entries.
 */
function _background_process_request_headers() {
  // Start from an empty array so that requests without HTTP_* entries return
  // an array instead of an undefined variable.
  $headers = array();
  foreach ($_SERVER as $key => $value) {
    if (substr($key, 0, 5) == 'HTTP_') {
      $key = str_replace(' ', '-', ucwords(strtolower(str_replace('_', ' ', substr($key, 5)))));
      if (empty($headers[$key])) {
        $headers[$key] = $value;
      }
      else {
        $headers[$key] .= "; $value";
      }
    }
  }
  return $headers;
}

/**
 * Remove headers we do not wish to pass on to the next request.
*
 * Header names are matched case-insensitively against a fixed list of
 * connection-specific headers; every other header is kept unchanged.
 *
 * @param $headers
 *   Headers to filter
 * @return array
 *   Filtered headers
 */
function _background_process_filter_headers($headers) {
  $kept = array();
  if (!empty($headers)) {
    $blocked = '/^(Connection|Keep-Alive|Proxy-Authenticate|Proxy-Authorization|TE|Trailers|Transfer-Encoding|Upgrade|Set-Cookie|Content-Length|Host|Accept-Encoding)$/i';
    foreach ($headers as $name => $content) {
      if (preg_match($blocked, $name)) {
        continue;
      }
      $kept[$name] = $content;
    }
  }
  return $kept;
}

/**
 * Hide the password of a URL.
 *
 * A non-empty password component is replaced with 'XXXXXXXX'; URLs without
 * password are only split and rebuilt.
 *
 * @param $url
 *   URL to secure
 * @return string
 *   URL
 */
function _background_process_secure_url($url) {
  $components = parse_url($url);
  if (!empty($components['pass'])) {
    $components['pass'] = 'XXXXXXXX';
  }
  return _background_process_unparse_url($components);
}

/**
 * Build a URL string from its components, the inverse of parse_url().
 *
 * @param $parsed_url
 *   Array from parse_url()
 * @return string
 *   URL
 */
function _background_process_unparse_url($parsed_url) {
  $built = '';
  if (isset($parsed_url['scheme'])) {
    $built .= $parsed_url['scheme'] . '://';
  }

  // Credentials: the user is always emitted, the "@" separator only when a
  // user or a password is present.
  $login = isset($parsed_url['user']) ? $parsed_url['user'] : '';
  $secret = isset($parsed_url['pass']) ? ':' . $parsed_url['pass'] : '';
  $built .= $login;
  if ($login || $secret) {
    $built .= $secret . '@';
  }

  if (isset($parsed_url['host'])) {
    $built .= $parsed_url['host'];
  }
  if (isset($parsed_url['port'])) {
    $built .= ':' . $parsed_url['port'];
  }
  if (isset($parsed_url['path'])) {
    $built .= $parsed_url['path'];
  }
  if (isset($parsed_url['query'])) {
    $built .= '?' . $parsed_url['query'];
  }
  if (isset($parsed_url['fragment'])) {
    $built .= '#' . $parsed_url['fragment'];
  }
  return $built;
}

/**
 * Shutdown handler that starts a background process again.
 *
 * @param $process
 *   Background process object
 * @param $args
 *   Array of arguments for callback. When empty, the arguments stored on the
 *   process are used.
 */
function _background_process_restart($process, $args = array()) {
  if (empty($args)) {
    $args = $process->args;
  }
  BackgroundProcess::load($process)->start($process->callback, $args);
}
 |