| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582 | <?phpnamespace Drupal\migrate;use Drupal\Component\Utility\Bytes;use Drupal\Core\Utility\Error;use Drupal\Core\StringTranslation\StringTranslationTrait;use Drupal\migrate\Event\MigrateEvents;use Drupal\migrate\Event\MigrateImportEvent;use Drupal\migrate\Event\MigratePostRowSaveEvent;use Drupal\migrate\Event\MigratePreRowSaveEvent;use Drupal\migrate\Event\MigrateRollbackEvent;use Drupal\migrate\Event\MigrateRowDeleteEvent;use Drupal\migrate\Exception\RequirementsException;use Drupal\migrate\Plugin\MigrateIdMapInterface;use Drupal\migrate\Plugin\MigrationInterface;use Symfony\Component\EventDispatcher\EventDispatcherInterface;/** * Defines a migrate executable class. */class MigrateExecutable implements MigrateExecutableInterface {  use StringTranslationTrait;  /**   * The configuration of the migration to do.   *   * @var \Drupal\migrate\Plugin\MigrationInterface   */  protected $migration;  /**   * Status of one row.   *   * The value is a MigrateIdMapInterface::STATUS_* constant, for example:   * STATUS_IMPORTED.   *   * @var int   */  protected $sourceRowStatus;  /**   * The ratio of the memory limit at which an operation will be interrupted.   *   * @var float   */  protected $memoryThreshold = 0.85;  /**   * The PHP memory_limit expressed in bytes.   *   * @var int   */  protected $memoryLimit;  /**   * The configuration values of the source.   *   * @var array   */  protected $sourceIdValues;  /**   * An array of counts. Initially used for cache hit/miss tracking.   *   * @var array   */  protected $counts = [];  /**   * The source.   *   * @var \Drupal\migrate\Plugin\MigrateSourceInterface   */  protected $source;  /**   * The event dispatcher.   *   * @var \Symfony\Component\EventDispatcher\EventDispatcherInterface   */  protected $eventDispatcher;  /**   * Migration message service.   *   * @todo https://www.drupal.org/node/2822663 Make this protected.   *   * @var \Drupal\migrate\MigrateMessageInterface   */  public $message;  /**   * Constructs a MigrateExecutable and verifies and sets the memory limit.   *   * @param \Drupal\migrate\Plugin\MigrationInterface $migration   *   The migration to run.   * @param \Drupal\migrate\MigrateMessageInterface $message   *   (optional) The migrate message service.   * @param \Symfony\Component\EventDispatcher\EventDispatcherInterface $event_dispatcher   *   (optional) The event dispatcher.   *   * @throws \Drupal\migrate\MigrateException   */  public function __construct(MigrationInterface $migration, MigrateMessageInterface $message = NULL, EventDispatcherInterface $event_dispatcher = NULL) {    $this->migration = $migration;    $this->message = $message ?: new MigrateMessage();    $this->getIdMap()->setMessage($this->message);    $this->eventDispatcher = $event_dispatcher;    // Record the memory limit in bytes    $limit = trim(ini_get('memory_limit'));    if ($limit == '-1') {      $this->memoryLimit = PHP_INT_MAX;    }    else {      $this->memoryLimit = Bytes::toInt($limit);    }  }  /**   * Returns the source.   *   * Makes sure source is initialized based on migration settings.   *   * @return \Drupal\migrate\Plugin\MigrateSourceInterface   *   The source.   */  protected function getSource() {    if (!isset($this->source)) {      $this->source = $this->migration->getSourcePlugin();    }    return $this->source;  }  /**   * Gets the event dispatcher.   *   * @return \Symfony\Component\EventDispatcher\EventDispatcherInterface   */  protected function getEventDispatcher() {    if (!$this->eventDispatcher) {      $this->eventDispatcher = \Drupal::service('event_dispatcher');    }    return $this->eventDispatcher;  }  /**   * {@inheritdoc}   */  public function import() {    // Only begin the import operation if the migration is currently idle.    if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {      $this->message->display($this->t('Migration @id is busy with another operation: @status',        [          '@id' => $this->migration->id(),          '@status' => $this->t($this->migration->getStatusLabel()),        ]), 'error');      return MigrationInterface::RESULT_FAILED;    }    $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_IMPORT, new MigrateImportEvent($this->migration, $this->message));    // Knock off migration if the requirements haven't been met.    try {      $this->migration->checkRequirements();    }    catch (RequirementsException $e) {      $this->message->display(        $this->t(          'Migration @id did not meet the requirements. @message @requirements',          [            '@id' => $this->migration->id(),            '@message' => $e->getMessage(),            '@requirements' => $e->getRequirementsString(),          ]        ),        'error'      );      return MigrationInterface::RESULT_FAILED;    }    $this->migration->setStatus(MigrationInterface::STATUS_IMPORTING);    $return = MigrationInterface::RESULT_COMPLETED;    $source = $this->getSource();    $id_map = $this->getIdMap();    try {      $source->rewind();    }    catch (\Exception $e) {      $this->message->display(        $this->t('Migration failed with source plugin exception: @e in @file line @line', [          '@e' => $e->getMessage(),          '@file' => $e->getFile(),          '@line' => $e->getLine(),        ]), 'error');      $this->migration->setStatus(MigrationInterface::STATUS_IDLE);      return MigrationInterface::RESULT_FAILED;    }    $destination = $this->migration->getDestinationPlugin();    while ($source->valid()) {      $row = $source->current();      $this->sourceIdValues = $row->getSourceIdValues();      try {        $this->processRow($row);        $save = TRUE;      }      catch (MigrateException $e) {        $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());        $this->saveMessage($e->getMessage(), $e->getLevel());        $save = FALSE;      }      catch (MigrateSkipRowException $e) {        if ($e->getSaveToMap()) {          $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_IGNORED);        }        if ($message = trim($e->getMessage())) {          $this->saveMessage($message, MigrationInterface::MESSAGE_INFORMATIONAL);        }        $save = FALSE;      }      if ($save) {        try {          $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROW_SAVE, new MigratePreRowSaveEvent($this->migration, $this->message, $row));          $destination_ids = $id_map->lookupDestinationIds($this->sourceIdValues);          $destination_id_values = $destination_ids ? reset($destination_ids) : [];          $destination_id_values = $destination->import($row, $destination_id_values);          $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROW_SAVE, new MigratePostRowSaveEvent($this->migration, $this->message, $row, $destination_id_values));          if ($destination_id_values) {            // We do not save an idMap entry for config.            if ($destination_id_values !== TRUE) {              $id_map->saveIdMapping($row, $destination_id_values, $this->sourceRowStatus, $destination->rollbackAction());            }          }          else {            $id_map->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);            if (!$id_map->messageCount()) {              $message = $this->t('New object was not saved, no error provided');              $this->saveMessage($message);              $this->message->display($message);            }          }        }        catch (MigrateException $e) {          $this->getIdMap()->saveIdMapping($row, [], $e->getStatus());          $this->saveMessage($e->getMessage(), $e->getLevel());        }        catch (\Exception $e) {          $this->getIdMap()->saveIdMapping($row, [], MigrateIdMapInterface::STATUS_FAILED);          $this->handleException($e);        }      }      $this->sourceRowStatus = MigrateIdMapInterface::STATUS_IMPORTED;      // Check for memory exhaustion.      if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {        break;      }      // If anyone has requested we stop, return the requested result.      if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {        $return = $this->migration->getInterruptionResult();        $this->migration->clearInterruptionResult();        break;      }      try {        $source->next();      }      catch (\Exception $e) {        $this->message->display(          $this->t('Migration failed with source plugin exception: @e in @file line @line', [            '@e' => $e->getMessage(),            '@file' => $e->getFile(),            '@line' => $e->getLine(),          ]), 'error');        $this->migration->setStatus(MigrationInterface::STATUS_IDLE);        return MigrationInterface::RESULT_FAILED;      }    }    $this->getEventDispatcher()->dispatch(MigrateEvents::POST_IMPORT, new MigrateImportEvent($this->migration, $this->message));    $this->migration->setStatus(MigrationInterface::STATUS_IDLE);    return $return;  }  /**   * {@inheritdoc}   */  public function rollback() {    // Only begin the rollback operation if the migration is currently idle.    if ($this->migration->getStatus() !== MigrationInterface::STATUS_IDLE) {      $this->message->display($this->t('Migration @id is busy with another operation: @status', ['@id' => $this->migration->id(), '@status' => $this->t($this->migration->getStatusLabel())]), 'error');      return MigrationInterface::RESULT_FAILED;    }    // Announce that rollback is about to happen.    $this->getEventDispatcher()->dispatch(MigrateEvents::PRE_ROLLBACK, new MigrateRollbackEvent($this->migration));    // Optimistically assume things are going to work out; if not, $return will be    // updated to some other status.    $return = MigrationInterface::RESULT_COMPLETED;    $this->migration->setStatus(MigrationInterface::STATUS_ROLLING_BACK);    $id_map = $this->getIdMap();    $destination = $this->migration->getDestinationPlugin();    // Loop through each row in the map, and try to roll it back.    $id_map->rewind();    while ($id_map->valid()) {      $destination_key = $id_map->currentDestination();      if ($destination_key) {        $map_row = $id_map->getRowByDestination($destination_key);        if ($map_row['rollback_action'] == MigrateIdMapInterface::ROLLBACK_DELETE) {          $this->getEventDispatcher()            ->dispatch(MigrateEvents::PRE_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));          $destination->rollback($destination_key);          $this->getEventDispatcher()            ->dispatch(MigrateEvents::POST_ROW_DELETE, new MigrateRowDeleteEvent($this->migration, $destination_key));        }        // We're now done with this row, so remove it from the map.        $id_map->deleteDestination($destination_key);      }      else {        // If there is no destination key the import probably failed and we can        // remove the row without further action.        $source_key = $id_map->currentSource();        $id_map->delete($source_key);      }      $id_map->next();      // Check for memory exhaustion.      if (($return = $this->checkStatus()) != MigrationInterface::RESULT_COMPLETED) {        break;      }      // If anyone has requested we stop, return the requested result.      if ($this->migration->getStatus() == MigrationInterface::STATUS_STOPPING) {        $return = $this->migration->getInterruptionResult();        $this->migration->clearInterruptionResult();        break;      }    }    // Notify modules that rollback attempt was complete.    $this->getEventDispatcher()->dispatch(MigrateEvents::POST_ROLLBACK, new MigrateRollbackEvent($this->migration));    $this->migration->setStatus(MigrationInterface::STATUS_IDLE);    return $return;  }  /**   * Get the ID map from the current migration.   *   * @return \Drupal\migrate\Plugin\MigrateIdMapInterface   *   The ID map.   */  protected function getIdMap() {    return $this->migration->getIdMap();  }  /**   * {@inheritdoc}   */  public function processRow(Row $row, array $process = NULL, $value = NULL) {    foreach ($this->migration->getProcessPlugins($process) as $destination => $plugins) {      $multiple = FALSE;      /** @var $plugin \Drupal\migrate\Plugin\MigrateProcessInterface */      foreach ($plugins as $plugin) {        $definition = $plugin->getPluginDefinition();        // Many plugins expect a scalar value but the current value of the        // pipeline might be multiple scalars (this is set by the previous        // plugin) and in this case the current value needs to be iterated        // and each scalar separately transformed.        if ($multiple && !$definition['handle_multiples']) {          $new_value = [];          if (!is_array($value)) {            throw new MigrateException(sprintf('Pipeline failed at %s plugin for destination %s: %s received instead of an array,', $plugin->getPluginId(), $destination, $value));          }          $break = FALSE;          foreach ($value as $scalar_value) {            try {              $new_value[] = $plugin->transform($scalar_value, $this, $row, $destination);            }            catch (MigrateSkipProcessException $e) {              $new_value[] = NULL;              $break = TRUE;            }          }          $value = $new_value;          if ($break) {            break;          }        }        else {          try {            $value = $plugin->transform($value, $this, $row, $destination);          }          catch (MigrateSkipProcessException $e) {            $value = NULL;            break;          }          $multiple = $plugin->multiple();        }      }      // Ensure all values, including nulls, are migrated.      if ($plugins) {        if (isset($value)) {          $row->setDestinationProperty($destination, $value);        }        else {          $row->setEmptyDestinationProperty($destination);        }      }      // Reset the value.      $value = NULL;    }  }  /**   * Fetches the key array for the current source record.   *   * @return array   *   The current source IDs.   */  protected function currentSourceIds() {    return $this->getSource()->getCurrentIds();  }  /**   * {@inheritdoc}   */  public function saveMessage($message, $level = MigrationInterface::MESSAGE_ERROR) {    $this->getIdMap()->saveMessage($this->sourceIdValues, $message, $level);  }  /**   * Takes an Exception object and both saves and displays it.   *   * Pulls in additional information on the location triggering the exception.   *   * @param \Exception $exception   *   Object representing the exception.   * @param bool $save   *   (optional) Whether to save the message in the migration's mapping table.   *   Set to FALSE in contexts where this doesn't make sense.   */  protected function handleException(\Exception $exception, $save = TRUE) {    $result = Error::decodeException($exception);    $message = $result['@message'] . ' (' . $result['%file'] . ':' . $result['%line'] . ')';    if ($save) {      $this->saveMessage($message);    }    $this->message->display($message, 'error');  }  /**   * Checks for exceptional conditions, and display feedback.   */  protected function checkStatus() {    if ($this->memoryExceeded()) {      return MigrationInterface::RESULT_INCOMPLETE;    }    return MigrationInterface::RESULT_COMPLETED;  }  /**   * Tests whether we've exceeded the desired memory threshold.   *   * If so, output a message.   *   * @return bool   *   TRUE if the threshold is exceeded, otherwise FALSE.   */  protected function memoryExceeded() {    $usage = $this->getMemoryUsage();    $pct_memory = $usage / $this->memoryLimit;    if (!$threshold = $this->memoryThreshold) {      return FALSE;    }    if ($pct_memory > $threshold) {      $this->message->display(        $this->t(          'Memory usage is @usage (@pct% of limit @limit), reclaiming memory.',          [            '@pct' => round($pct_memory * 100),            '@usage' => $this->formatSize($usage),            '@limit' => $this->formatSize($this->memoryLimit),          ]        ),        'warning'      );      $usage = $this->attemptMemoryReclaim();      $pct_memory = $usage / $this->memoryLimit;      // Use a lower threshold - we don't want to be in a situation where we keep      // coming back here and trimming a tiny amount      if ($pct_memory > (0.90 * $threshold)) {        $this->message->display(          $this->t(            'Memory usage is now @usage (@pct% of limit @limit), not enough reclaimed, starting new batch',            [              '@pct' => round($pct_memory * 100),              '@usage' => $this->formatSize($usage),              '@limit' => $this->formatSize($this->memoryLimit),            ]          ),          'warning'        );        return TRUE;      }      else {        $this->message->display(          $this->t(            'Memory usage is now @usage (@pct% of limit @limit), reclaimed enough, continuing',            [              '@pct' => round($pct_memory * 100),              '@usage' => $this->formatSize($usage),              '@limit' => $this->formatSize($this->memoryLimit),            ]          ),          'warning');        return FALSE;      }    }    else {      return FALSE;    }  }  /**   * Returns the memory usage so far.   *   * @return int   *   The memory usage.   */  protected function getMemoryUsage() {    return memory_get_usage();  }  /**   * Tries to reclaim memory.   *   * @return int   *   The memory usage after reclaim.   */  protected function attemptMemoryReclaim() {    // First, try resetting Drupal's static storage - this frequently releases    // plenty of memory to continue.    drupal_static_reset();    // Entity storage can blow up with caches so clear them out.    $entity_type_manager = \Drupal::entityTypeManager();    foreach ($entity_type_manager->getDefinitions() as $id => $definition) {      $entity_type_manager->getStorage($id)->resetCache();    }    // @TODO: explore resetting the container.    // Run garbage collector to further reduce memory.    gc_collect_cycles();    return memory_get_usage();  }  /**   * Generates a string representation for the given byte count.   *   * @param int $size   *   A size in bytes.   *   * @return string   *   A translated string representation of the size.   */  protected function formatSize($size) {    return format_size($size);  }}
 |