sqlmap.inc 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739
  1. <?php
  2. /**
  3. * @file
  4. * Defines a Drupal db-based implementation of MigrateMap.
  5. */
  6. class MigrateSQLMap extends MigrateMap {
  /**
   * Name of the table tracking source-to-destination key mappings.
   *
   * @var string
   */
  protected $mapTable;

  /**
   * Name of the table holding messages recorded against source rows.
   *
   * @var string
   */
  protected $messageTable;

  /**
   * Returns the map table name as generated by the constructor.
   *
   * @return string
   */
  public function getMapTable() {
    return $this->mapTable;
  }

  /**
   * Returns the message table name as generated by the constructor.
   *
   * @return string
   */
  public function getMessageTable() {
    return $this->messageTable;
  }
  /**
   * Returns the map table name, qualified with the database name if possible.
   *
   * Qualifying the name with the database makes cross-database joins
   * possible. Table prefixes are applied after this method returns (i.e. the
   * prefix is prepended to whatever string we hand back), so a prefixed table
   * cannot be qualified here and the bare table name is returned instead.
   * That works when the source data lives in the default (prefixed) database,
   * in particular under simpletest, but not when the primary query runs
   * against an external database.
   *
   * @return string
   *   Either "database.table" or just the table name when a prefix applies.
   */
  public function getQualifiedMapTable() {
    $options = $this->connection->getConnectionOptions();
    if ($this->connection->tablePrefix($this->mapTable)) {
      // A prefix is configured for this table: leave the name unqualified.
      return $this->mapTable;
    }
    return $options['database'] . '.' . $this->mapTable;
  }
  /**
   * Returns the source key definition passed to the constructor.
   *
   * @return array
   *   Keyed by source field name; each value is the Drupal schema definition
   *   for that field.
   */
  public function getSourceKey() {
    return $this->sourceKey;
  }

  /**
   * Returns the destination key definition passed to the constructor.
   *
   * @return array
   *   Keyed by destination field name; each value is the Drupal schema
   *   definition for that field.
   */
  public function getDestinationKey() {
    return $this->destinationKey;
  }
  /**
   * Drupal connection object on which the map/message tables are created.
   *
   * @var DatabaseConnection
   */
  protected $connection;

  /**
   * Returns the connection used for the map and message tables.
   *
   * @return DatabaseConnection
   */
  public function getConnection() {
    return $this->connection;
  }
  /**
   * Set once ensureTables() has run, so the table checks are not repeated.
   *
   * @var boolean
   */
  protected $ensured;

  /**
   * Whether source/destination map lookups are cached.
   *
   * @var boolean
   */
  protected $cacheMapLookups;
  /**
   * Constructor.
   *
   * Derives the map/message table names from the machine name, builds the
   * field-name to map-column lookups (sourceid1.., destid1..) and makes sure
   * the tables exist.
   *
   * @param string $machine_name
   *   The unique reference to the migration that we are mapping.
   * @param array $source_key
   *   The database schema for the source key.
   * @param array $destination_key
   *   The database schema for the destination key.
   * @param string $connection_key
   *   Optional - The connection used to create the mapping tables. By default
   *   this is the destination (Drupal). If it's not possible to make joins
   *   between the destination database and your source database you can
   *   specify a different connection to create the mapping tables on.
   * @param array $options
   *   Optional - Options applied to this source. The keys read here are
   *   'track_last_imported' (enabled whenever the key is set) and
   *   'cache_map_lookups'.
   */
  public function __construct($machine_name, array $source_key,
      array $destination_key, $connection_key = 'default', $options = array()) {
    if (isset($options['track_last_imported'])) {
      $this->trackLastImported = TRUE;
    }
    $this->cacheMapLookups = isset($options['cache_map_lookups'])
      ? $options['cache_map_lookups']
      : FALSE;

    $this->connection = Database::getConnection('default', $connection_key);

    // Generated table names are capped at 63 characters, prefix included.
    $max_length = 63 - strlen($this->connection->tablePrefix());
    $lower_name = drupal_strtolower($machine_name);
    $this->mapTable = drupal_substr('migrate_map_' . $lower_name, 0, $max_length);
    $this->messageTable = drupal_substr('migrate_message_' . $lower_name, 0, $max_length);

    $this->sourceKey = $source_key;
    $this->destinationKey = $destination_key;

    // Map each source field name to its positional map column.
    $this->sourceKeyMap = array();
    $position = 0;
    foreach (array_keys($source_key) as $field) {
      $this->sourceKeyMap[$field] = 'sourceid' . ++$position;
    }

    // Likewise for the destination field names.
    $this->destinationKeyMap = array();
    $position = 0;
    foreach (array_keys($destination_key) as $field) {
      $this->destinationKeyMap[$field] = 'destid' . ++$position;
    }

    $this->ensureTables();
  }
  /**
   * Create the map and message tables if they don't already exist.
   *
   * When the map table is missing, both tables are created from the source
   * and destination key schemas. When the map table is already present, only
   * the 'rollback_action' and 'hash' columns are added if they are missing.
   * The work is done at most once per object (guarded by $this->ensured).
   */
  protected function ensureTables() {
    if ($this->ensured) {
      return;
    }
    $db_schema = $this->connection->schema();

    if (!$db_schema->tableExists($this->mapTable)) {
      // Source key columns (sourceid1, sourceid2, ...) are shared by the map
      // and message tables.
      $source_fields = array();
      $position = 0;
      foreach ($this->sourceKey as $field_schema) {
        $source_fields['sourceid' . ++$position] = $field_schema;
      }
      $source_columns = array_keys($source_fields);

      // Destination key columns (destid1, destid2, ...).
      // TODO: How do we discover the destination schema?
      $map_fields = $source_fields;
      $position = 0;
      foreach ($this->destinationKey as $field_schema) {
        // Allow dest key fields to be NULL (for IGNORED/FAILED cases)
        $field_schema['not null'] = FALSE;
        $map_fields['destid' . ++$position] = $field_schema;
      }

      // Bookkeeping columns.
      $map_fields += array(
        'needs_update' => array(
          'type' => 'int',
          'size' => 'tiny',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'default' => MigrateMap::STATUS_IMPORTED,
          'description' => 'Indicates current status of the source row',
        ),
        'rollback_action' => array(
          'type' => 'int',
          'size' => 'tiny',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'default' => MigrateMap::ROLLBACK_DELETE,
          'description' => 'Flag indicating what to do for this item on rollback',
        ),
        'last_imported' => array(
          'type' => 'int',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'default' => 0,
          'description' => 'UNIX timestamp of the last time this row was imported',
        ),
        'hash' => array(
          'type' => 'varchar',
          'length' => '32',
          'not null' => FALSE,
          'description' => 'Hash of source row data, for detecting changes',
        ),
      );
      $db_schema->createTable($this->mapTable, array(
        'description' => t('Mappings from source key to destination key'),
        'fields' => $map_fields,
        'primary key' => $source_columns,
      ));

      // Message table: serial id, then the source key, then level/message.
      $message_fields = array(
        'msgid' => array(
          'type' => 'serial',
          'unsigned' => TRUE,
          'not null' => TRUE,
        ),
      );
      $message_fields += $source_fields;
      $message_fields += array(
        'level' => array(
          'type' => 'int',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'default' => 1,
        ),
        'message' => array(
          'type' => 'text',
          'size' => 'medium',
          'not null' => TRUE,
        ),
      );
      $db_schema->createTable($this->messageTable, array(
        'description' => t('Messages generated during a migration process'),
        'fields' => $message_fields,
        'primary key' => array('msgid'),
        'indexes' => array('sourcekey' => $source_columns),
      ));
    }
    else {
      // The map table already exists: add any columns it is missing.
      if (!$db_schema->fieldExists($this->mapTable, 'rollback_action')) {
        $db_schema->addField($this->mapTable, 'rollback_action', array(
          'type' => 'int',
          'size' => 'tiny',
          'unsigned' => TRUE,
          'not null' => TRUE,
          'default' => 0,
          'description' => 'Flag indicating what to do for this item on rollback',
        ));
      }
      if (!$db_schema->fieldExists($this->mapTable, 'hash')) {
        $db_schema->addField($this->mapTable, 'hash', array(
          'type' => 'varchar',
          'length' => '32',
          'not null' => FALSE,
          'description' => 'Hash of source row data, for detecting changes',
        ));
      }
    }

    $this->ensured = TRUE;
  }
  /**
   * Retrieve a row from the map table, given a source ID.
   *
   * @param array $source_id
   *   Source key values, in the same order as the source key definition.
   *
   * @return array|false
   *   The result of fetchAssoc() on the query: all map columns of the first
   *   matching row.
   */
  public function getRowBySource(array $source_id) {
    migrate_instrument_start('mapRowBySource');
    $select = $this->connection->select($this->mapTable, 'map');
    $select->fields('map');
    // Values are consumed positionally, one per source key column.
    foreach ($this->sourceKeyMap as $column) {
      $value = array_shift($source_id);
      $select->condition('map.' . $column, $value, '=');
    }
    $statement = $select->execute();
    migrate_instrument_stop('mapRowBySource');
    return $statement->fetchAssoc();
  }
  /**
   * Retrieve a row from the map table, given a destination ID.
   *
   * @param array $destination_id
   *   Destination key values, in the same order as the destination key
   *   definition.
   *
   * @return array|false
   *   The result of fetchAssoc() on the query: all map columns of the first
   *   matching row.
   */
  public function getRowByDestination(array $destination_id) {
    migrate_instrument_start('getRowByDestination');
    $select = $this->connection->select($this->mapTable, 'map');
    $select->fields('map');
    // Values are consumed positionally, one per destination key column.
    foreach ($this->destinationKeyMap as $column) {
      $value = array_shift($destination_id);
      $select->condition('map.' . $column, $value, '=');
    }
    $statement = $select->execute();
    migrate_instrument_stop('getRowByDestination');
    return $statement->fetchAssoc();
  }
  /**
   * Retrieve an array of map rows marked as needing update.
   *
   * @param int $count
   *   Maximum rows to return; defaults to 10,000.
   *
   * @return array
   *   Array of map row objects whose needs_update column equals
   *   MigrateMap::STATUS_NEEDS_UPDATE.
   */
  public function getRowsNeedingUpdate($count = 10000) {
    $rows = array();
    $result = $this->connection->select($this->mapTable, 'map')
      ->fields('map')
      ->condition('needs_update', MigrateMap::STATUS_NEEDS_UPDATE)
      ->range(0, $count)
      ->execute();
    // Collect the result set into a plain array of row objects.
    foreach ($result as $row) {
      $rows[] = $row;
    }
    return $rows;
  }
  /**
   * Given a (possibly multi-field) destination key, return the (possibly
   * multi-field) source key mapped to it.
   *
   * When map lookup caching is enabled, results are kept in a drupal_static()
   * bucket named after the map table and keyed by the JSON-encoded
   * destination key.
   *
   * @param array $destination_id
   *   Array of destination key values.
   *
   * @return array|false
   *   Array of source key values as returned by fetchAssoc() (keyed by the
   *   sourceidN map columns); a falsy value when no mapping row matches.
   */
  public function lookupSourceID(array $destination_id) {
    migrate_instrument_start('lookupSourceID');
    // Try a cache lookup if enabled.
    if ($this->cacheMapLookups) {
      $cache = &drupal_static($this->mapTable . '_sourceIDCache');
      // Serialize before the loop below consumes $destination_id.
      $serialized = json_encode($destination_id);
      if (isset($cache[$serialized])) {
        migrate_instrument_stop('lookupSourceID');
        return $cache[$serialized];
      }
    }

    $query = $this->connection->select($this->mapTable, 'map')
      ->fields('map', $this->sourceKeyMap);
    foreach ($this->destinationKeyMap as $key_name) {
      $query = $query->condition("map.$key_name", array_shift($destination_id), '=');
    }
    $result = $query->execute();
    $source_id = $result->fetchAssoc();

    // Store the looked-up source key in the cache if enabled. This must be
    // $source_id: $destination_id has been emptied by array_shift() above and
    // is not what callers expect back from a cache hit.
    if ($this->cacheMapLookups) {
      $cache[$serialized] = $source_id;
    }
    migrate_instrument_stop('lookupSourceID');
    return $source_id;
  }
  /**
   * Given a (possibly multi-field) source key, return the (possibly
   * multi-field) destination key it is mapped to.
   *
   * When map lookup caching is enabled, results are kept in a drupal_static()
   * bucket named after the map table and keyed by the JSON-encoded source key.
   *
   * @param array $source_id
   *   Array of source key values.
   *
   * @return array|false
   *   Array of destination key values as returned by fetchAssoc(); a falsy
   *   value when no mapping row matches.
   */
  public function lookupDestinationID(array $source_id) {
    migrate_instrument_start('lookupDestinationID');
    $use_cache = $this->cacheMapLookups;

    // Serve from the static cache when possible.
    if ($use_cache) {
      $cache = &drupal_static($this->mapTable . '_destinationIDCache');
      $serialized = json_encode($source_id);
      if (isset($cache[$serialized])) {
        migrate_instrument_stop('lookupDestinationID');
        return $cache[$serialized];
      }
    }

    $select = $this->connection->select($this->mapTable, 'map');
    $select->fields('map', $this->destinationKeyMap);
    // Source values are matched positionally against the sourceidN columns.
    foreach ($this->sourceKeyMap as $column) {
      $value = array_shift($source_id);
      $select->condition('map.' . $column, $value, '=');
    }
    $destination_id = $select->execute()->fetchAssoc();

    // Remember the result for later lookups.
    if ($use_cache) {
      $cache[$serialized] = $destination_id;
    }
    migrate_instrument_stop('lookupDestinationID');
    return $destination_id;
  }
  /**
   * Called upon import of one record, we record a mapping from the source key
   * to the destination key. Also may be called, setting the third parameter to
   * NEEDS_UPDATE, to signal an existing record should be remigrated.
   *
   * The row is written with a merge query keyed on the source key columns. If
   * any source key field is NULL, a message is displayed and nothing is
   * saved.
   *
   * @param stdClass $source_row
   *   The raw source data. We use the key map derived from the source object
   *   to get the source key values.
   * @param array $dest_ids
   *   The destination key values.
   * @param int $needs_update
   *   Status of the source row in the map. Defaults to STATUS_IMPORTED.
   * @param int $rollback_action
   *   How to handle the destination object on rollback. Defaults to
   *   ROLLBACK_DELETE.
   * @param string $hash
   *   If hashing is enabled, the hash of the raw source row.
   */
  public function saveIDMapping(stdClass $source_row, array $dest_ids,
      $needs_update = MigrateMap::STATUS_IMPORTED,
      $rollback_action = MigrateMap::ROLLBACK_DELETE, $hash = NULL) {
    migrate_instrument_start('saveIDMapping');

    // Build the merge key from the source row's key fields.
    $merge_key = array();
    foreach ($this->sourceKeyMap as $field_name => $column) {
      $value = $source_row->$field_name;
      if ($value === NULL) {
        // A NULL key value will fail.
        Migration::displayMessage(t(
          'Could not save to map table due to NULL value for key field !field',
          array('!field' => $field_name)));
        migrate_instrument_stop('saveIDMapping');
        return;
      }
      $merge_key[$column] = $value;
    }

    $values = array(
      'needs_update' => (int)$needs_update,
      'rollback_action' => (int)$rollback_action,
      'hash' => $hash,
    );
    // Destination values go positionally into destid1, destid2, ...
    $position = 0;
    foreach ($dest_ids as $dest_id) {
      $values['destid' . ++$position] = $dest_id;
    }
    if ($this->trackLastImported) {
      $values['last_imported'] = time();
    }

    $merge = $this->connection->merge($this->mapTable);
    $merge->key($merge_key);
    $merge->fields($values);
    $merge->execute();
    migrate_instrument_stop('saveIDMapping');
  }
  /**
   * Record a message in the migration's message table.
   *
   * If $source_key is not an array the message is passed to
   * Migration::displayMessage() instead. If any key value is unset the
   * message is printed and nothing is saved.
   *
   * @param array $source_key
   *   Source ID of the record in error
   * @param string $message
   *   The message to record.
   * @param int $level
   *   Optional message severity (defaults to MESSAGE_ERROR).
   */
  public function saveMessage($source_key, $message, $level = Migration::MESSAGE_ERROR) {
    if (!is_array($source_key)) {
      // TODO: What else can we do?
      Migration::displayMessage($message);
      return;
    }

    // Source IDs go positionally into sourceid1, sourceid2, ...
    $row = array();
    $position = 0;
    foreach ($source_key as $key_value) {
      // If any key value is not set, we can't save - print out and abort
      if (!isset($key_value)) {
        print($message);
        return;
      }
      $row['sourceid' . ++$position] = $key_value;
    }
    $row['level'] = $level;
    $row['message'] = $message;

    $insert = $this->connection->insert($this->messageTable);
    $insert->fields($row);
    $insert->execute();
  }
  /**
   * Prepares this migration to run as an update - that is, in addition to
   * unmigrated content (source records not in the map table) being imported,
   * previously-migrated content will also be updated in place.
   *
   * Every row in the map table is flagged STATUS_NEEDS_UPDATE.
   */
  public function prepareUpdate() {
    $update = $this->connection->update($this->mapTable);
    $update->fields(array('needs_update' => MigrateMap::STATUS_NEEDS_UPDATE));
    $update->execute();
  }
  /**
   * Returns a count of records in the map table (i.e., the number of
   * source records which have been processed for this migration).
   *
   * @return int
   */
  public function processedCount() {
    $select = $this->connection->select($this->mapTable);
    $select->addExpression('COUNT(*)', 'count');
    return $select->execute()->fetchField();
  }
  /**
   * Returns a count of imported records in the map table.
   *
   * Rows whose status is STATUS_IMPORTED or STATUS_NEEDS_UPDATE are counted.
   *
   * @return int
   */
  public function importedCount() {
    $imported_statuses = array(
      MigrateMap::STATUS_IMPORTED,
      MigrateMap::STATUS_NEEDS_UPDATE,
    );
    $select = $this->connection->select($this->mapTable);
    $select->addExpression('COUNT(*)', 'count');
    $select->condition('needs_update', $imported_statuses, 'IN');
    return $select->execute()->fetchField();
  }
  /**
   * Returns a count of records which are marked as needing update.
   *
   * @return int
   */
  public function updateCount() {
    $select = $this->connection->select($this->mapTable);
    $select->addExpression('COUNT(*)', 'count');
    $select->condition('needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
    return $select->execute()->fetchField();
  }
  /**
   * Get the number of source records which failed to import.
   *
   * @return int
   *   Number of map rows whose status is STATUS_FAILED.
   */
  public function errorCount() {
    $select = $this->connection->select($this->mapTable);
    $select->addExpression('COUNT(*)', 'count');
    $select->condition('needs_update', MigrateMap::STATUS_FAILED);
    return $select->execute()->fetchField();
  }
  /**
   * Get the number of messages saved.
   *
   * @return int
   *   Number of rows in the message table.
   */
  public function messageCount() {
    $select = $this->connection->select($this->messageTable);
    $select->addExpression('COUNT(*)', 'count');
    return $select->execute()->fetchField();
  }
  /**
   * Delete the map entry and any message table entries for the specified
   * source row.
   *
   * @param array $source_key
   *   Source key values, matched positionally against sourceid1, sourceid2...
   * @param bool $messages_only
   *   When TRUE, only the message table entries are removed and the map entry
   *   is left alone.
   */
  public function delete(array $source_key, $messages_only = FALSE) {
    // The map query (if wanted) comes first so it also executes first.
    $queries = array();
    if (!$messages_only) {
      $queries[] = $this->connection->delete($this->mapTable);
    }
    $queries[] = $this->connection->delete($this->messageTable);

    // Apply the same source key conditions to every query.
    $position = 0;
    foreach ($source_key as $key_value) {
      $column = 'sourceid' . ++$position;
      foreach ($queries as $query) {
        $query->condition($column, $key_value);
      }
    }

    foreach ($queries as $query) {
      $query->execute();
    }
  }
  /**
   * Delete the map entry and any message table entries for the specified
   * destination row.
   *
   * The source key is looked up first; when no mapping is found nothing is
   * deleted.
   *
   * @param array $destination_key
   *   Destination key values, matched positionally against destid1, destid2...
   */
  public function deleteDestination(array $destination_key) {
    $source_key = $this->lookupSourceID($destination_key);
    if (empty($source_key)) {
      return;
    }

    // Remove the map row by destination key.
    $map_query = $this->connection->delete($this->mapTable);
    $position = 0;
    foreach ($destination_key as $key_value) {
      $map_query->condition('destid' . ++$position, $key_value);
    }
    $map_query->execute();

    // Remove the messages by the source key we looked up.
    $message_query = $this->connection->delete($this->messageTable);
    $position = 0;
    foreach ($source_key as $key_value) {
      $message_query->condition('sourceid' . ++$position, $key_value);
    }
    $message_query->execute();
  }
  /**
   * Set the specified row to be updated, if it exists.
   *
   * @param array $source_key
   *   Source key values, matched positionally against sourceid1, sourceid2...
   */
  public function setUpdate(array $source_key) {
    $update = $this->connection->update($this->mapTable);
    $update->fields(array('needs_update' => MigrateMap::STATUS_NEEDS_UPDATE));
    $position = 0;
    foreach ($source_key as $key_value) {
      $update->condition('sourceid' . ++$position, $key_value);
    }
    $update->execute();
  }
  /**
   * Delete all map and message table entries specified.
   *
   * @param array $source_keys
   *   Each array member is an array of key fields for one source row. An
   *   empty array is a no-op.
   */
  public function deleteBulk(array $source_keys) {
    // Nothing to delete. Bail out early: on the single-column path an empty
    // list would otherwise produce an "IN ()" condition, which is not valid
    // SQL.
    if (empty($source_keys)) {
      return;
    }

    // If we have a single-column key, we can shortcut it
    if (count($this->sourceKey) == 1) {
      // Re-index so the IN list is a plain sequential array.
      $sourceids = array_values($source_keys);
      $this->connection->delete($this->mapTable)
        ->condition('sourceid1', $sourceids, 'IN')
        ->execute();
      $this->connection->delete($this->messageTable)
        ->condition('sourceid1', $sourceids, 'IN')
        ->execute();
    }
    else {
      // Multi-column keys: one pair of delete queries per source row.
      foreach ($source_keys as $source_key) {
        $map_query = $this->connection->delete($this->mapTable);
        $message_query = $this->connection->delete($this->messageTable);
        $count = 1;
        foreach ($source_key as $key_value) {
          $map_query->condition('sourceid' . $count, $key_value);
          $message_query->condition('sourceid' . $count++, $key_value);
        }
        $map_query->execute();
        $message_query->execute();
      }
    }
  }
  /**
   * Clear all messages from the message table.
   */
  public function clearMessages() {
    $truncate = $this->connection->truncate($this->messageTable);
    $truncate->execute();
  }
  /**
   * Remove the associated map and message tables.
   */
  public function destroy() {
    // Map table first, then the message table.
    foreach (array($this->mapTable, $this->messageTable) as $table) {
      $this->connection->schema()->dropTable($table);
    }
  }
  /**
   * Result set being iterated; set by rewind().
   */
  protected $result = NULL;

  /**
   * Current row of the iteration, or NULL when there is none.
   */
  protected $currentRow = NULL;

  /**
   * Source key values of the current row, keyed by map column name.
   *
   * @var array
   */
  protected $currentKey = array();

  /**
   * Returns the source key of the row the iterator is positioned on.
   *
   * @return array
   */
  public function getCurrentKey() {
    return $this->currentKey;
  }
  /**
   * Implementation of Iterator::rewind() - called before beginning a foreach
   * loop. Runs a fresh query selecting the source and destination key columns
   * of every map row and positions the iterator on the first row.
   *
   * TODO: Support idlist, itemlimit
   */
  public function rewind() {
    $this->currentRow = NULL;

    // Source key columns first, then destination key columns.
    $columns = array_merge(
      array_values($this->sourceKeyMap),
      array_values($this->destinationKeyMap)
    );

    // TODO: honour $this->options['itemlimit'] via $query->range().
    $select = $this->connection->select($this->mapTable, 'map');
    $select->fields('map', $columns);
    $this->result = $select->execute();

    $this->next();
  }
  /**
   * Implementation of Iterator::current() - called when entering a loop
   * iteration, returning the current row.
   *
   * @return object|null
   *   The current row, with the source key columns removed by next().
   */
  public function current() {
    return $this->currentRow;
  }
  /**
   * Implementation of Iterator::key() - called when entering a loop
   * iteration, returning the key of the current row. It must be a scalar - we
   * serialize the key array to fulfill the requirement, but using
   * getCurrentKey() is preferable.
   *
   * @return string
   */
  public function key() {
    $scalar_key = serialize($this->currentKey);
    return $scalar_key;
  }
  /**
   * Implementation of Iterator::next() - called at the bottom of the loop
   * implicitly, as well as explicitly from rewind().
   *
   * Fetches the next row, moves its source key columns into $currentKey and
   * leaves only the destination fields on $currentRow. When the result set is
   * exhausted $currentRow becomes NULL.
   */
  public function next() {
    $row = $this->result->fetchObject();
    $this->currentKey = array();

    if (!is_object($row)) {
      // No more rows.
      $this->currentRow = NULL;
      return;
    }

    foreach ($this->sourceKeyMap as $map_field) {
      $this->currentKey[$map_field] = $row->$map_field;
      // Leave only destination fields
      unset($row->$map_field);
    }
    $this->currentRow = $row;
  }
  /**
   * Implementation of Iterator::valid() - called at the top of the loop,
   * returning TRUE to process the loop and FALSE to terminate it.
   *
   * @return bool
   */
  public function valid() {
    // TODO: Check numProcessed against itemlimit
    return $this->currentRow !== NULL;
  }
  689. }