machineName; }

  /**
   * The migration group, used to collect related migrations. The constructor
   * stores either the value it was given or the 'default' MigrateGroup here.
   *
   * @var mixed
   */
  protected $group;

  /**
   * Accessor for the migration group.
   */
  public function getGroup() {
    return $this->group;
  }

  /**
   * Detailed information describing the migration.
   *
   * @var string
   */
  protected $description;

  /**
   * Accessor for the migration description.
   */
  public function getDescription() {
    return $this->description;
  }

  /**
   * Options passed to the current operation; assigned by processRollback()
   * and processImport().
   *
   * @var array
   */
  protected $options;

  /**
   * Look up a single option of the current operation.
   *
   * @param string $option_name
   *   Key to look up in the saved options.
   *
   * @return mixed
   *   The option value, or NULL when the option is not set.
   */
  public function getOption($option_name) {
    return isset($this->options[$option_name]) ? $this->options[$option_name] : NULL;
  }

  /**
   * Return the 'limit' option value when it is expressed in items.
   *
   * @return mixed
   *   The limit value if the unit is 'items' or 'item', otherwise NULL.
   */
  public function getItemLimit() {
    if (!isset($this->options['limit'])) {
      return NULL;
    }
    $counts_items = $this->options['limit']['unit'] == 'items'
      || $this->options['limit']['unit'] == 'item';
    return $counts_items ? $this->options['limit']['value'] : NULL;
  }

  /**
   * Return the 'limit' option value when it is expressed in seconds.
   *
   * @return mixed
   *   The limit value if the unit is 'seconds' or 'second', otherwise NULL.
   */
  public function getTimeLimit() {
    if (!isset($this->options['limit'])) {
      return NULL;
    }
    $counts_seconds = $this->options['limit']['unit'] == 'seconds'
      || $this->options['limit']['unit'] == 'second';
    return $counts_seconds ? $this->options['limit']['value'] : NULL;
  }

  /**
   * TRUE while a rollback or import is in progress. endProcess() checks this
   * flag so that it only writes to the database when there is something to
   * close out.
   *
   * @var boolean
   */
  protected $processing = FALSE;

  /**
   * Are we importing, rolling back, or doing nothing? Holds one of the
   * MigrationBase::STATUS_* constants.
   *
   * @var int
   */
  protected $status = MigrationBase::STATUS_IDLE;

  /**
   * When the current operation started, as recorded by beginProcess() using
   * microtime(TRUE).
   *
   * @var float
   */
  protected $starttime;

  /**
   * Whether to maintain a history of migration processes in migrate_log.
   *
   * @var boolean
   */
  protected $logHistory = TRUE;

  /**
   * Primary key of the current history record (inserted at the beginning of
   * a process, to be updated at the end).
   *
   * @var int
   */
  protected $logID;

  /**
   * Number of "items" processed in the current migration process (whatever
   * that means for the type of process).
   *
   * @var int
   */
  protected $total_processed = 0;

  /**
   * List of other Migration classes which should be imported before this one.
* E.g., a comment migration class would typically have node and user migrations
   * as dependencies.
   *
   * @var array
   */
  protected $dependencies = array();

  /**
   * Second dependency list, returned on its own by getSoftDependencies() and
   * appended to the hard dependencies by getDependencies().
   *
   * @var array
   */
  protected $softDependencies = array();

  /**
   * Return only the hard dependencies.
   */
  public function getHardDependencies() {
    return $this->dependencies;
  }

  /**
   * Return only the soft dependencies.
   */
  public function getSoftDependencies() {
    return $this->softDependencies;
  }

  /**
   * Return hard dependencies followed by soft dependencies.
   */
  public function getDependencies() {
    $hard = $this->dependencies;
    $soft = $this->softDependencies;
    return array_merge($hard, $soft);
  }

  /**
   * Name of a function for displaying feedback. It must take the message to
   * display as its first argument, and a (string) message type as its second
   * argument (see drush_log()).
   *
   * @var string
   */
  protected static $displayFunction;

  /**
   * Replace the function used to display feedback.
   *
   * @param string $display_function
   *   Name of the function to call from displayMessage().
   */
  public static function setDisplayFunction($display_function) {
    self::$displayFunction = $display_function;
  }

  /**
   * The fraction of the memory limit at which an operation will be
   * interrupted. Can be overridden by a Migration subclass if one would like
   * to push the envelope. Defaults to 85%.
   *
   * @var float
   */
  protected $memoryThreshold = 0.85;

  /**
   * The PHP memory_limit expressed in bytes.
   *
   * @var int
   */
  protected $memoryLimit;

  /**
   * The fraction of the time limit at which an operation will be interrupted.
   * Can be overridden by a Migration subclass if one would like to push the
   * envelope. Defaults to 90%.
   *
   * @var float
   */
  protected $timeThreshold = 0.90;

  /**
   * The PHP max_execution_time.
   *
   * @var int
   */
  protected $timeLimit;

  /**
   * MigrateTeamMember objects representing people involved with this
   * migration.
   *
   * @var array
   */
  protected $team = array();

  /**
   * Accessor for the team list.
   */
  public function getTeam() {
    return $this->team;
  }

  /**
   * If provided, an URL for an issue tracking system containing :id where
   * the issue number will go (e.g., 'http://example.com/project/ticket/:id').
   *
   * @var string
   */
  protected $issuePattern;

  /**
   * Accessor for the issue tracker URL pattern.
   */
  public function getIssuePattern() {
    return $this->issuePattern;
  }

  /**
   * If we set an error handler (during import), remember the previous one so
   * it can be restored.
*
   * @var callback
   */
  protected $previousErrorHandler = NULL;

  /**
   * Disabling a migration prevents it from running with --all, or individually
   * without --force
   *
   * @var boolean
   */
  protected $enabled = TRUE;

  /**
   * Accessor for the enabled flag.
   */
  public function getEnabled() {
    return $this->enabled;
  }

  /**
   * Codes representing the result of a rollback or import process.
   */
  const RESULT_COMPLETED = 1;   // All records have been processed
  const RESULT_INCOMPLETE = 2;  // The process has interrupted itself (e.g., the
                                // memory limit is approaching)
  const RESULT_STOPPED = 3;     // The process was stopped externally (e.g., via
                                // drush migrate-stop)
  const RESULT_FAILED = 4;      // The process had a fatal error
  const RESULT_SKIPPED = 5;     // Dependencies are unfulfilled - skip the process
  const RESULT_DISABLED = 6;    // This migration is disabled, skipping

  /**
   * Codes representing the current status of a migration, and stored in the
   * migrate_status table.
   */
  const STATUS_IDLE = 0;
  const STATUS_IMPORTING = 1;
  const STATUS_ROLLING_BACK = 2;
  const STATUS_STOPPING = 3;
  const STATUS_DISABLED = 4;

  /**
   * Message types to be passed to saveMessage() and saved in message tables.
   * MESSAGE_INFORMATIONAL represents a condition that did not prevent the
   * operation from succeeding - all others represent different severities of
   * conditions resulting in a source record not being imported.
   */
  const MESSAGE_ERROR = 1;
  const MESSAGE_WARNING = 2;
  const MESSAGE_NOTICE = 3;
  const MESSAGE_INFORMATIONAL = 4;

  /**
   * Get human readable name for a message constant.
   *
   * @param int $constant
   *   One of the MigrationBase::MESSAGE_* constants.
   *
   * @return string
   *   Translated name, or NULL when $constant is not a known message level.
   */
  public function getMessageLevelName($constant) {
    $map = array(
      MigrationBase::MESSAGE_ERROR => t('Error'),
      MigrationBase::MESSAGE_WARNING => t('Warning'),
      MigrationBase::MESSAGE_NOTICE => t('Notice'),
      MigrationBase::MESSAGE_INFORMATIONAL => t('Informational'),
    );
    // Guard the lookup: indexing the map with an unknown level would raise an
    // undefined-index error instead of simply yielding no name.
    return isset($map[$constant]) ? $map[$constant] : NULL;
  }

  /**
   * General initialization of a MigrationBase object.
*/
  public function __construct($group = NULL) {
    $this->machineName = $this->generateMachineName();

    if (empty($group)) {
      $this->group = MigrateGroup::getInstance('default');
    }
    else {
      $this->group = $group;
    }

    // Record the memory limit in bytes
    $limit = trim(ini_get('memory_limit'));
    if ($limit == '-1') {
      // -1 means "no limit"; use the largest integer so percentage checks
      // against the limit never trip.
      $this->memoryLimit = PHP_INT_MAX;
    }
    else {
      if (!is_numeric($limit)) {
        // Shorthand notation: a number followed by a K, M or G suffix. Split
        // the two apart first so the arithmetic below runs on a genuinely
        // numeric value rather than on a string such as "128M".
        $bytes = trim(substr($limit, 0, -1));
        // A non-numeric prefix is routed to the "invalid" branch below by
        // pretending there is no recognised unit.
        $unit = is_numeric($bytes) ? drupal_strtolower(substr($limit, -1)) : '';
        switch ($unit) {
          // Each case deliberately falls through to the next one, multiplying
          // by 1024 once per step down to bytes.
          case 'g':
            $bytes *= 1024;
          case 'm':
            $bytes *= 1024;
          case 'k':
            $bytes *= 1024;
            break;
          default:
            throw new Exception(t('Invalid PHP memory_limit !limit',
              array('!limit' => $limit)));
        }
        $limit = $bytes;
      }
      $this->memoryLimit = $limit;
    }

    // Record the time limit
    $this->timeLimit = ini_get('max_execution_time');

    // Prevent any emails from being sent out on migration
    global $conf;
    if (!empty($conf['mail_system'])) {
      foreach ($conf['mail_system'] as $system => $class) {
        $conf['mail_system'][$system] = 'MigrateMailIgnore';
      }
    }

    // Make sure we clear our semaphores in case of abrupt exit
    register_shutdown_function(array($this, 'endProcess'));
  }

  /**
   * Initialize static members, before any class instances are created.
   */
  static public function staticInitialize() {
    // Default the display function based on context: drush_log() when it is
    // available, drupal_set_message() otherwise.
    if (function_exists('drush_log')) {
      self::$displayFunction = 'drush_log';
    }
    else {
      self::$displayFunction = 'drupal_set_message';
    }
  }

  /**
   * Register a new migration process in the migrate_status table. This will
   * generally be used in two contexts - by the class detection code for
   * static (one instance per class) migrations, and by the module implementing
   * dynamic (parameterized class) migrations.
*
   * @param string $class_name
   *   Class implementing the migration.
   * @param string $machine_name
   *   Machine name to register; derived from $class_name when omitted.
   * @param array $arguments
   *   Arguments stored (serialized) with the registration.
   */
  static public function registerMigration($class_name, $machine_name = NULL, array $arguments = array()) {
    if (!$machine_name) {
      $machine_name = self::machineFromClass($class_name);
    }

    // Register the migration if it's not already there; if it is,
    // update the class and arguments in case they've changed.
    db_merge('migrate_status')
      ->key(array('machine_name' => $machine_name))
      ->fields(array(
        'class_name' => $class_name,
        'arguments' => serialize($arguments)
      ))
      ->execute();
  }

  /**
   * Deregister a migration - remove all traces of it from the database (without
   * touching any content which was created by this migration).
   *
   * @param string $machine_name
   *   Machine name whose migrate_status row is deleted.
   */
  static public function deregisterMigration($machine_name) {
    // The number of deleted rows is not needed, so the result is discarded.
    db_delete('migrate_status')
      ->condition('machine_name', $machine_name)
      ->execute();
  }

  /**
   * By default, the migration machine name is the class name (with the
   * Migration suffix, if present, stripped).
   *
   * @return string
   *   Machine name derived from this object's class.
   */
  protected function generateMachineName() {
    $class_name = get_class($this);
    return self::machineFromClass($class_name);
  }

  /**
   * Derive a machine name from a class name by dropping a trailing
   * "Migration".
   *
   * @param string $class_name
   *   Class name to convert.
   *
   * @return string
   *   The class name without its "Migration" suffix, or the class name
   *   unchanged when it has no such suffix.
   */
  protected static function machineFromClass($class_name) {
    if (preg_match('/Migration$/', $class_name)) {
      // drupal_substr() counts characters, so both lengths are measured with
      // drupal_strlen(); using the byte-based strlen() here would cut at the
      // wrong position for class names containing multibyte characters.
      $keep = drupal_strlen($class_name) - drupal_strlen('Migration');
      $machine_name = drupal_substr($class_name, 0, $keep);
    }
    else {
      $machine_name = $class_name;
    }
    return $machine_name;
  }

  /**
   * Return the single instance of the given migration.
*
   * @param string $machine_name
   *   Machine name of the migration to fetch.
   * @param string $class_name
   *   Class to instantiate; when empty it is read from migrate_status.
   * @param array $arguments
   *   Constructor argument; replaced by the stored arguments when the class
   *   name has to be looked up.
   *
   * @return object
   *   The cached or newly created migration instance.
   */
  static public function getInstance($machine_name, $class_name = NULL, array $arguments = array()) {
    $instances = &drupal_static(__FUNCTION__, array());

    // Cache under the lower-cased name so a difference in case does not cause
    // a cache miss.
    $cache_key = drupal_strtolower($machine_name);
    if (isset($instances[$cache_key])) {
      return $instances[$cache_key];
    }

    // The lookup is skipped when the caller already supplied the class.
    if (!$class_name) {
      $row = db_select('migrate_status', 'ms')
        ->fields('ms', array('class_name', 'arguments'))
        ->condition('machine_name', $machine_name)
        ->execute()
        ->fetchObject();
      if (!$row) {
        // Can't find a migration with this name
        throw new MigrateException(t('No migration found with machine name !machine',
          array('!machine' => $machine_name)));
      }
      $class_name = $row->class_name;
      $arguments = unserialize($row->arguments);
    }

    $instances[$cache_key] = new $class_name($arguments);
    return $instances[$cache_key];
  }

  /**
   * Identifies whether this migration is "dynamic" (that is, allows multiple
   * instances distinguished by differing parameters). A dynamic class should
   * override this with a return value of TRUE.
   */
  static public function isDynamic() {
    return FALSE;
  }

  /**
   * Default to printing messages, but derived classes are expected to save
   * messages indexed by current source ID.
   *
   * @param string $message
   *   The message to record.
   * @param int $level
   *   Optional message severity (defaults to MESSAGE_ERROR).
   */
  public function saveMessage($message, $level = MigrationBase::MESSAGE_ERROR) {
    // Translate the MESSAGE_* code into the type string handed to
    // displayMessage(); any other value is passed through untouched.
    switch ($level) {
      case MigrationBase::MESSAGE_ERROR:
        $type = 'error';
        break;
      case MigrationBase::MESSAGE_WARNING:
        $type = 'warning';
        break;
      case MigrationBase::MESSAGE_NOTICE:
        $type = 'notice';
        break;
      case MigrationBase::MESSAGE_INFORMATIONAL:
        $type = 'status';
        break;
      default:
        $type = $level;
        break;
    }
    self::displayMessage($message, $type);
  }

  /**
   * Output the given message appropriately (drush_print/drupal_set_message/etc.)
*
   * @param string $message
   *   The message to output.
   * @param string $level
   *   Optional message severity as understood by drupal_set_message and
   *   drush_log (defaults to 'error').
   */
  static public function displayMessage($message, $level = 'error') {
    call_user_func(self::$displayFunction, $message, $level);
  }

  /**
   * Custom PHP error handler.
   * TODO: Redundant with hook_watchdog?
   *
   * @param $error_level
   *   The level of the error raised.
   * @param $message
   *   The error message.
   * @param $filename
   *   The filename that the error was raised in.
   * @param $line
   *   The line number the error was raised at.
   * @param $context
   *   An array that points to the active symbol table at the point the error
   *   occurred. Optional, because PHP 8 no longer passes this argument to
   *   error handlers; it is not used here.
   *
   * @throws MigrateException
   *   For any reported error that is not a notice, strict or deprecation
   *   message.
   */
  public function errorHandler($error_level, $message, $filename, $line, $context = NULL) {
    // Honour the current error_reporting() mask.
    if ($error_level & error_reporting()) {
      // The location is appended exactly once, for both branches below.
      $message .= "\n" . t('File !file, line !line',
        array('!line' => $line, '!file' => $filename));
      // Record notices and continue
      if ($error_level == E_NOTICE || $error_level == E_USER_NOTICE) {
        $this->saveMessage($message, MigrationBase::MESSAGE_INFORMATIONAL);
      }
      // Simply ignore strict and deprecated errors
      // Note DEPRECATED constants introduced in PHP 5.3; 8192 and 16384 are
      // the values of E_DEPRECATED and E_USER_DEPRECATED.
      elseif (!($error_level == E_STRICT || $error_level == 8192 || $error_level == 16384)) {
        throw new MigrateException($message, MigrationBase::MESSAGE_ERROR);
      }
    }
  }

  /**
   * Takes an Exception object and both saves and displays it, pulling additional
   * information on the location triggering the exception.
   *
   * @param Exception $exception
   *   Object representing the exception.
   * @param boolean $save
   *   Whether to save the message in the migration's mapping table. Set to FALSE
   *   in contexts where this doesn't make sense.
   */
  public function handleException($exception, $save = TRUE) {
    $result = _drupal_decode_exception($exception);
    $message = $result['!message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';
    if ($save) {
      $this->saveMessage($message);
    }
    self::displayMessage($message);
  }

  /**
   * Check the current status of a migration.
   *
   * @return int
   *   One of the MigrationBase::STATUS_* constants
   */
  public function getStatus() {
    if (!$this->enabled) {
      return MigrationBase::STATUS_DISABLED;
    }
    $status = db_select('migrate_status', 'ms')
      ->fields('ms', array('status'))
      ->condition('machine_name', $this->machineName)
      ->execute()
      ->fetchField();
    // fetchField() yields FALSE when no row matched and NULL for a NULL
    // column; both mean there is no recorded status, i.e. the migration is
    // idle.
    if ($status === FALSE || !isset($status)) {
      $status = MigrationBase::STATUS_IDLE;
    }
    return $status;
  }

  /**
   * Retrieve the last time an import operation completed successfully.
   *
   * @return string
   *   'Y-m-d H:i:s' formatted date/time of the most recent migrate_log
   *   endtime for this migration, or an empty string if there is none.
   */
  public function getLastImported() {
    $last_imported = db_select('migrate_log', 'ml')
      ->fields('ml', array('endtime'))
      ->condition('machine_name', $this->machineName)
      ->isNotNull('endtime')
      ->orderBy('endtime', 'DESC')
      ->execute()
      ->fetchField();
    if ($last_imported) {
      // endtime is stored in milliseconds (see endProcess()).
      $last_imported = date('Y-m-d H:i:s', $last_imported/1000);
    }
    else {
      $last_imported = '';
    }
    return $last_imported;
  }

  /**
   * Fetch the current highwater mark for updated content.
   *
   * @return string
   *   The highwater mark.
   */
  public function getHighwater() {
    $highwater = db_select('migrate_status', 'ms')
      ->fields('ms', array('highwater'))
      ->condition('machine_name', $this->machineName)
      ->execute()
      ->fetchField();
    return $highwater;
  }

  /**
   * Save the highwater mark for this migration (but not when using an idlist).
   *
   * @param mixed $highwater
   *   Highwater mark to save
   * @param boolean $force
   *   If TRUE, save even if it's lower than the previous value.
*/
  protected function saveHighwater($highwater, $force = FALSE) {
    // Nothing is stored while an explicit idlist is in use.
    if (isset($this->options['idlist'])) {
      return;
    }

    $update = db_update('migrate_status')
      ->fields(array('highwater' => $highwater))
      ->condition('machine_name', $this->machineName);

    if (!$force) {
      $numeric_highwater = !empty($this->highwaterField['type'])
        && $this->highwaterField['type'] == 'int';
      if ($numeric_highwater) {
        // If the highwater is an integer type, we need to force the DB server
        // to treat the varchar highwater field as an integer (otherwise it will
        // think '5' > '10'). CAST(highwater AS INTEGER) would be ideal, but won't
        // work in MySQL. This hack is thought to be portable.
        $update->where('(highwater+0) < :highwater', array(':highwater' => $highwater));
      }
      else {
        $update->condition('highwater', $highwater, '<');
      }
    }

    $update->execute();
  }

  /**
   * Retrieve the last throughput for current Migration (items / minute).
   *
   * @return integer
   *   Items per minute computed from the most recently started, finished
   *   import in migrate_log; 0 when there is no such record or no time
   *   elapsed.
   */
  public function getLastThroughput() {
    // beginProcess() stores the status code as process_type, so imports are
    // the rows whose process_type equals STATUS_IMPORTING.
    $row = db_select('migrate_log', 'ml')
      ->fields('ml', array('starttime', 'endtime', 'numprocessed'))
      ->condition('machine_name', $this->machineName)
      ->condition('process_type', MigrationBase::STATUS_IMPORTING)
      ->isNotNull('endtime')
      ->orderBy('starttime', 'DESC')
      ->execute()
      ->fetchObject();
    if (!$row) {
      return 0;
    }

    // Log times are in milliseconds; convert the difference to seconds.
    $elapsed = ($row->endtime - $row->starttime)/1000;
    return $elapsed > 0 ? round(($row->numprocessed / $elapsed) * 60) : 0;
  }

  /**
   * Reports whether this migration process is complete. For a Migration, for
   * example, this would be whether all available source rows have been processed.
   * Other MigrationBase classes will need to return TRUE/FALSE appropriately.
*/
  abstract public function isComplete();

  /**
   * Reports whether all (hard) dependencies have completed migration.
   *
   * @param boolean $rollback
   *   FALSE (default): check that every hard dependency of this migration is
   *   complete. TRUE: check instead that no migration listing this one as a
   *   hard dependency still reports imported items.
   *
   * @return boolean
   *   TRUE when the check passes, FALSE otherwise.
   */
  protected function dependenciesComplete($rollback = FALSE) {
    if (!$rollback) {
      foreach ($this->dependencies as $dependency) {
        $required = MigrationBase::getInstance($dependency);
        if (!$required->isComplete()) {
          return FALSE;
        }
      }
      return TRUE;
    }

    foreach (migrate_migrations() as $candidate) {
      // Only migrations that depend on this one are of interest.
      if (!in_array($this->machineName, $candidate->getHardDependencies())) {
        continue;
      }
      // Not every migration class provides importedCount().
      if (method_exists($candidate, 'importedCount') && $candidate->importedCount() > 0) {
        return FALSE;
      }
    }
    return TRUE;
  }

  /**
   * Returns an array of the migration's dependencies that are incomplete.
   *
   * @return array
   *   Machine names, taken from getDependencies(), whose migration reports
   *   isComplete() as false.
   */
  public function incompleteDependencies() {
    $pending = array();
    foreach ($this->getDependencies() as $name) {
      if (!MigrationBase::getInstance($name)->isComplete()) {
        $pending[] = $name;
      }
    }
    return $pending;
  }

  /**
   * Begin a process, ensuring only one process can be active
   * at once on a given migration.
*
   * @param int $newStatus
   *   MigrationBase::STATUS_IMPORTING or MigrationBase::STATUS_ROLLING_BACK
   *
   * @throws MigrateException
   *   When the migration is not currently idle.
   */
  protected function beginProcess($newStatus) {
    // So hook_watchdog() knows what migration (if any) is running
    self::$currentMigration = $this;

    // Try to make the semaphore handling atomic (depends on DB support). The
    // transaction object is never used directly; it is only held in a local
    // for the duration of this method.
    $transaction = db_transaction();

    $this->starttime = microtime(TRUE);

    // Check to make sure there's no process already running for this migration
    if ($this->getStatus() != MigrationBase::STATUS_IDLE) {
      throw new MigrateException(t('There is already an active process on !machine_name',
        array('!machine_name' => $this->machineName)));
    }

    // Claim the semaphore, both in memory and in migrate_status.
    $this->processing = TRUE;
    $this->status = $newStatus;
    $status_fields = array(
      'class_name' => get_class($this),
      'status' => $newStatus,
    );
    db_merge('migrate_status')
      ->key(array('machine_name' => $this->machineName))
      ->fields($status_fields)
      ->execute();

    // Set an error handler for imports
    if ($newStatus == MigrationBase::STATUS_IMPORTING) {
      $this->previousErrorHandler = set_error_handler(array($this, 'errorHandler'));
    }

    // Save the initial history record
    if ($this->logHistory) {
      $log_fields = array(
        'machine_name' => $this->machineName,
        'process_type' => $newStatus,
        // Start time is logged in milliseconds.
        'starttime' => round(microtime(TRUE) * 1000),
        'initialHighwater' => $this->getHighwater(),
      );
      $this->logID = db_insert('migrate_log')
        ->fields($log_fields)
        ->execute();
    }
  }

  /**
   * End a rollback or import process, releasing the semaphore. Note that it must
   * be public to be callable as the shutdown function.
*/
  public function endProcess() {
    // Put back whatever error handler was active before beginProcess().
    if ($this->previousErrorHandler) {
      set_error_handler($this->previousErrorHandler);
      $this->previousErrorHandler = NULL;
    }

    // Only touch the database if a process was actually started; this method
    // also runs as a shutdown function for every instance.
    if ($this->processing) {
      $this->status = MigrationBase::STATUS_IDLE;
      $fields = array(
        'class_name' => get_class($this),
        'status' => MigrationBase::STATUS_IDLE,
      );
      db_merge('migrate_status')
        ->key(array('machine_name' => $this->machineName))
        ->fields($fields)
        ->execute();

      // Complete the log record
      if ($this->logHistory) {
        try {
          db_merge('migrate_log')
            ->key(array('mlid' => $this->logID))
            ->fields(array(
              // End time is logged in milliseconds.
              'endtime' => round(microtime(TRUE) * 1000),
              'finalhighwater' => $this->getHighwater(),
              'numprocessed' => $this->total_processed,
            ))
            ->execute();
        }
        catch (PDOException $e) {
          // Report through this class's own displayMessage(), as the rest of
          // the class does, rather than naming a subclass.
          self::displayMessage(t('Could not log operation on migration !name - possibly MigrationBase::beginProcess() was not called',
            array('!name' => $this->machineName)));
        }
      }

      $this->processing = FALSE;
    }
    self::$currentMigration = NULL;
  }

  /**
   * Signal that any current import or rollback process should end itself at
   * the earliest opportunity
   */
  public function stopProcess() {
    // Do not change the status of an idle migration
    db_update('migrate_status')
      ->fields(array('status' => MigrationBase::STATUS_STOPPING))
      ->condition('machine_name', $this->machineName)
      ->condition('status', MigrationBase::STATUS_IDLE, '<>')
      ->execute();
  }

  /**
   * Reset the status of the migration to IDLE (to be used when the status
   * gets stuck, e.g. if a process core-dumped)
   */
  public function resetStatus() {
    // Do not change the status of an already-idle migration
    db_update('migrate_status')
      ->fields(array('status' => MigrationBase::STATUS_IDLE))
      ->condition('machine_name', $this->machineName)
      ->condition('status', MigrationBase::STATUS_IDLE, '<>')
      ->execute();
  }

  /**
   * Perform an operation during the rollback phase.
   *
   * @param array $options
   *   List of options provided (usually from a drush command). Specific to
   *   the derived class.
*
   * @return int
   *   One of the MigrationBase::RESULT_* constants: RESULT_DISABLED when the
   *   migration is disabled, RESULT_SKIPPED when dependencies block the
   *   rollback, RESULT_COMPLETED when the class defines no rollback() method,
   *   otherwise whatever rollback() returns.
   */
  public function processRollback(array $options = array()) {
    if ($this->enabled) {
      $return = MigrationBase::RESULT_COMPLETED;
      if (method_exists($this, 'rollback')) {
        $this->options = $options;
        // Dependencies are checked unless 'force' is set to a true value,
        // the same rule processImport() applies. An explicit
        // 'force' => FALSE therefore no longer bypasses the check.
        if (empty($options['force'])) {
          if (!$this->dependenciesComplete(TRUE)) {
            return MigrationBase::RESULT_SKIPPED;
          }
        }
        $this->beginProcess(MigrationBase::STATUS_ROLLING_BACK);
        try {
          $return = $this->rollback();
        }
        catch (Exception $exception) {
          // If something bad happened, make sure we clear the semaphore
          $this->endProcess();
          throw $exception;
        }
        $this->endProcess();
      }
    }
    else {
      $return = MigrationBase::RESULT_DISABLED;
    }
    return $return;
  }

  /**
   * Perform an operation during the import phase
   *
   * @param array $options
   *   List of options provided (usually from a drush command). Specific to
   *   the derived class.
   *
   * @return int
   *   One of the MigrationBase::RESULT_* constants: RESULT_DISABLED when the
   *   migration is disabled, RESULT_SKIPPED when dependencies are incomplete,
   *   RESULT_COMPLETED when the class defines no import() method, otherwise
   *   whatever import() returns.
   */
  public function processImport(array $options = array()) {
    if ($this->enabled) {
      $return = MigrationBase::RESULT_COMPLETED;
      if (method_exists($this, 'import')) {
        $this->options = $options;
        if (empty($options['force'])) {
          if (!$this->dependenciesComplete()) {
            return MigrationBase::RESULT_SKIPPED;
          }
        }
        $this->beginProcess(MigrationBase::STATUS_IMPORTING);
        try {
          $return = $this->import();
        }
        catch (Exception $exception) {
          // If something bad happened, make sure we clear the semaphore
          $this->endProcess();
          throw $exception;
        }
        // Items per minute over the whole run. Stays 0 unless the import
        // completed, the class tracks total_successes, and a measurable
        // amount of time passed (guards against dividing by zero).
        $overallThroughput = 0;
        if ($return == MigrationBase::RESULT_COMPLETED && isset($this->total_successes)) {
          $elapsed = microtime(TRUE) - $this->starttime;
          if ($elapsed > 0) {
            $overallThroughput = round(60*$this->total_successes / $elapsed);
          }
        }
        // endProcess() as declared in this class takes no parameters, so the
        // value only matters to an override that accepts it.
        $this->endProcess($overallThroughput);
      }
    }
    else {
      $return = MigrationBase::RESULT_DISABLED;
    }
    return $return;
  }

  /**
   * A derived migration class does the actual rollback or import work in these
   * methods - we cannot declare them abstract because some classes may define
   * only one.
   *
   * abstract protected function rollback();
   * abstract protected function import();
   */

  /**
   * Test whether we've exceeded the desired memory threshold.
* If so, output a message.
   *
   * @return boolean
   *   TRUE if the threshold is exceeded, FALSE if not.
   */
  protected function memoryExceeded() {
    $usage = memory_get_usage();
    $pct_memory = $usage/$this->memoryLimit;
    if (!($pct_memory > $this->memoryThreshold)) {
      return FALSE;
    }

    $args = array(
      '!pct' => round($pct_memory*100),
      '!usage' => format_size($usage),
      '!limit' => format_size($this->memoryLimit),
    );
    self::displayMessage(
      t('Memory usage is !usage (!pct% of limit !limit), resetting statics', $args),
      'warning');

    // First, try resetting Drupal's static storage - this frequently releases
    // plenty of memory to continue
    drupal_static_reset();

    // Measure again now that the statics are gone.
    $usage = memory_get_usage();
    $pct_memory = $usage/$this->memoryLimit;
    $args = array(
      '!pct' => round($pct_memory*100),
      '!usage' => format_size($usage),
      '!limit' => format_size($this->memoryLimit),
    );

    // Use a lower threshold - we don't want to be in a situation where we keep
    // coming back here and trimming a tiny amount
    $still_exceeded = $pct_memory > (.90 * $this->memoryThreshold);
    if ($still_exceeded) {
      $text = t('Memory usage is now !usage (!pct% of limit !limit), not enough reclaimed, starting new batch', $args);
    }
    else {
      $text = t('Memory usage is now !usage (!pct% of limit !limit), reclaimed enough, continuing', $args);
    }
    self::displayMessage($text, 'warning');
    return $still_exceeded;
  }

  /**
   * Test whether we're approaching the PHP time limit.
   *
   * @return boolean
   *   TRUE if the threshold is exceeded, FALSE if not.
   */
  protected function timeExceeded() {
    // A time limit of 0 is treated as "never exceeded".
    if ($this->timeLimit == 0) {
      return FALSE;
    }
    $seconds_used = time() - REQUEST_TIME;
    return ($seconds_used / $this->timeLimit) > $this->timeThreshold;
  }

  /**
   * Test whether we've exceeded the designated time limit.
   *
   * @return boolean
   *   TRUE if the threshold is exceeded, FALSE if not.
*/
  protected function timeOptionExceeded() {
    // No (or a zero) time limit option means there is nothing to exceed.
    if (!$timelimit = $this->getTimeLimit()) {
      return FALSE;
    }
    $time_elapsed = time() - REQUEST_TIME;
    if ($time_elapsed >= $timelimit) {
      return TRUE;
    }
    else {
      return FALSE;
    }
  }

  /**
   * Convert an incoming string (which may be a UNIX timestamp, or an
   * arbitrarily-formatted date/time string) to a UNIX timestamp.
   *
   * @param string $value
   *   Timestamp or date/time string. Empty values are treated as "now".
   *
   * @return mixed
   *   The current time for an empty $value, $value itself when it is numeric,
   *   otherwise the result of strtotime() - which is FALSE when the string
   *   cannot be parsed.
   */
  static public function timestamp($value) {
    // Default empty values to now
    if (empty($value)) {
      return time();
    }

    // Does it look like it's already a timestamp? Just return it
    if (is_numeric($value)) {
      return $value;
    }

    $time = strtotime($value);
    // Strict comparison: a successfully parsed timestamp of 0 (the epoch) is
    // a valid result and must not be mistaken for a parse failure.
    if ($time === FALSE) {
      // Handles form YYYY-MM-DD HH:MM:SS.garbage
      if (drupal_strlen($value) > 19) {
        $time = strtotime(drupal_substr($value, 0, 19));
      }
    }
    return $time;
  }
}

// Make sure static members (in particular, $displayFunction) get
// initialized even if there are no class instances.
MigrationBase::staticInitialize();