migration.inc 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288
  1. <?php
  2. /**
  3. * @file
  4. * Defines the base class for import/rollback processes.
  5. */
  6. /**
  7. * The base class for all import objects. This is where most of the smarts
  8. * of the migrate module resides. Migrations are created by deriving from this
  9. * class, and in the constructor (after calling parent::__construct()) initializing
  10. * at a minimum the name, description, source, and destination properties. The constructor
  11. * will also usually make several calls to addFieldMapping().
  12. */
  13. abstract class Migration extends MigrationBase {
  /**
   * The source plugin that supplies raw rows to this migration.
   *
   * Derived classes assign this in their constructor; it is an instance of a
   * MigrateSource subclass.
   *
   * @var MigrateSource
   */
  protected $source;

  /**
   * Accessor for the source plugin.
   *
   * @return MigrateSource
   *   The object stored in $this->source.
   */
  public function getSource() {
    $source = $this->source;
    return $source;
  }
  /**
   * The destination plugin that receives the migrated data.
   *
   * Derived classes assign this in their constructor; it is an instance of a
   * MigrateDestination subclass.
   *
   * @var MigrateDestination
   */
  protected $destination;

  /**
   * Accessor for the destination plugin.
   *
   * @return MigrateDestination
   *   The object stored in $this->destination.
   */
  public function getDestination() {
    $destination = $this->destination;
    return $destination;
  }
  /**
   * Map object recording which destination items were created from which
   * source rows, along with per-row status and messages.
   *
   * @var MigrateMap
   */
  protected $map;

  /**
   * Accessor for the map object.
   *
   * @return MigrateMap
   *   The object stored in $this->map.
   */
  public function getMap() {
    $map = $this->map;
    return $map;
  }
  /**
   * $systemOfRecord value: the source is the primary system of record.
   */
  const SOURCE = 1;

  /**
   * $systemOfRecord value: the destination (Drupal) is the primary system of
   * record.
   */
  const DESTINATION = 2;

  /**
   * Which side is the primary system of record for this migration.
   *
   * With Migration::SOURCE, migrating an existing object completely replaces
   * the Drupal object with data from the source. With Migration::DESTINATION,
   * the existing Drupal object is loaded and the source changes are applied
   * to it; rollback() then leaves destination content in place and only
   * removes the map entries.
   *
   * @var int
   */
  protected $systemOfRecord = self::SOURCE;

  /**
   * Returns the system of record for this migration.
   *
   * @return int
   *   Migration::SOURCE or Migration::DESTINATION.
   */
  public function getSystemOfRecord() {
    $system = $this->systemOfRecord;
    return $system;
  }
  /**
   * The needs_update status to record in the map for the current row.
   *
   * Usually set by MigrateFieldHandler implementations. import() passes it to
   * the map when a row is imported successfully, then resets it to
   * MigrateMap::STATUS_IMPORTED before moving on to the next row.
   *
   * @var int
   */
  public $needsUpdate = MigrateMap::STATUS_IMPORTED;
  /**
   * The field mappings defined for this migration.
   *
   * Each value is a MigrateFieldMapping object created by addFieldMapping().
   * Mappings with a destination field are keyed by that destination field
   * name; mappings without one (e.g. unmigrated source fields) are appended
   * with numeric keys.
   *
   * @var array
   */
  protected $fieldMappings = array();

  /**
   * Returns all field mappings for this migration.
   *
   * @return array
   *   MigrateFieldMapping objects, keyed by destination field name where one
   *   exists, otherwise by numeric index.
   */
  public function getFieldMappings() {
    return $this->fieldMappings;
  }
  /**
   * Miscellaneous counters kept by the migration.
   *
   * Originally introduced for cache hit/miss tracking. NOTE(review): not
   * referenced in the portion of the class shown here — confirm its users.
   *
   * @var array
   */
  protected $counts = array();
  /**
   * Maximum number of items handed to a single bulkRollback() call.
   *
   * rollback() accumulates map rows and flushes them once this many have
   * been collected, so that very large rollbacks do not exhaust memory.
   * Derived classes may override the value in their constructor.
   *
   * @var int
   */
  protected $rollbackBatchSize = 50;
  /**
   * Optional highwater-mark configuration.
   *
   * When non-empty, an array with the key 'name' (the source field holding
   * the highwater value) and, optionally, 'alias' (a table alias for that
   * field). import() saves the 'name' field of each processed row as the
   * highwater mark, and rollback() resets the mark.
   *
   * @var array
   */
  protected $highwaterField = array();

  /**
   * Returns the highwater-mark configuration.
   *
   * @return array
   *   The $highwaterField array; empty when highwater marks are not in use.
   */
  public function getHighwaterField() {
    $highwater = $this->highwaterField;
    return $highwater;
  }
  /**
   * Key of the source row currently being processed.
   *
   * Set from the source's getCurrentKey() during import() and from the map's
   * getCurrentKey() during rollback(), where it is indexed by 'sourceid1'.
   */
  protected $currentSourceKey;

  /**
   * The destination object being constructed for the current row.
   *
   * Handed to the destination's import() together with $sourceValues and
   * unset once the row is finished. NOTE(review): presumably populated by
   * applyMappings(), which is not visible here.
   *
   * @var stdClass
   */
  protected $destinationValues;

  /**
   * The current data row retrieved from the source.
   *
   * @var stdClass
   */
  protected $sourceValues;
  /**
   * Constructs a Migration; all general setup is delegated to MigrationBase.
   *
   * Derived classes call this first and then initialize (at least) their
   * name, description, source and destination.
   *
   * @param mixed $group
   *   (optional) Passed through unchanged to MigrationBase::__construct().
   */
  public function __construct($group = NULL) {
    parent::__construct($group);
  }
  /**
   * Deregister a migration - remove all traces of it from the database
   * (without touching any content which was created by this migration).
   *
   * We'd like to do this at uninstall time, but the implementing module is
   * already disabled, so we can't instantiate it to get at the map. This can
   * be done in hook_disable(), however.
   *
   * Any exception raised while looking the migration up or tearing it down is
   * swallowed, so deregistering a migration that is already gone is a no-op.
   *
   * @param string $machine_name
   *   Machine name of the migration to deregister.
   */
  static public function deregisterMigration($machine_name) {
    try {
      // Remove map and message tables. Only a Migration with a map has any;
      // checking first avoids calling destroy() on NULL, which would be a
      // fatal error rather than an Exception the catch below can handle.
      $migration = self::getInstance($machine_name);
      if ($migration instanceof Migration && $migration->map) {
        $migration->map->destroy();
      }
      // TODO: Clear log entries? Or keep for historical purposes?
      // Call the parent deregistration (which clears migrate_status) last;
      // the lookup above relies on it.
      parent::deregisterMigration($machine_name);
    }
    catch (Exception $e) {
      // Fail silently if it's already gone.
    }
  }
  138. ////////////////////////////////////////////////////////////////////
  139. // Processing
  /**
   * Add a mapping for a destination field, optionally fed by a source field.
   *
   * Mapping a destination field that already has a mapping replaces the
   * earlier mapping and displays a warning. Mappings with a NULL destination
   * (used to mark source fields, e.g. unmigrated ones) are appended rather
   * than keyed, so any number of them can coexist. Further settings are
   * applied by chaining calls on the returned mapping (as
   * addUnmigratedDestinations() does with issueGroup()).
   *
   * @param string|null $destination_field
   *   Name of the destination field, or NULL for a source-only mapping.
   * @param string|null $source_field
   *   (optional) Name of the source field.
   *
   * @return MigrateFieldMapping
   *   The newly created mapping.
   */
  public function addFieldMapping($destination_field, $source_field = NULL) {
    // Warn of duplicate mappings before the earlier one is overwritten.
    if (!is_null($destination_field) && isset($this->fieldMappings[$destination_field])) {
      self::displayMessage(
        t('!name addFieldMapping: !dest was previously mapped, overridden',
          array('!name' => $this->machineName, '!dest' => $destination_field)),
        'warning');
    }
    $mapping = new MigrateFieldMapping($destination_field, $source_field);
    if (is_null($destination_field)) {
      // Source-only mappings have no natural key; just append them.
      $this->fieldMappings[] = $mapping;
    }
    else {
      $this->fieldMappings[$destination_field] = $mapping;
    }
    return $mapping;
  }
  /**
   * Drop mappings by destination field, by source field, or both.
   *
   * @param string $destination_field
   *   Destination field whose mapping should be removed; NULL to skip this
   *   step.
   * @param string $source_field
   *   (optional) When given, every mapping whose source field equals this
   *   value (loose comparison) is removed as well.
   */
  public function removeFieldMapping($destination_field, $source_field = NULL) {
    if (isset($destination_field)) {
      unset($this->fieldMappings[$destination_field]);
    }
    if (!isset($source_field)) {
      return;
    }
    foreach (array_keys($this->fieldMappings) as $key) {
      if ($this->fieldMappings[$key]->getSourceField() == $source_field) {
        unset($this->fieldMappings[$key]);
      }
    }
  }
  /**
   * Map each listed field to the identically named field on the other side.
   *
   * @param array $fields
   *   Field names that are the same in the source and the destination.
   */
  public function addSimpleMappings(array $fields) {
    foreach ($fields as $name) {
      $source_name = $name;
      $this->addFieldMapping($name, $source_name);
    }
  }
  /**
   * Record several destination fields as deliberately left unmigrated.
   *
   * Each field gets a mapping with no source, tagged with an issue group.
   *
   * @param array $fields
   *   Destination field names that will not be populated.
   * @param string $issue_group
   *   (optional) Issue group for the generated mappings; any empty value
   *   falls back to the translated 'DNM'.
   */
  public function addUnmigratedDestinations(array $fields, $issue_group = NULL) {
    $group = $issue_group ? $issue_group : t('DNM');
    foreach ($fields as $destination) {
      $mapping = $this->addFieldMapping($destination);
      $mapping->issueGroup($group);
    }
  }
  /**
   * Record several source fields as deliberately left unmigrated.
   *
   * Each field gets a mapping with no destination, tagged with an issue
   * group.
   *
   * @param array $fields
   *   Source field names that will not be migrated.
   * @param string $issue_group
   *   (optional) Issue group for the generated mappings; any empty value
   *   falls back to the translated 'DNM'.
   */
  public function addUnmigratedSources(array $fields, $issue_group = NULL) {
    $group = $issue_group ? $issue_group : t('DNM');
    foreach ($fields as $source) {
      $mapping = $this->addFieldMapping(NULL, $source);
      $mapping->issueGroup($group);
    }
  }
  /**
   * Reports whether this migration process is complete (i.e., all available
   * source rows have been processed).
   *
   * @return bool
   *   TRUE when the map's processed count has reached the source count, or
   *   when the source cannot be counted. FALSE otherwise, including when the
   *   processed count could not be determined.
   */
  public function isComplete() {
    $total = $this->source->count(TRUE);
    // If the source is uncountable, we have no way of knowing if it's
    // complete, so stipulate that it is.
    if ($total < 0) {
      return TRUE;
    }
    // processedCount() returns a translated 'N/A' string instead of a number
    // when the map cannot be queried. Comparing an integer with such a string
    // gives different results on PHP 7 and PHP 8, so handle it explicitly:
    // an unknown count is never treated as complete.
    $processed = $this->processedCount();
    if (!is_numeric($processed)) {
      return FALSE;
    }
    return $total <= $processed;
  }
  /**
   * Override MigrationBase::beginProcess(), adding import/rollback setup.
   *
   * Applies the 'feedback' option (normalizing the singular units 'item' and
   * 'second' to their plurals), resets the per-run counters and feedback
   * timestamp, and then runs preImport() or preRollback() according to the
   * new status.
   *
   * @param int $newStatus
   *   Migration::STATUS_IMPORTING or Migration::STATUS_ROLLING_BACK.
   */
  protected function beginProcess($newStatus) {
    parent::beginProcess($newStatus);

    if (isset($this->options['feedback'], $this->options['feedback']['value'],
        $this->options['feedback']['unit'])) {
      $this->feedback = $this->options['feedback']['value'];
      $unit = $this->options['feedback']['unit'];
      switch ($unit) {
        case 'item':
          $unit = 'items';
          break;

        case 'second':
          $unit = 'seconds';
          break;
      }
      $this->feedback_unit = $unit;
    }

    // Start this run's statistics from scratch.
    $this->lastfeedback = $this->starttime;
    $this->successes_since_feedback = 0;
    $this->processed_since_feedback = 0;
    $this->total_successes = 0;
    $this->total_processed = 0;

    switch ($this->status) {
      case Migration::STATUS_IMPORTING:
        $this->preImport();
        break;

      case Migration::STATUS_ROLLING_BACK:
        $this->preRollback();
        break;
    }
  }
  /**
   * Override MigrationBase::endProcess() to run the post-process hooks.
   *
   * postImport() or postRollback() runs first, depending on the current
   * status, and then the parent implementation. This must stay public so it
   * can be registered as a shutdown function.
   */
  public function endProcess() {
    switch ($this->status) {
      case Migration::STATUS_IMPORTING:
        $this->postImport();
        break;

      case Migration::STATUS_ROLLING_BACK:
        $this->postRollback();
        break;
    }
    parent::endProcess();
  }
  /**
   * Default pre-import hook: forwards to the destination's preImport(), if
   * it has one. Overrides should always call parent::preImport().
   */
  protected function preImport() {
    $this->invokeDestinationHook('preImport');
  }

  /**
   * Default pre-rollback hook: forwards to the destination's preRollback(),
   * if it has one. Overrides should always call parent::preRollback().
   */
  protected function preRollback() {
    $this->invokeDestinationHook('preRollback');
  }

  /**
   * Default post-import hook: forwards to the destination's postImport(), if
   * it has one. Overrides should always call parent::postImport().
   */
  protected function postImport() {
    $this->invokeDestinationHook('postImport');
  }

  /**
   * Default post-rollback hook: forwards to the destination's
   * postRollback(), if it has one. Overrides should always call
   * parent::postRollback().
   */
  protected function postRollback() {
    $this->invokeDestinationHook('postRollback');
  }

  /**
   * Calls the named no-argument method on the destination when it exists.
   *
   * @param string $method
   *   Name of the destination hook to invoke.
   */
  private function invokeDestinationHook($method) {
    if (method_exists($this->destination, $method)) {
      $this->destination->$method();
    }
  }
  /**
   * Perform a rollback operation - remove migrated items from the destination.
   *
   * When the destination implements bulkRollback(), map rows are collected in
   * batches of $this->rollbackBatchSize and rolled back together; otherwise
   * each map row is rolled back individually through the destination's
   * rollback(). Destination content is only removed when this migration is
   * the system of record (Migration::SOURCE); the map rows are deleted in
   * either case. A comma-separated 'idlist' option restricts the rollback to
   * rows whose first source key is listed.
   *
   * Failures are reported through handleException() and the remaining rows
   * are still processed; failed rows count as processed but not successful.
   *
   * @return int
   *   MigrationBase::RESULT_COMPLETED, or the result reported by
   *   checkStatus() when processing was interrupted.
   */
  protected function rollback() {
    $return = MigrationBase::RESULT_COMPLETED;
    $itemlimit = $this->getItemLimit();
    $idlist = $this->getOption('idlist');
    if ($idlist) {
      // Make the IDs keys, to more easily identify them.
      $idlist = array_flip(explode(',', $idlist));
    }

    if (method_exists($this->destination, 'bulkRollback')) {
      // Too many at once can lead to memory issues, so batch 'em up.
      $destids = array();
      $sourceids = array();
      $batch_count = 0;
      foreach ($this->map as $destination_key) {
        if ($this->timeOptionExceeded()) {
          break;
        }
        if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
          break;
        }
        // Rows already queued in the current batch count toward the limit.
        if ($itemlimit && ($this->total_processed + $batch_count >= $itemlimit)) {
          break;
        }
        $this->currentSourceKey = $this->map->getCurrentKey();
        // If there's an idlist, skip anything not in the list.
        if ($idlist && !isset($idlist[$this->currentSourceKey['sourceid1']])) {
          continue;
        }
        // Note that bulk rollback is only supported for single-column keys.
        $sourceids[] = $this->currentSourceKey;
        if (!empty($destination_key->destid1)) {
          $destids[] = $destination_key->destid1;
        }
        $batch_count++;
        if ($batch_count >= $this->rollbackBatchSize) {
          $this->rollbackBatch($destids, $sourceids, $batch_count);
          $destids = array();
          $sourceids = array();
          $batch_count = 0;
        }
      }
      // Flush the final, partial batch exactly like the full ones, so its
      // counts are recorded and its failures handled the same way.
      if ($batch_count > 0) {
        $this->rollbackBatch($destids, $sourceids, $batch_count);
      }
    }
    else {
      foreach ($this->map as $destination_key) {
        if ($this->timeOptionExceeded()) {
          break;
        }
        if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
          break;
        }
        if ($this->itemOptionExceeded()) {
          break;
        }
        $this->currentSourceKey = $this->map->getCurrentKey();
        // If there's an idlist, skip anything not in the list.
        if ($idlist && !isset($idlist[$this->currentSourceKey['sourceid1']])) {
          continue;
        }
        // Rollback one record.
        try {
          if ($this->systemOfRecord == Migration::SOURCE) {
            // Skip the destination rollback when any destination key value
            // is NULL.
            $skip = FALSE;
            foreach ($destination_key as $key_value) {
              if (is_null($key_value)) {
                $skip = TRUE;
                break;
              }
            }
            if (!$skip) {
              migrate_instrument_start('destination rollback');
              $this->destination->rollback((array) $destination_key);
              migrate_instrument_stop('destination rollback');
            }
          }
          migrate_instrument_start('rollback map/message update');
          $this->map->delete($this->currentSourceKey);
          migrate_instrument_stop('rollback map/message update');
          $this->total_successes++;
          $this->successes_since_feedback++;
        }
        catch (Exception $e) {
          // Report the failure and keep going. The row is still counted as
          // processed below, so item limits and the failed count in progress
          // messages account for it.
          $this->handleException($e, FALSE);
          migrate_instrument_stop('destination rollback');
          migrate_instrument_stop('rollback map/message update');
        }
        $this->total_processed++;
        $this->processed_since_feedback++;
      }
    }

    $this->map->clearMessages();
    $this->progressMessage($return);
    // If we're using highwater marks, reset at completion of a full rollback.
    // TODO: What about partial rollbacks? Probably little we can do to make
    // that work cleanly...
    if ($this->highwaterField) {
      $this->saveHighwater('', TRUE);
    }
    return $return;
  }

  /**
   * Rolls back one batch of map rows during a bulk rollback.
   *
   * Destination items are removed with a single bulkRollback() call (only
   * when the source is the system of record), then the batch's map rows are
   * deleted. Any exception is passed to handleException().
   *
   * @param array $destids
   *   Non-empty 'destid1' values of the batch's map rows.
   * @param array $sourceids
   *   Source keys of the batch's rows, as returned by the map's
   *   getCurrentKey().
   * @param int $batch_count
   *   Number of map rows in the batch.
   */
  private function rollbackBatch(array $destids, array $sourceids, $batch_count) {
    try {
      if ($this->systemOfRecord == Migration::SOURCE && !empty($destids)) {
        migrate_instrument_start('destination bulkRollback');
        $this->destination->bulkRollback($destids);
        migrate_instrument_stop('destination bulkRollback');
      }
      // Keep track in case of interruption.
      migrate_instrument_start('rollback map/message update');
      $this->map->deleteBulk($sourceids);
      migrate_instrument_stop('rollback map/message update');
      $this->total_successes += $batch_count;
      $this->successes_since_feedback += $batch_count;
    }
    catch (Exception $e) {
      $this->handleException($e, FALSE);
      // Stop whichever of the timers started above the failed call left
      // running.
      migrate_instrument_stop('destination bulkRollback');
      migrate_instrument_stop('rollback map/message update');
    }
    // Counted even if there was an exception - we have no way to know how
    // many rows of the batch were really rolled back.
    $this->total_processed += $batch_count;
    $this->processed_since_feedback += $batch_count;
  }
  /**
   * Perform an import operation - migrate items from source to destination.
   *
   * For each source row, old messages are wiped, applyMappings() is run and
   * the result is handed to the destination. The returned IDs, or a failure
   * status, are then saved to the map. Processing stops early when the time
   * limit or item limit is reached, or when checkStatus() reports anything
   * other than RESULT_COMPLETED.
   *
   * @return int
   *   MigrationBase::RESULT_FAILED if the source plugin throws while being
   *   rewound or advanced; otherwise RESULT_COMPLETED or the result reported
   *   by checkStatus().
   */
  protected function import() {
    $return = MigrationBase::RESULT_COMPLETED;
    try {
      $this->source->rewind();
    }
    catch (Exception $e) {
      self::displayMessage(
        t('Migration failed with source plugin exception: !e',
          array('!e' => $e->getMessage())));
      return MigrationBase::RESULT_FAILED;
    }
    while ($this->source->valid()) {
      $data_row = $this->source->current();
      $this->currentSourceKey = $this->source->getCurrentKey();
      // Wipe old messages.
      $this->map->delete($this->currentSourceKey, TRUE);
      $this->sourceValues = $data_row;
      $this->applyMappings();
      try {
        migrate_instrument_start('destination import', TRUE);
        try {
          $ids = $this->destination->import($this->destinationValues, $this->sourceValues);
        }
        catch (Exception $e) {
          // Don't leave the timer running when the destination throws; the
          // handlers below record the failure.
          migrate_instrument_stop('destination import');
          throw $e;
        }
        migrate_instrument_stop('destination import');
        if ($ids) {
          $this->map->saveIDMapping($this->sourceValues, $ids, $this->needsUpdate);
          $this->successes_since_feedback++;
          $this->total_successes++;
        }
        else {
          $this->map->saveIDMapping($this->sourceValues, array(), MigrateMap::STATUS_FAILED);
          $message = t('New object was not saved, no error provided');
          $this->saveMessage($message);
          self::displayMessage($message);
        }
      }
      catch (MigrateException $e) {
        $this->map->saveIDMapping($this->sourceValues, array(), MigrateMap::STATUS_FAILED);
        $this->saveMessage($e->getMessage(), $e->getLevel());
        self::displayMessage($e->getMessage());
      }
      catch (Exception $e) {
        $this->map->saveIDMapping($this->sourceValues, array(), MigrateMap::STATUS_FAILED);
        $this->handleException($e);
      }
      $this->total_processed++;
      $this->processed_since_feedback++;
      if ($this->highwaterField) {
        $this->saveHighwater($this->sourceValues->{$this->highwaterField['name']});
      }
      // Reset row properties.
      unset($this->sourceValues, $this->destinationValues);
      $this->needsUpdate = MigrateMap::STATUS_IMPORTED;
      // TODO: Temporary. Remove when http://drupal.org/node/375494 is committed.
      // TODO: Should be done in MigrateDestinationEntity
      if (!empty($this->destination->entityType)) {
        entity_get_controller($this->destination->entityType)->resetCache();
      }
      if ($this->timeOptionExceeded()) {
        break;
      }
      if (($return = $this->checkStatus()) != MigrationBase::RESULT_COMPLETED) {
        break;
      }
      if ($this->itemOptionExceeded()) {
        break;
      }
      try {
        $this->source->next();
      }
      catch (Exception $e) {
        self::displayMessage(
          t('Migration failed with source plugin exception: !e',
            array('!e' => $e->getMessage())));
        return MigrationBase::RESULT_FAILED;
      }
    }
    $this->progressMessage($return);
    return $return;
  }
  /**
   * Perform an analysis operation - report on field values in the source.
   *
   * Every source row is scanned; fields whose names begin with
   * 'migrate_map_' are ignored. For each field the result records:
   * - description: taken from the source's fields(), or '' for undocumented
   *   fields found only in the data.
   * - is_numeric: FALSE once any non-empty, non-numeric value is seen.
   * - min_numeric / max_numeric: range of the numeric values (NULL if none).
   * - min_strlen / max_strlen: range of the raw value lengths (in bytes).
   * - distinct_values: up to 10 distinct trimmed values, each mapped to the
   *   number of times it occurred.
   *
   * @return array
   *   Array of analysis details - each element is keyed by field name and
   *   contains an array describing the field values. If the source throws
   *   while advancing, the partial results gathered so far are returned; if
   *   it throws while rewinding, an empty array is returned.
   */
  public function analyze() {
    // The source needs this to find the map table.
    self::$currentMigration = $this;
    try {
      $this->source->rewind();
    }
    catch (Exception $e) {
      self::displayMessage(
        t('Migration analysis failed with source plugin exception: !e',
          array('!e' => $e->getMessage())));
      self::$currentMigration = NULL;
      return array();
    }
    // Get the documented fields first.
    $source_fields = $this->source->fields();
    $analysis = array();
    $field_init = array(
      'is_numeric' => TRUE,
      'min_numeric' => NULL,
      'max_numeric' => NULL,
      'min_strlen' => 0,
      'max_strlen' => 0,
      'distinct_values' => array(),
    );
    foreach ($source_fields as $field_name => $description) {
      // Ignore fields from the map table.
      if (substr($field_name, 0, strlen('migrate_map_')) == 'migrate_map_') {
        continue;
      }
      $analysis[$field_name] = $field_init +
        array('description' => $description);
    }
    // Fields for which at least one value length has been recorded. A
    // max_strlen of 0 cannot serve as this marker, because empty strings
    // legitimately have length 0.
    $measured = array();
    // For each data row...
    while ($this->source->valid()) {
      $row = $this->source->current();
      // Cheat for XML migrations, which don't pick up the source values
      // until applyMappings() applies the xpath().
      if (is_a($this, 'XMLMigration')) {
        $this->currentSourceKey = $this->source->getCurrentKey();
        $this->sourceValues = $row;
        $this->applyMappings();
        $row = $this->sourceValues;
        unset($row->xml);
      }
      // For each field in this row...
      foreach ($row as $field_name => $raw_value) {
        // Ignore fields from the map table.
        if (substr($field_name, 0, strlen('migrate_map_')) == 'migrate_map_') {
          continue;
        }
        // It might be an array of values, check each value.
        if (!is_array($raw_value)) {
          $raw_value = array($raw_value);
        }
        foreach ($raw_value as $field_value) {
          // If this is an undocumented field, initialize it.
          if (!isset($analysis[$field_name])) {
            $analysis[$field_name] = $field_init +
              array('description' => '');
          }
          $info = &$analysis[$field_name];

          // Ignore leading/trailing spaces in determining numerics.
          $trimmed_value = trim($field_value);
          if (is_numeric($trimmed_value)) {
            $number = floatval($trimmed_value);
            // First numeric value, initialize the min/max.
            if (is_null($info['min_numeric'])) {
              $info['min_numeric'] = $number;
              $info['max_numeric'] = $number;
            }
            else {
              $info['min_numeric'] = min($number, $info['min_numeric']);
              $info['max_numeric'] = max($number, $info['max_numeric']);
            }
          }
          elseif ($trimmed_value !== '') {
            // Empty strings are !is_numeric(), but treat as empty rather than
            // assuming we don't have a numeric field.
            $info['is_numeric'] = FALSE;
          }

          $strlen = strlen($field_value);
          // First value for this field, initialize both min and max.
          if (!isset($measured[$field_name])) {
            $measured[$field_name] = TRUE;
            $info['min_strlen'] = $strlen;
            $info['max_strlen'] = $strlen;
          }
          else {
            $info['min_strlen'] = min($strlen, $info['min_strlen']);
            $info['max_strlen'] = max($strlen, $info['max_strlen']);
          }

          // Track up to 10 distinct values. Key by the trimmed string so that
          // fractional numbers are not truncated to integer array keys, and
          // keep counting values already being tracked once the cap is hit.
          if (isset($info['distinct_values'][$trimmed_value])) {
            $info['distinct_values'][$trimmed_value]++;
          }
          elseif (count($info['distinct_values']) < 10) {
            $info['distinct_values'][$trimmed_value] = 1;
          }
          unset($info);
        }
      }
      try {
        $this->source->next();
      }
      catch (Exception $e) {
        self::displayMessage(
          t('Migration analysis failed with source plugin exception: !e. Partial results follow:',
            array('!e' => $e->getMessage())));
        self::$currentMigration = NULL;
        return $analysis;
      }
    }
    self::$currentMigration = NULL;
    return $analysis;
  }
  /**
   * Hook for adjusting or filtering a raw source row.
   *
   * The source plugin calls this when it first pulls a row from the source.
   * This default accepts every row unchanged; derived classes override it to
   * massage data or to skip rows.
   *
   * @param $row
   *   Object containing raw source data.
   *
   * @return bool
   *   TRUE to process this row, FALSE to have the source skip it.
   */
  public function prepareRow($row) {
    $keep_row = TRUE;
    return $keep_row;
  }
  675. ////////////////////////////////////////////////////////////////////
  676. // Utility methods
  /**
   * Convenience wrapper returning the number of records in the source.
   *
   * @param bool $refresh
   *   Pass TRUE to refresh the cached count.
   *
   * @return int|string
   *   The source's count, or the translated string 'N/A' when the source
   *   throws (the exception message is displayed).
   */
  public function sourceCount($refresh = FALSE) {
    try {
      return $this->source->count($refresh);
    }
    catch (Exception $e) {
      $unavailable = t('N/A');
      self::displayMessage($e->getMessage());
      return $unavailable;
    }
  }
  /**
   * Returns how many source records the map reports as processed.
   *
   * @return int|string
   *   The processed count, or the translated string 'N/A' when the map
   *   throws (the exception message is displayed).
   */
  public function processedCount() {
    try {
      return $this->map->processedCount();
    }
    catch (Exception $e) {
      $unavailable = t('N/A');
      self::displayMessage($e->getMessage());
      return $unavailable;
    }
  }
  /**
   * Returns how many records the map reports as successfully imported.
   *
   * @return int|string
   *   The imported count, or the translated string 'N/A' when the map throws
   *   (the exception message is displayed).
   */
  public function importedCount() {
    try {
      return $this->map->importedCount();
    }
    catch (Exception $e) {
      $unavailable = t('N/A');
      self::displayMessage($e->getMessage());
      return $unavailable;
    }
  }
  /**
   * Returns how many records the map flags as needing an update.
   *
   * @return int|string
   *   The update count, or the translated string 'N/A' when the map throws
   *   (the exception message is displayed).
   */
  public function updateCount() {
    try {
      return $this->map->updateCount();
    }
    catch (Exception $e) {
      $unavailable = t('N/A');
      self::displayMessage($e->getMessage());
      return $unavailable;
    }
  }
  /**
   * Checks the processed total against the configured item limit.
   *
   * @return bool
   *   TRUE once a (non-empty) item limit has been reached, FALSE otherwise.
   */
  protected function itemOptionExceeded() {
    $limit = $this->getItemLimit();
    // An empty limit means no limit at all.
    return $limit && $this->total_processed >= $limit;
  }
  /**
   * Returns the map's count of source records that failed to import.
   *
   * TODO: Doesn't yet account for informationals, or multiple errors for
   * a source record.
   *
   * @return int
   *   Number of records errored out, as reported by the map.
   */
  public function errorCount() {
    $map = $this->map;
    return $map->errorCount();
  }
  /**
   * Returns the map's count of messages recorded for this migration.
   *
   * @return int
   *   Number of messages.
   */
  public function messageCount() {
    $map = $this->map;
    return $map->messageCount();
  }
  /**
   * Sets this migration up to run as an update.
   *
   * Besides importing source records that are not yet in the map, the run
   * will then also update previously migrated content in place. The work is
   * delegated to the map's prepareUpdate().
   */
  public function prepareUpdate() {
    $map = $this->map;
    $map->prepareUpdate();
  }
  /**
   * Outputs a progress message, reflecting the current status of a migration process.
   *
   * The wording depends on whether the migration is importing or rolling back,
   * and on $result. For any result other than RESULT_INCOMPLETE, lookup-cache
   * hit rates are also reported at 'debug' level and the tallies are cleared.
   * For RESULT_INCOMPLETE, the per-interval counters (processed/successes since
   * feedback, source and destination stats) are reset so the next message
   * covers only the next interval.
   *
   * @param int $result
   *   Status of the process, represented by one of the Migration::RESULT_* constants.
   */
  protected function progressMessage($result) {
    // This method resets lastfeedback with microtime(TRUE), so the interval is
    // measured with sub-second precision at both ends. (Resetting it with
    // time() truncated the start and overstated the elapsed time by up to 1s.)
    $time = microtime(TRUE) - $this->lastfeedback;
    if ($time > 0) {
      $perminute = round(60 * $this->processed_since_feedback / $time);
      $time = round($time, 1);
    }
    else {
      $perminute = '?';
    }
    if ($this->status == Migration::STATUS_IMPORTING) {
      switch ($result) {
        case Migration::RESULT_COMPLETED:
          $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - done with '!name'";
          $type = 'completed';
          break;
        case Migration::RESULT_FAILED:
          $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - failure with '!name'";
          $type = 'failed';
          break;
        case Migration::RESULT_INCOMPLETE:
          $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - continuing with '!name'";
          $type = 'ok';
          break;
        case Migration::RESULT_STOPPED:
          $basetext = "Processed !numitems (!created created, !updated updated, !failed failed, !ignored ignored) in !time sec (!perminute/min) - stopped '!name'";
          $type = 'warning';
          break;
      }
    }
    else {
      switch ($result) {
        case Migration::RESULT_COMPLETED:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - done with '!name'";
          $type = 'completed';
          break;
        case Migration::RESULT_FAILED:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - failure with '!name'";
          $type = 'failed';
          break;
        case Migration::RESULT_INCOMPLETE:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - continuing with '!name'";
          $type = 'ok';
          break;
        case Migration::RESULT_STOPPED:
          $basetext = "Rolled back !numitems in !time sec (!perminute/min) - stopped '!name'";
          $type = 'warning';
          break;
      }
    }
    $numitems = $this->processed_since_feedback + $this->source->getIgnored();
    $message = t($basetext,
      array(
        '!numitems' => $numitems,
        '!successes' => $this->successes_since_feedback,
        '!failed' => $this->processed_since_feedback - $this->successes_since_feedback,
        '!created' => $this->destination->getCreated(),
        '!updated' => $this->destination->getUpdated(),
        '!ignored' => $this->source->getIgnored(),
        '!time' => $time,
        '!perminute' => $perminute,
        '!name' => $this->machineName,
      ));
    self::displayMessage($message, $type);

    // Report on lookup_cache hit rate. Only visible at 'debug' level.
    if ($result != Migration::RESULT_INCOMPLETE && !empty($this->counts['lookup_cache'])) {
      foreach ($this->counts['lookup_cache'] as $name => $tallies) {
        // Set defaults to avoid NOTICE.
        $tallies += array('hit' => 0, 'miss_hit' => 0, 'miss_miss' => 0);
        $sum = $tallies['hit'] + $tallies['miss_hit'] + $tallies['miss_miss'];
        // No lookups were tallied for this cache; there is no rate to report,
        // and computing percentages would divide by zero.
        if ($sum == 0) {
          continue;
        }
        self::displayMessage(
          t('Lookup cache: !mn SM=!name !hit hit, !miss_hit miss_hit, !miss_miss miss_miss (!total total).', array(
            '!mn' => $this->machineName,
            '!name' => $name,
            '!hit' => round((100 * $tallies['hit']) / $sum) . '%',
            '!miss_hit' => round((100 * $tallies['miss_hit']) / $sum) . '%',
            '!miss_miss' => round((100 * $tallies['miss_miss']) / $sum) . '%',
            '!total' => $sum,
          )), 'debug');
      }
      $this->counts['lookup_cache'] = array();
    }

    // Start a new feedback interval.
    if ($result == Migration::RESULT_INCOMPLETE) {
      $this->lastfeedback = microtime(TRUE);
      $this->processed_since_feedback = $this->successes_since_feedback = 0;
      $this->source->resetStats();
      $this->destination->resetStats();
    }
  }
  /**
   * Top-of-loop checks shared by import and rollback.
   *
   * Detects conditions that should interrupt processing (memory or time limit
   * reached, or a stop request) and, when feedback is configured, emits a
   * periodic progress message.
   *
   * @return int
   *   MigrationBase::RESULT_INCOMPLETE if the memory or time limit was hit,
   *   MigrationBase::RESULT_STOPPED if the migration is stopping, otherwise
   *   MigrationBase::RESULT_COMPLETED.
   */
  protected function checkStatus() {
    // timeExceeded() is only consulted when memory is still within bounds.
    if ($this->memoryExceeded() || $this->timeExceeded()) {
      return MigrationBase::RESULT_INCOMPLETE;
    }
    if ($this->getStatus() == Migration::STATUS_STOPPING) {
      return MigrationBase::RESULT_STOPPED;
    }
    // Produce a progress message once the feedback interval (measured either
    // in seconds or in processed items) has elapsed.
    if (isset($this->feedback)) {
      $seconds_due = $this->feedback_unit == 'seconds'
        && time() - $this->lastfeedback >= $this->feedback;
      $items_due = $this->feedback_unit == 'items'
        && $this->processed_since_feedback >= $this->feedback;
      if ($seconds_due || $items_due) {
        $this->progressMessage(MigrationBase::RESULT_INCOMPLETE);
      }
    }
    return MigrationBase::RESULT_COMPLETED;
  }
  /**
   * Apply field mappings to a data row received from the source, populating
   * the destination object.
   *
   * Rebuilds $this->destinationValues from $this->sourceValues by walking
   * $this->fieldMappings in order. For each mapping with a destination field,
   * the value comes from the mapped source field (when that property exists on
   * the row) or else from the mapping's default value. It is then passed, in
   * order, through: separator explode, source-migration lookup, callbacks,
   * dedupe, and argument attachment. A destination of the form
   * "field:subfield" is stored as
   * $this->destinationValues->field['arguments']['subfield'].
   */
  protected function applyMappings() {
    $this->destinationValues = new stdClass;
    foreach ($this->fieldMappings as $mapping) {
      $destination = $mapping->getDestinationField();
      // Skip mappings with no destination (source fields marked DNM).
      if (!$destination) {
        continue;
      }
      $source = $mapping->getSourceField();
      $default = $mapping->getDefaultValue();
      // When updating existing items, make sure we don't create a destination
      // field that is not mapped to anything (a source field or a default value).
      if (!$source && !isset($default)) {
        continue;
      }
      $destination_values = NULL;
      // If there's a source mapping, and a source value in the data row, copy
      // to the destination.
      if ($source && property_exists($this->sourceValues, $source)) {
        $destination_values = $this->sourceValues->$source;
      }
      // Otherwise, apply the default value (if any).
      elseif (!is_null($default)) {
        $destination_values = $default;
      }
      // If there's a separator specified for this destination, then it
      // will be populated as an array exploded from the source value.
      $separator = $mapping->getSeparator();
      if ($separator && isset($destination_values)) {
        $destination_values = explode($separator, $destination_values);
      }
      // If a source migration is supplied, use the current value for this field
      // to look up a destination ID from the provided migration.
      $source_migration = $mapping->getSourceMigration();
      if ($source_migration && isset($destination_values)) {
        $destination_values = $this->handleSourceMigration($source_migration, $destination_values, $default, $this);
      }
      // Call any designated callbacks. Once the value becomes NULL, the
      // remaining callbacks are skipped.
      $callbacks = $mapping->getCallbacks();
      foreach ($callbacks as $callback) {
        if (isset($destination_values)) {
          $destination_values = call_user_func($callback, $destination_values);
        }
      }
      // If specified, assure a unique value for this property.
      $dedupe = $mapping->getDedupe();
      if ($dedupe && isset($destination_values)) {
        $destination_values = $this->handleDedupe($dedupe, $destination_values);
      }
      // Assign any arguments.
      if (isset($destination_values)) {
        $arguments = $mapping->getArguments();
        if ($arguments) {
          if (!is_array($destination_values)) {
            $destination_values = array($destination_values);
          }
          // TODO: Stuffing arguments into the destination field is gross - can
          // we come up with a better way to communicate them to the field
          // handlers?
          $destination_values['arguments'] = array();
          foreach ($arguments as $argname => $destarg) {
            if (is_array($destarg) && isset($destarg['source_field']) && property_exists($this->sourceValues, $destarg['source_field'])) {
              // Copy the field name into a plain variable first. Under PHP 7's
              // uniform variable syntax, $obj->$destarg['source_field'] parses
              // as ($obj->$destarg)['source_field'], not the intended
              // $obj->{$destarg['source_field']}.
              $arg_source_field = $destarg['source_field'];
              $destination_values['arguments'][$argname] = $this->sourceValues->$arg_source_field;
            }
            elseif (is_array($destarg) && isset($destarg['default_value'])) {
              $destination_values['arguments'][$argname] = $destarg['default_value'];
            }
            else {
              $destination_values['arguments'][$argname] = $destarg;
            }
          }
        }
      }
      // Are we dealing with the primary value of the destination field, or a
      // subfield?
      $destination = explode(':', $destination);
      $destination_field = $destination[0];
      if (isset($destination[1])) {
        $subfield = $destination[1];
        // Read the current primary value without raising an undefined-property
        // notice when nothing has been assigned to it yet. A missing or scalar
        // value is wrapped in an array (a missing one becomes array(NULL)).
        $current_value = isset($this->destinationValues->$destination_field)
          ? $this->destinationValues->$destination_field
          : NULL;
        if (!is_array($current_value)) {
          $this->destinationValues->$destination_field = array($current_value);
        }
        $this->destinationValues->{$destination_field}['arguments'][$subfield] = $destination_values;
      }
      else {
        $this->destinationValues->$destination_field = $destination_values;
      }
    }
  }
  /**
   * Look up a value migrated in another migration.
   *
   * For each key, the source migrations' maps are consulted in order and the
   * first destination ID found is used. If none is found, each source
   * migration in turn is given a chance to create a stub. A key with any empty
   * component (0 and '0' are not considered empty) is skipped entirely.
   *
   * @param mixed $source_migrations
   *   An array of source migrations, or string for a single migration.
   * @param mixed $source_keys
   *   Key(s) to be looked up against the source migration(s). This may be a simple
   *   value (one single-field key), an array of values (multiple single-field keys
   *   to each be looked up), or an array of arrays (multiple multi-field keys to
   *   each be looked up).
   * @param mixed $default
   *   The default value, if no ID was found.
   * @param $migration
   *   The implementing migration, passed through to createStubWrapper().
   *
   * @return
   *   Destination value(s) from the source migration(s), as a single value if
   *   a single key was passed in, or an array of values if there were multiple
   *   keys to look up. A destination ID with a single component is returned as
   *   that scalar value. For a single key with no match and no default, the
   *   result of reset() on an empty array (FALSE) is returned.
   */
  protected function handleSourceMigration($source_migrations, $source_keys, $default = NULL, $migration = NULL) {
    // Handle the source migration(s) as an array.
    $source_migrations = (array) $source_migrations;
    // We want to treat source keys consistently as an array of arrays (each
    // representing one key).
    if (!is_array($source_keys)) {
      // A simple value - make it an array within an array.
      $source_keys = array(array($source_keys));
    }
    elseif (!is_array(reset($source_keys))) {
      // An array of single-key values - make each one an array.
      $new_source_keys = array();
      foreach ($source_keys as $source_key) {
        $new_source_keys[] = array($source_key);
      }
      $source_keys = $new_source_keys;
    }
    // Otherwise $source_keys is already an array of key arrays.

    // Instantiate each migration, and store back in the array.
    foreach ($source_migrations as $key => $source_migration) {
      $source_migrations[$key] = Migration::getInstance($source_migration);
    }

    $results = array();
    // Each $source_key will be an array of key values.
    foreach ($source_keys as $source_key) {
      // If any source keys are empty, skip this set. 0 and '0' are legitimate
      // key values and do not count as empty.
      $continue = FALSE;
      foreach ($source_key as $value) {
        if (empty($value) && $value !== 0 && $value !== '0') {
          $continue = TRUE;
          break;
        }
      }
      if ($continue || empty($source_key)) {
        continue;
      }
      // Initialize per key so $destids is always defined, even when
      // $source_migrations is empty and neither loop below assigns it.
      $destids = NULL;
      // Loop through each source migration, checking for an existing dest ID.
      foreach ($source_migrations as $source_migration) {
        // Break out of the loop as soon as a destination ID is found.
        if ($destids = $source_migration->getMap()->lookupDestinationID($source_key)) {
          break;
        }
      }
      // If no destination ID was found, give each source migration a chance to
      // create a stub.
      if (!$destids) {
        foreach ($source_migrations as $source_migration) {
          // Break out of the loop if a stub was successfully created.
          if ($destids = $source_migration->createStubWrapper($source_key, $migration)) {
            break;
          }
        }
      }
      if ($destids) {
        // Assume that if the destination key is a single value, it
        // should be passed as such.
        if (count($destids) == 1) {
          $results[] = reset($destids);
        }
        else {
          $results[] = $destids;
        }
      }
      // If no match found, apply the default value (if any).
      elseif (!is_null($default)) {
        $results[] = $default;
      }
    }
    // Return a single result if we had a single key.
    if (count($source_keys) > 1) {
      return $results;
    }
    return reset($results);
  }
  /**
   * For fields which require uniqueness, assign a new unique value if necessary.
   *
   * If the current row was previously migrated (migrate_map_destid1 is set),
   * the value already stored for that destination record is reused when one
   * is found. Otherwise "_2", "_3", ... is appended to the original until no
   * row in the table holds the candidate. Any replacement is recorded as an
   * informational message on the current migration.
   *
   * @param array $dedupe
   *   An array with two keys, 'table' the name of the Drupal table and 'column'
   *   the column within that table where uniqueness must be maintained.
   * @param $original
   *   The value coming in, which must be checked for uniqueness.
   *
   * @return string
   *   The value to use - either the original, or a variation created by appending
   *   a sequence number.
   */
  protected function handleDedupe($dedupe, $original) {
    $table = $dedupe['table'];
    $column = $dedupe['column'];

    // If we're remigrating a previously-existing value, simply running through
    // our normal process will re-dedupe it - we must be sure to preserve the
    // previously-written value. Note that this means that you cannot migrate
    // changes to this field - the originally-migrated value will always
    // remain, because we can't tell what the original was.
    if (isset($this->sourceValues->migrate_map_destid1)) {
      $key_field = key($this->destination->getKeySchema());
      $previous = db_select($table, 't')
        ->fields('t', array($column))
        ->range(0, 1)
        ->condition($key_field, $this->sourceValues->migrate_map_destid1)
        ->execute()
        ->fetchField();
      // Only a truthy stored value is reused; otherwise fall through to the
      // normal deduping process.
      if ($previous) {
        return $previous;
      }
    }

    $suffix = 1;
    $candidate = $original;
    while (TRUE) {
      $taken = db_select($table, 't')
        ->fields('t', array($column))
        ->range(0, 1)
        ->condition('t.' . $column, $candidate)
        ->execute()
        ->fetchField();
      if (!$taken) {
        break;
      }
      // The candidate is already in use; try the next numbered variant.
      // @TODO: support custom replacement pattern instead of just append.
      $suffix++;
      $candidate = $original . '_' . $suffix;
    }

    if ($suffix > 1) {
      $message = t('Replacing !column !original with !candidate',
        array(
          '!column' => $column,
          '!original' => $original,
          '!candidate' => $candidate,
        ));
      $current = Migration::currentMigration();
      $current->saveMessage($message, Migration::MESSAGE_INFORMATIONAL);
    }
    return $candidate;
  }
  /**
   * If stub creation is enabled, try to create a stub and save the mapping.
   *
   * Stub creation is available when this class defines a createStub() method.
   * When createStub() returns a truthy result, a mapping from $source_key to
   * those destination IDs is saved with MigrateMap::STATUS_NEEDS_UPDATE.
   *
   * @param array $source_key
   *   Source key values, positionally matching the map's source key fields.
   * @param $migration
   *   Passed through unchanged as the first argument to createStub().
   *
   * @return
   *   Whatever createStub() returned, or NULL if this class has no
   *   createStub() method.
   */
  protected function createStubWrapper(array $source_key, $migration = NULL) {
    if (!method_exists($this, 'createStub')) {
      return NULL;
    }
    $destids = $this->createStub($migration, $source_key);
    if ($destids) {
      // Fake a data row with the source key in it, pairing each source key
      // field name with the value at the same position in $source_key.
      $row = new stdClass;
      $position = 0;
      foreach ($this->map->getSourceKey() as $field_name => $definition) {
        $row->$field_name = $source_key[$position];
        $position++;
      }
      $this->map->saveIDMapping($row, $destids, MigrateMap::STATUS_NEEDS_UPDATE);
    }
    return $destids;
  }
  /**
   * Pass messages through to the map class.
   *
   * The message is recorded against the source key of the row currently being
   * processed ($this->currentSourceKey).
   *
   * @param string $message
   *   The message to record.
   * @param int $level
   *   Optional message severity (defaults to MESSAGE_ERROR).
   */
  public function saveMessage($message, $level = MigrationBase::MESSAGE_ERROR) {
    $source_key = $this->currentSourceKey;
    $this->map->saveMessage($source_key, $message, $level);
  }
  /**
   * Set the specified row to be updated, if it exists.
   *
   * @param array $source_key
   *   Source key of the row to flag. When omitted or empty, the key of the
   *   row currently being processed ($this->currentSourceKey) is used.
   */
  public function setUpdate(array $source_key = NULL) {
    $key = $source_key ? $source_key : $this->currentSourceKey;
    $this->map->setUpdate($key);
  }
  1180. }
  1181. /**
  1182. * Convenience class - deriving from this rather than directory from Migration
  1183. * ensures that a class will not be registered as a migration itself - it is
  1184. * the implementor's responsibility to register each instance of a dynamic
  1185. * migration class.
  1186. */
  1187. abstract class DynamicMigration extends Migration {
  1188. /**
  1189. * Overrides default of FALSE
  1190. */
  1191. static public function isDynamic() {
  1192. return TRUE;
  1193. }
  1194. }