source.inc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. <?php
  /**
   * @file
   * Defines the abstract base class for migration sources.
   */
  /**
   * Abstract base class for source handling.
   *
   * Derived classes are expected to define __toString(), returning a string
   * that describes the source and its significant options; count() hashes that
   * string to build its default cache key. See MigrateSourceSQL for an example.
   */
  abstract class MigrateSource implements Iterator {

    /**
     * The current row from the query.
     *
     * @var stdClass
     */
    protected $currentRow;

    /**
     * The primary key of the current row.
     *
     * @var array
     */
    protected $currentKey;

    /**
     * Returns the source key of the current row.
     *
     * @return array|null
     *   The key populated by next(), or NULL when there is no current row.
     */
    public function getCurrentKey() {
      return $this->currentKey;
    }

    /**
     * The Migration class currently invoking us, during rewind() and next().
     *
     * @var Migration
     */
    protected $activeMigration;

    /**
     * The MigrateMap class for the current migration.
     *
     * @var MigrateMap
     */
    protected $activeMap;

    /**
     * Number of rows intentionally ignored (prepareRow() returned FALSE).
     *
     * @var int
     */
    protected $numIgnored = 0;

    /**
     * Returns the number of rows ignored so far.
     *
     * @return int
     */
    public function getIgnored() {
      return $this->numIgnored;
    }

    /**
     * Number of rows we've at least looked at (i.e. passed to prepareRow()).
     *
     * @var int
     */
    protected $numProcessed = 0;

    /**
     * Returns the number of rows passed through prepareRow() so far.
     *
     * @return int
     */
    public function getProcessed() {
      return $this->numProcessed;
    }

    /**
     * Reset numIgnored back to 0.
     *
     * Note that numProcessed is left untouched here; it is only reset by
     * rewind().
     */
    public function resetStats() {
      $this->numIgnored = 0;
    }

    /**
     * Information on the highwater mark for the current migration, if any.
     *
     * next() reads the 'name' element to find the highwater field on a row.
     *
     * @var array
     */
    protected $highwaterField;

    /**
     * The highwater mark at the beginning of the import operation.
     *
     * Stays '' until rewind() loads a saved mark from the migration.
     *
     * @var mixed
     */
    protected $originalHighwater = '';

    /**
     * Used in the case of multiple key sources that need to use idlist.
     *
     * @var string
     */
    protected $multikeySeparator = ':';

    /**
     * List of source IDs to process.
     *
     * @var array
     */
    protected $idList = array();

    /**
     * Derived classes must implement fields(), returning a list of available
     * source fields.
     *
     * @return array
     *   Keys: machine names of the fields (to be passed to addFieldMapping)
     *   Values: Human-friendly descriptions of the fields.
     */
    abstract public function fields();

    /**
     * Whether this instance should cache the source count.
     *
     * @var boolean
     */
    protected $cacheCounts = FALSE;

    /**
     * Key to use for caching counts.
     *
     * @var string
     */
    protected $cacheKey;

    /**
     * Whether this instance should not attempt to count the source.
     *
     * @var boolean
     */
    protected $skipCount = FALSE;

    /**
     * If TRUE, we will maintain hashed source rows to determine whether incoming
     * data has changed.
     *
     * @var bool
     */
    protected $trackChanges = FALSE;

    /**
     * By default, next() will directly read the map row and add it to the data
     * row. A source plugin implementation may do this itself (in particular, the
     * SQL source can incorporate the map table into the query) - if so, it should
     * set this TRUE so we don't duplicate the effort.
     *
     * @var bool
     */
    protected $mapRowAdded = FALSE;

    /**
     * Return a count of available source records, from the cache if appropriate.
     * Returns -1 if the source is not countable.
     *
     * @param boolean $refresh
     *   TRUE to bypass any cached value and recount the source.
     *
     * @return int
     *   The number of source records, or -1 when counting is disabled.
     */
    public function count($refresh = FALSE) {
      if ($this->skipCount) {
        return -1;
      }

      // Without an explicit cache key, derive one from the source description.
      if (!isset($this->cacheKey)) {
        $this->cacheKey = md5((string) $this);
      }

      // Caching is in play and no refresh was requested: a cached count wins.
      if (!$refresh && $this->cacheCounts) {
        $cache_object = cache_get($this->cacheKey, 'cache');
        if (is_object($cache_object)) {
          return $cache_object->data;
        }
      }

      // A refresh was requested, counts are not cached, or the cache was empty:
      // ask the derived class to count 'em up, and store the result.
      $count = $this->computeCount();
      cache_set($this->cacheKey, $count, 'cache');
      return $count;
    }

    /**
     * Derived classes must implement computeCount(), to retrieve a fresh count of
     * source records. It is not declared abstract in this class, but count()
     * calls it, so any source that can be counted has to provide it.
     */
    //abstract public function computeCount();

    /**
     * Class constructor.
     *
     * @param array $options
     *   Optional array of options. Recognized keys:
     *   - cache_counts: if non-empty, count() serves counts from the cache.
     *   - skip_count: if non-empty, count() always returns -1.
     *   - cache_key: explicit cache key to use for the cached count.
     *   - track_changes: if non-empty, rows are hashed to detect changed data.
     */
    public function __construct($options = array()) {
      if (!empty($options['cache_counts'])) {
        $this->cacheCounts = TRUE;
      }
      if (!empty($options['skip_count'])) {
        $this->skipCount = TRUE;
      }
      if (!empty($options['cache_key'])) {
        $this->cacheKey = $options['cache_key'];
      }
      if (!empty($options['track_changes'])) {
        $this->trackChanges = $options['track_changes'];
      }
    }

    /**
     * Default implementations of Iterator methods - many derivations will find
     * these adequate and will only need to implement performRewind() and
     * getNextRow().
     *
     * The #[\ReturnTypeWillChange] markers below silence the PHP 8.1+ deprecation
     * for Iterator methods without return types; on older PHP versions the line
     * is an ordinary comment.
     */

    /**
     * Implementation of Iterator::current() - called when entering a loop
     * iteration, returning the current row.
     */
    #[\ReturnTypeWillChange]
    public function current() {
      return $this->currentRow;
    }

    /**
     * Implementation of Iterator::key - called when entering a loop iteration,
     * returning the key of the current row. It must be a scalar - we will
     * serialize to fulfill the requirement, but using getCurrentKey() is
     * preferable.
     */
    #[\ReturnTypeWillChange]
    public function key() {
      return serialize($this->currentKey);
    }

    /**
     * Implementation of Iterator::valid() - called at the top of the loop,
     * returning TRUE to process the loop and FALSE to terminate it.
     */
    #[\ReturnTypeWillChange]
    public function valid() {
      return !is_null($this->currentRow);
    }

    /**
     * Implementation of Iterator::rewind() - subclasses of MigrateSource should
     * implement performRewind() to do any class-specific setup for iterating
     * source records.
     */
    #[\ReturnTypeWillChange]
    public function rewind() {
      $this->activeMigration = Migration::currentMigration();
      $this->activeMap = $this->activeMigration->getMap();
      $this->numProcessed = 0;
      $this->numIgnored = 0;
      $this->highwaterField = $this->activeMigration->getHighwaterField();
      if (!empty($this->highwaterField)) {
        $this->originalHighwater = $this->activeMigration->getHighwater();
      }
      // The idlist option is a comma-separated list of source IDs.
      if ($this->activeMigration->getOption('idlist')) {
        $this->idList = explode(',', $this->activeMigration->getOption('idlist'));
      }
      else {
        $this->idList = array();
      }
      migrate_instrument_start(get_class($this) . ' performRewind');
      $this->performRewind();
      migrate_instrument_stop(get_class($this) . ' performRewind');
      // Position the iterator on the first acceptable row.
      $this->next();
    }

    /**
     * Implementation of Iterator::next() - subclasses of MigrateSource should
     * implement getNextRow() to retrieve the next valid source record to process.
     *
     * Rows are pulled from getNextRow() until one is accepted (it becomes the
     * current row) or the source is exhausted (the current row stays NULL).
     */
    #[\ReturnTypeWillChange]
    public function next() {
      $this->currentKey = NULL;
      $this->currentRow = NULL;
      $timer = get_class($this) . ' getNextRow';

      while (TRUE) {
        // Instrument every fetch with its own start/stop pair, so rows fetched
        // after a skipped row are measured and no stop lacks a matching start.
        migrate_instrument_start($timer);
        $row = $this->getNextRow();
        migrate_instrument_stop($timer);
        if (!$row) {
          // Source exhausted.
          break;
        }

        // Populate the source key for this row
        $this->currentKey = $this->activeMigration->prepareKey(
          $this->activeMap->getSourceKey(), $row);

        // Pick up the existing map row, if any, unless getNextRow() did it.
        if (!$this->mapRowAdded) {
          $map_row = $this->activeMap->getRowBySource($this->currentKey);
          // Add map info to the row, if present
          if ($map_row) {
            foreach ($map_row as $field => $value) {
              $field = 'migrate_map_' . $field;
              $row->$field = $value;
            }
          }
        }

        // First, determine if this row should be passed to prepareRow(), or
        // skipped entirely. The rules are:
        // 1. If there's an explicit idlist, that's all we care about (ignore
        //    highwaters and map rows).
        $prepared = FALSE;
        if (!empty($this->idList)) {
          // Check first source key. The comparison is deliberately loose: the
          // idlist holds strings while key values may be integers.
          if (!in_array(reset($this->currentKey), $this->idList)) {
            // If this is a compound source key, check the full key.
            $compoundKey = implode($this->multikeySeparator, $this->currentKey);
            if (count($this->currentKey) == 1 || !in_array($compoundKey, $this->idList)) {
              // Could not find the key, skip.
              continue;
            }
          }
        }
        // 2. If the row is not in the map (we have never tried to import it
        //    before), we always want to try it.
        elseif (!isset($row->migrate_map_sourceid1)) {
          // Fall through
        }
        // 3. If the row is marked as needing update, pass it.
        elseif ($row->migrate_map_needs_update == MigrateMap::STATUS_NEEDS_UPDATE) {
          // Fall through
        }
        // 4. At this point, we have a row which has previously been imported and
        //    not marked for update. If we're not using highwater marks, then we
        //    will not take this row. Except, if we're looking for changes in the
        //    data, we need to go through prepareRow() before we can decide to
        //    skip it.
        elseif (empty($this->highwaterField)) {
          if ($this->trackChanges) {
            if ($this->prepareRow($row) !== FALSE) {
              if ($this->dataChanged($row)) {
                // This is a keeper
                $this->currentRow = $row;
                break;
              }
              else {
                // No change, skip it.
                continue;
              }
            }
            else {
              // prepareRow() told us to skip it.
              continue;
            }
          }
          else {
            // No highwater and not tracking changes, skip.
            continue;
          }
        }
        // 5. The initial highwater mark, before anything is migrated, is ''. We
        //    want to make sure we don't mistakenly skip rows with a highwater
        //    field value of 0, so explicitly handle '' here.
        elseif ($this->originalHighwater === '') {
          // Fall through
        }
        // 6. So, we are using highwater marks. Take the row if its highwater
        //    field value is greater than the saved mark, otherwise skip it.
        else {
          // Call prepareRow() here, in case the highwaterField needs preparation
          if ($this->prepareRow($row) !== FALSE) {
            if ($row->{$this->highwaterField['name']} > $this->originalHighwater) {
              $this->currentRow = $row;
              break;
            }
            else {
              // Skip
              continue;
            }
          }
          // prepareRow() rejected the row; make sure it is not called again.
          $prepared = TRUE;
        }

        // Allow the Migration to prepare this row. prepareRow() can return
        // boolean FALSE to ignore this row.
        if (!$prepared) {
          if ($this->prepareRow($row) !== FALSE) {
            // Finally, we've got a keeper.
            $this->currentRow = $row;
            break;
          }
          else {
            $this->currentRow = NULL;
          }
        }
      }

      // No acceptable row was found - don't leave a stale key behind.
      if (!$this->currentRow) {
        $this->currentKey = NULL;
      }
    }

    /**
     * Give the calling migration a shot at manipulating, and possibly rejecting,
     * the source row.
     *
     * A rejected row is recorded in the map with STATUS_IGNORED and counted in
     * numIgnored. An accepted row gets its migrate_map_hash set (and, when
     * tracking changes, migrate_map_original_hash as well). Either way the row
     * counts towards numProcessed.
     *
     * @param stdClass $row
     *   The source row; modified in place.
     *
     * @return bool
     *   FALSE if the row is to be skipped.
     */
    protected function prepareRow($row) {
      migrate_instrument_start(get_class($this->activeMigration) . ' prepareRow');
      $return = $this->activeMigration->prepareRow($row);
      migrate_instrument_stop(get_class($this->activeMigration) . ' prepareRow');

      // We're explicitly skipping this row - keep track in the map table
      if ($return === FALSE) {
        // Make sure we replace any previous messages for this item with any
        // new ones.
        $this->activeMigration->getMap()->delete($this->currentKey, TRUE);
        $this->activeMigration->saveQueuedMessages();
        $this->activeMigration->getMap()->saveIDMapping($row, array(),
          MigrateMap::STATUS_IGNORED, $this->activeMigration->rollbackAction);
        $this->numIgnored++;
        $this->currentRow = NULL;
        $this->currentKey = NULL;
      }
      else {
        // Anything other than an explicit FALSE counts as acceptance.
        $return = TRUE;
        // When tracking changed data, We want to quietly skip (rather than
        // "ignore") rows with changes. The caller needs to make that decision,
        // so we need to provide them with the necessary information (before and
        // after hashes).
        if ($this->trackChanges) {
          $unhashed_row = clone ($row);
          // Remove all map data, otherwise we'll have a false positive on the
          // second import (attempt) on a row.
          foreach ($unhashed_row as $field => $data) {
            if (strpos($field, 'migrate_map_') === 0) {
              unset($unhashed_row->$field);
            }
          }
          $row->migrate_map_original_hash = isset($row->migrate_map_hash) ?
            $row->migrate_map_hash : '';
          $row->migrate_map_hash = $this->hash($unhashed_row);
        }
        else {
          $row->migrate_map_hash = '';
        }
      }

      $this->numProcessed++;
      return $return;
    }

    /**
     * Determine whether this row has changed, and therefore whether it should
     * be processed.
     *
     * @param stdClass $row
     *   A row carrying migrate_map_original_hash and migrate_map_hash, as set by
     *   prepareRow() when tracking changes.
     *
     * @return bool
     *   TRUE if the two hashes differ.
     */
    protected function dataChanged($row) {
      return $row->migrate_map_original_hash != $row->migrate_map_hash;
    }

    /**
     * Generate a hash of the source row.
     *
     * @param stdClass $row
     *   The row to hash.
     *
     * @return string
     *   The MD5 digest of the serialized row.
     */
    protected function hash($row) {
      migrate_instrument_start('MigrateSource::hash');
      $hash = md5(serialize($row));
      migrate_instrument_stop('MigrateSource::hash');
      return $hash;
    }

  }