source.inc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. <?php
  2. /**
  3. * @file
  4. * Define base for migration sources.
  5. */
  6. /**
  7. * Abstract base class for source handling.
  8. *
  9. * Derived classes are expected to define __toString(), returning a string
  10. * describing the source and significant options. See
  11. * MigrateSourceSQL for an example.
  12. */
  13. abstract class MigrateSource implements Iterator {
  14. /**
  15. * The current row from the query
  16. *
  17. * @var stdClass
  18. */
  19. protected $currentRow;
  20. /**
  21. * The primary key of the current row
  22. *
  23. * @var array
  24. */
  25. protected $currentKey;
  /**
   * Returns the source key of the current row.
   *
   * @return array|null
   *   The key stored by next() for the current row, or NULL when there is no
   *   current row.
   */
  public function getCurrentKey() {
    return $this->currentKey;
  }
  29. /**
  30. * The Migration class currently invoking us, during rewind() and next().
  31. *
  32. * @var Migration
  33. */
  34. protected $activeMigration;
  35. /**
  36. * The MigrateMap class for the current migration.
  37. *
  38. * @var MigrateMap
  39. */
  40. protected $activeMap;
  41. /**
  42. * Number of rows intentionally ignored (prepareRow() returned FALSE)
  43. *
  44. * @var int
  45. */
  46. protected $numIgnored = 0;
  /**
   * Returns the number of rows ignored so far.
   *
   * @return int
   *   Count of rows for which prepareRow() returned FALSE since the counter
   *   was last reset.
   */
  public function getIgnored() {
    return $this->numIgnored;
  }
  50. /**
  51. * Number of rows we've at least looked at.
  52. *
  53. * @var int
  54. */
  55. protected $numProcessed = 0;
  /**
   * Returns the number of rows examined so far.
   *
   * @return int
   *   Count of rows that have been passed through prepareRow() since the last
   *   rewind().
   */
  public function getProcessed() {
    return $this->numProcessed;
  }
  /**
   * Clears the ignored-row counter.
   *
   * Only numIgnored is reset; the processed-row counter is left untouched.
   */
  public function resetStats() {
    $this->numIgnored = 0;
  }
  65. /**
  66. * Information on the highwater mark for the current migration, if any.
  67. *
  68. * @var array
  69. */
  70. protected $highwaterField;
  71. /**
  72. * The highwater mark at the beginning of the import operation.
  73. *
  74. * @var
  75. */
  76. protected $originalHighwater = '';
  77. /**
  78. * Used in the case of multiple key sources that need to use idlist.
  79. *
  80. * @var string
  81. */
  82. protected $multikeySeparator = ':';
  83. /**
  84. * List of source IDs to process.
  85. *
  86. * @var array
  87. */
  88. protected $idList = array();
  /**
   * Lists the fields this source makes available for mapping.
   *
   * Every concrete source class has to provide this.
   *
   * @return array
   *   Associative array keyed by field machine name (the name to pass to
   *   addFieldMapping()), with a human-readable description of the field as
   *   each value.
   */
  abstract public function fields();
  98. /**
  99. * Whether this instance should cache the source count.
  100. *
  101. * @var boolean
  102. */
  103. protected $cacheCounts = FALSE;
  104. /**
  105. * Key to use for caching counts.
  106. *
  107. * @var string
  108. */
  109. protected $cacheKey;
  110. /**
  111. * Whether this instance should not attempt to count the source.
  112. *
  113. * @var boolean
  114. */
  115. protected $skipCount = FALSE;
  116. /**
  117. * If TRUE, we will maintain hashed source rows to determine whether incoming
  118. * data has changed.
  119. *
  120. * @var bool
  121. */
  122. protected $trackChanges = FALSE;
  123. /**
  124. * By default, next() will directly read the map row and add it to the data
  125. * row. A source plugin implementation may do this itself (in particular, the
  126. * SQL source can incorporate the map table into the query) - if so, it should
  127. * set this TRUE so we don't duplicate the effort.
  128. *
  129. * @var bool
  130. */
  131. protected $mapRowAdded = FALSE;
  /**
   * Returns the number of available source records.
   *
   * The count is served from the 'cache' bin when count caching is enabled
   * and no refresh was requested; otherwise it is recomputed through
   * computeCount() and written to the cache.
   *
   * @param boolean $refresh
   *   TRUE to bypass any cached value and recompute the count.
   *
   * @return int
   *   The record count, or -1 when this source is configured not to be
   *   counted.
   */
  public function count($refresh = FALSE) {
    // Sources flagged as uncountable never touch the cache or the source.
    if ($this->skipCount) {
      return -1;
    }

    // Without an explicit cache key, derive one from the string form of the
    // source.
    if (!isset($this->cacheKey)) {
      $this->cacheKey = md5((string) $this);
    }

    // Only consult the cache when caching is enabled and the caller did not
    // ask for a fresh value.
    if ($this->cacheCounts && !$refresh) {
      $cached = cache_get($this->cacheKey, 'cache');
      if (is_object($cached)) {
        return $cached->data;
      }
    }

    // Refresh requested, caching disabled, or cache miss: ask the derived
    // class for a fresh count and store it.
    $total = $this->computeCount();
    cache_set($this->cacheKey, $total, 'cache');
    return $total;
  }
  167. /**
  168. * Derived classes must implement computeCount(), to retrieve a fresh count of
  169. * source records.
  170. */
  171. //abstract public function computeCount();
  /**
   * Class constructor.
   *
   * @param array $options
   *   Optional array of options. Recognized keys:
   *   - cache_counts: if non-empty, source counts are cached.
   *   - skip_count: if non-empty, the source is never counted.
   *   - cache_key: if non-empty, the key used when caching counts.
   *   - track_changes: if non-empty, stored as the change-tracking setting.
   *   A missing or empty option leaves the corresponding property at its
   *   declared default.
   */
  public function __construct($options = array()) {
    // Options that simply switch a boolean property on.
    $switches = array(
      'cache_counts' => 'cacheCounts',
      'skip_count' => 'skipCount',
    );
    foreach ($switches as $option => $property) {
      if (!empty($options[$option])) {
        $this->{$property} = TRUE;
      }
    }

    // Options whose value is copied onto the property as given.
    $values = array(
      'cache_key' => 'cacheKey',
      'track_changes' => 'trackChanges',
    );
    foreach ($values as $option => $property) {
      if (!empty($options[$option])) {
        $this->{$property} = $options[$option];
      }
    }
  }
  192. /**
  193. * Default implementations of Iterator methods - many derivations will find
  194. * these adequate and will only need to implement rewind() and next()
  195. */
  /**
   * Implementation of Iterator::current().
   *
   * Called when entering a loop iteration.
   *
   * @return stdClass|null
   *   The current source row, or NULL when there is none.
   */
  // The attribute below silences the PHP 8.1+ tentative-return-type
  // deprecation; older PHP versions parse the line as a comment.
  #[\ReturnTypeWillChange]
  public function current() {
    return $this->currentRow;
  }
  /**
   * Implementation of Iterator::key().
   *
   * Called when entering a loop iteration. Iterator keys must be scalar, so
   * the current key is serialized; callers that want the key itself should
   * prefer getCurrentKey().
   *
   * @return string
   *   The serialized key of the current row.
   */
  // The attribute below silences the PHP 8.1+ tentative-return-type
  // deprecation; older PHP versions parse the line as a comment.
  #[\ReturnTypeWillChange]
  public function key() {
    return serialize($this->currentKey);
  }
  /**
   * Implementation of Iterator::valid().
   *
   * Called at the top of the loop.
   *
   * @return bool
   *   TRUE while there is a current row to process, FALSE to end the loop.
   */
  // The attribute below silences the PHP 8.1+ tentative-return-type
  // deprecation; older PHP versions parse the line as a comment.
  #[\ReturnTypeWillChange]
  public function valid() {
    return $this->currentRow !== NULL;
  }
  /**
   * Implementation of Iterator::rewind().
   *
   * Captures the active migration and its map, resets the row counters,
   * records the highwater settings and the optional 'idlist' option, and then
   * delegates to performRewind() before fetching the first row with next().
   * Subclasses implement performRewind() for any class-specific setup.
   */
  // The attribute below silences the PHP 8.1+ tentative-return-type
  // deprecation; older PHP versions parse the line as a comment.
  #[\ReturnTypeWillChange]
  public function rewind() {
    $this->activeMigration = Migration::currentMigration();
    $this->activeMap = $this->activeMigration->getMap();
    $this->numProcessed = 0;
    $this->numIgnored = 0;

    // Remember the highwater mark as it stood before this run started.
    $this->highwaterField = $this->activeMigration->getHighwaterField();
    if (!empty($this->highwaterField)) {
      $this->originalHighwater = $this->activeMigration->getHighwater();
    }

    // The 'idlist' option is a comma-separated list of source IDs; read it
    // once and split it, or fall back to an empty list.
    $id_list = $this->activeMigration->getOption('idlist');
    $this->idList = $id_list ? explode(',', $id_list) : array();

    $timer = get_class($this) . ' performRewind';
    migrate_instrument_start($timer);
    $this->performRewind();
    migrate_instrument_stop($timer);

    // Position the iterator on the first eligible row.
    $this->next();
  }
  /**
   * Implementation of Iterator::next().
   *
   * Pulls rows from getNextRow() until one qualifies for processing, then
   * stores it in currentRow (with its key in currentKey). When the source is
   * exhausted, both are left NULL. Subclasses implement getNextRow() to
   * retrieve the next source record.
   *
   * A fetched row is handled by the first matching rule:
   * 1. An explicit idlist is set: only rows whose first key value, or whose
   *    full compound key, is in the list are considered; highwater marks and
   *    map rows are not consulted.
   * 2. The row is not in the map: always try it.
   * 3. The row is flagged as needing update: try it.
   * 4. No highwater field: skip the row, unless changes are being tracked
   *    and the prepared row's hash differs from the stored one.
   * 5. The original highwater mark is '': try it.
   * 6. Otherwise: take the row only if its prepared highwater value is
   *    greater than the original mark.
   * Rows passing rules 1, 2, 3 or 5 are accepted unless prepareRow() returns
   * FALSE.
   */
  // The attribute below silences the PHP 8.1+ tentative-return-type
  // deprecation; older PHP versions parse the line as a comment.
  #[\ReturnTypeWillChange]
  public function next() {
    $this->currentKey = NULL;
    $this->currentRow = NULL;
    $timer = get_class($this) . ' getNextRow';

    while (TRUE) {
      // Wrap each fetch in its own start/stop pair so that every start has
      // exactly one matching stop, however many rows are skipped.
      migrate_instrument_start($timer);
      $row = $this->getNextRow();
      migrate_instrument_stop($timer);
      if (!$row) {
        break;
      }

      // Populate the source key for this row.
      $this->currentKey = $this->activeMigration->prepareKey(
        $this->activeMap->getSourceKey(), $row);

      // Pick up the existing map row, if any, unless getNextRow() did it.
      if (!$this->mapRowAdded) {
        $map_row = $this->activeMap->getRowBySource($this->currentKey);
        if ($map_row) {
          foreach ($map_row as $field => $value) {
            $row->{'migrate_map_' . $field} = $value;
          }
        }
      }

      // Rule 1: explicit idlist.
      if (!empty($this->idList)) {
        // Check the first source key, then the full compound key.
        if (!in_array(reset($this->currentKey), $this->idList)) {
          $compoundKey = implode($this->multikeySeparator, $this->currentKey);
          if (count($this->currentKey) == 1 || !in_array($compoundKey, $this->idList)) {
            // Could not find the key, skip.
            continue;
          }
        }
      }
      // Rule 2: never imported before.
      elseif (!isset($row->migrate_map_sourceid1)) {
        // Fall through to prepareRow() below.
      }
      // Rule 3: marked as needing update.
      elseif ($row->migrate_map_needs_update == MigrateMap::STATUS_NEEDS_UPDATE) {
        // Fall through to prepareRow() below.
      }
      // Rule 4: previously imported, not marked for update, no highwater.
      elseif (empty($this->highwaterField)) {
        if (!$this->trackChanges) {
          // No highwater and not tracking changes, skip.
          continue;
        }
        // The hashes are only available after prepareRow() has run.
        if ($this->prepareRow($row) === FALSE || !$this->dataChanged($row)) {
          // Rejected by prepareRow(), or unchanged: skip.
          continue;
        }
        $this->currentRow = $row;
        break;
      }
      // Rule 5: the initial highwater mark is ''; handle it explicitly so rows
      // with a highwater field value of 0 are not mistakenly skipped.
      elseif ($this->originalHighwater === '') {
        // Fall through to prepareRow() below.
      }
      // Rule 6: compare against the saved highwater mark.
      else {
        // prepareRow() runs first in case the highwater field needs
        // preparation; the comparison is skipped when it rejects the row.
        if ($this->prepareRow($row) !== FALSE
          && $row->{$this->highwaterField['name']} > $this->originalHighwater) {
          $this->currentRow = $row;
          break;
        }
        continue;
      }

      // Allow the Migration to prepare this row. prepareRow() can return
      // boolean FALSE to ignore this row.
      if ($this->prepareRow($row) !== FALSE) {
        // Finally, we've got a keeper.
        $this->currentRow = $row;
        break;
      }
      $this->currentRow = NULL;
    }

    if (!$this->currentRow) {
      $this->currentKey = NULL;
    }
  }
  /**
   * Lets the active migration manipulate, and possibly reject, a source row.
   *
   * A rejected row is recorded in the map with MigrateMap::STATUS_IGNORED and
   * counted as ignored. An accepted row gets its migrate_map_hash property
   * set: to a hash of the row's non-map fields when changes are tracked
   * (with the previous value kept in migrate_map_original_hash), otherwise to
   * an empty string. Either way the processed-row counter is incremented.
   *
   * @param $row
   *   The source row object handed to the migration's prepareRow().
   *
   * @return bool
   *   FALSE if the row is to be skipped, TRUE otherwise.
   */
  protected function prepareRow($row) {
    $timer = get_class($this->activeMigration) . ' prepareRow';
    migrate_instrument_start($timer);
    $verdict = $this->activeMigration->prepareRow($row);
    migrate_instrument_stop($timer);

    // Only a strict FALSE rejects the row; keep track of it in the map table.
    if ($verdict === FALSE) {
      // Drop the old map entry first so that any previous messages for this
      // item are replaced by the new ones.
      $this->activeMigration->getMap()->delete($this->currentKey, TRUE);
      $this->activeMigration->saveQueuedMessages();
      $this->activeMigration->getMap()->saveIDMapping($row, array(),
        MigrateMap::STATUS_IGNORED, $this->activeMigration->rollbackAction);
      $this->numIgnored++;
      $this->currentRow = NULL;
      $this->currentKey = NULL;
      $this->numProcessed++;
      return FALSE;
    }

    if (!$this->trackChanges) {
      $row->migrate_map_hash = '';
    }
    else {
      // When tracking changed data, unchanged rows are to be skipped quietly
      // rather than "ignored". That decision belongs to the caller, so supply
      // it with the before and after hashes.
      $hashable = clone $row;
      // Strip all map data from the copy, otherwise the second import
      // attempt on a row would produce a false positive.
      foreach ($hashable as $field => $data) {
        if (strncmp($field, 'migrate_map_', 12) === 0) {
          unset($hashable->{$field});
        }
      }
      if (isset($row->migrate_map_hash)) {
        $row->migrate_map_original_hash = $row->migrate_map_hash;
      }
      else {
        $row->migrate_map_original_hash = '';
      }
      $row->migrate_map_hash = $this->hash($hashable);
    }

    $this->numProcessed++;
    return TRUE;
  }
  /**
   * Determines whether a row has changed since it was last imported, and
   * therefore whether it should be processed.
   *
   * @param $row
   *   A row object carrying the migrate_map_original_hash and
   *   migrate_map_hash properties set by prepareRow().
   *
   * @return bool
   *   TRUE if the two hashes differ, FALSE if they are identical.
   */
  protected function dataChanged($row) {
    // Compare as strings, strictly. A loose comparison treats two different
    // hashes of the form "0e<digits>" as equal numbers, which would wrongly
    // report a changed row as unchanged.
    return (string) $row->migrate_map_original_hash !== (string) $row->migrate_map_hash;
  }
  /**
   * Computes a hash of a source row.
   *
   * @param $row
   *   The row to hash; it is serialized and the result is md5-hashed.
   *
   * @return string
   *   The md5 digest of the serialized row.
   */
  protected function hash($row) {
    $timer = 'MigrateSource::hash';
    migrate_instrument_start($timer);
    $digest = md5(serialize($row));
    migrate_instrument_stop($timer);
    return $digest;
  }
  440. }