sqlmap.inc 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622
  1. <?php
  2. /**
  3. * @file
  4. * Defines a Drupal db-based implementation of MigrateMap.
  5. */
  6. class MigrateSQLMap extends MigrateMap {
  7. /**
  8. * Names of tables created for tracking the migration.
  9. *
  10. * @var string
  11. */
  12. protected $mapTable, $messageTable;
  13. public function getMapTable() {
  14. return $this->mapTable;
  15. }
  16. public function getMessageTable() {
  17. return $this->messageTable;
  18. }
  19. /**
  20. * Qualifying the map table name with the database name makes cross-db joins
  21. * possible. Note that, because prefixes are applied after we do this (i.e.,
  22. * it will prefix the string we return), we do not qualify the table if it has
  23. * a prefix. This will work fine when the source data is in the default
  24. * (prefixed) database (in particular, for simpletest), but not if the primary
  25. * query is in an external database.
  26. *
  27. * @return string
  28. */
  29. public function getQualifiedMapTable() {
  30. $options = $this->connection->getConnectionOptions();
  31. $prefix = $this->connection->tablePrefix($this->mapTable);
  32. if ($prefix) {
  33. return $this->mapTable;
  34. }
  35. else {
  36. return $options['database'] . '.' . $this->mapTable;
  37. }
  38. }
  39. /**
  40. * sourceKey and destinationKey arrays are keyed by the field names; values
  41. * are the Drupal schema definition for the field.
  42. *
  43. * @var array
  44. */
  45. public function getSourceKey() {
  46. return $this->sourceKey;
  47. }
  48. public function getDestinationKey() {
  49. return $this->destinationKey;
  50. }
  51. /**
  52. * Drupal connection object on which to create the map/message tables
  53. * @var DatabaseConnection
  54. */
  55. protected $connection;
  56. public function getConnection() {
  57. return $this->connection;
  58. }
  59. /**
  60. * We don't need to check the tables more than once per request.
  61. *
  62. * @var boolean
  63. */
  64. protected $ensured;
  65. public function __construct($machine_name, array $source_key,
  66. array $destination_key, $connection_key = 'default') {
  67. // Default generated table names, limited to 63 characters
  68. $this->mapTable = 'migrate_map_' . drupal_strtolower($machine_name);
  69. $this->mapTable = drupal_substr($this->mapTable, 0, 63);
  70. $this->messageTable = 'migrate_message_' . drupal_strtolower($machine_name);
  71. $this->messageTable = drupal_substr($this->messageTable, 0, 63);
  72. $this->sourceKey = $source_key;
  73. $this->destinationKey = $destination_key;
  74. $this->connection = Database::getConnection('default', $connection_key);
  75. // Build the source and destination key maps
  76. $this->sourceKeyMap = array();
  77. $count = 1;
  78. foreach ($source_key as $field => $schema) {
  79. $this->sourceKeyMap[$field] = 'sourceid' . $count++;
  80. }
  81. $this->destinationKeyMap = array();
  82. $count = 1;
  83. foreach ($destination_key as $field => $schema) {
  84. $this->destinationKeyMap[$field] = 'destid' . $count++;
  85. }
  86. $this->ensureTables();
  87. }
  88. /**
  89. * Create the map and message tables if they don't already exist.
  90. */
  91. protected function ensureTables() {
  92. if (!$this->ensured) {
  93. if (!$this->connection->schema()->tableExists($this->mapTable)) {
  94. // Generate appropriate schema info for the map and message tables,
  95. // and map from the source field names to the map/msg field names
  96. $count = 1;
  97. $source_key_schema = array();
  98. $pks = array();
  99. foreach ($this->sourceKey as $field_schema) {
  100. $mapkey = 'sourceid' . $count++;
  101. $source_key_schema[$mapkey] = $field_schema;
  102. $pks[] = $mapkey;
  103. }
  104. $fields = $source_key_schema;
  105. // Add destination keys to map table
  106. // TODO: How do we discover the destination schema?
  107. $count = 1;
  108. foreach ($this->destinationKey as $field_schema) {
  109. // Allow dest key fields to be NULL (for IGNORED/FAILED cases)
  110. $field_schema['not null'] = FALSE;
  111. $mapkey = 'destid' . $count++;
  112. $fields[$mapkey] = $field_schema;
  113. }
  114. $fields['needs_update'] = array(
  115. 'type' => 'int',
  116. 'size' => 'tiny',
  117. 'unsigned' => TRUE,
  118. 'not null' => TRUE,
  119. 'default' => MigrateMap::STATUS_IMPORTED,
  120. 'description' => 'Indicates current status of the source row',
  121. );
  122. $fields['last_imported'] = array(
  123. 'type' => 'int',
  124. 'unsigned' => TRUE,
  125. 'not null' => TRUE,
  126. 'default' => 0,
  127. 'description' => 'UNIX timestamp of the last time this row was imported',
  128. );
  129. $schema = array(
  130. 'description' => t('Mappings from source key to destination key'),
  131. 'fields' => $fields,
  132. 'primary key' => $pks,
  133. );
  134. $this->connection->schema()->createTable($this->mapTable, $schema);
  135. // Now for the message table
  136. $fields = array();
  137. $fields['msgid'] = array(
  138. 'type' => 'serial',
  139. 'unsigned' => TRUE,
  140. 'not null' => TRUE,
  141. );
  142. $fields += $source_key_schema;
  143. $fields['level'] = array(
  144. 'type' => 'int',
  145. 'unsigned' => TRUE,
  146. 'not null' => TRUE,
  147. 'default' => 1,
  148. );
  149. $fields['message'] = array(
  150. 'type' => 'text',
  151. 'size' => 'medium',
  152. 'not null' => TRUE,
  153. );
  154. $schema = array(
  155. 'description' => t('Messages generated during a migration process'),
  156. 'fields' => $fields,
  157. 'primary key' => array('msgid'),
  158. 'indexes' => array('sourcekey' => $pks),
  159. );
  160. $this->connection->schema()->createTable($this->messageTable, $schema);
  161. }
  162. $this->ensured = TRUE;
  163. }
  164. }
  165. /**
  166. * Retrieve a row from the map table, given a source ID
  167. *
  168. * @param array $source_id
  169. */
  170. public function getRowBySource(array $source_id) {
  171. migrate_instrument_start('mapRowBySource');
  172. $query = $this->connection->select($this->mapTable, 'map')
  173. ->fields('map');
  174. foreach ($this->sourceKeyMap as $key_name) {
  175. $query = $query->condition("map.$key_name", array_shift($source_id), '=');
  176. }
  177. $result = $query->execute();
  178. migrate_instrument_stop('mapRowBySource');
  179. return $result->fetchAssoc();
  180. }
  181. /**
  182. * Retrieve a row from the map table, given a destination ID
  183. *
  184. * @param array $source_id
  185. */
  186. public function getRowByDestination(array $destination_id) {
  187. migrate_instrument_start('mapRowByDestination');
  188. $query = $this->connection->select($this->mapTable, 'map')
  189. ->fields('map');
  190. foreach ($this->destinationKeyMap as $key_name) {
  191. $query = $query->condition("map.$key_name", array_shift($destination_id), '=');
  192. }
  193. $result = $query->execute();
  194. migrate_instrument_stop('mapRowByDestination');
  195. return $result->fetchAssoc();
  196. }
  197. /**
  198. * Retrieve an array of map rows marked as needing update.
  199. *
  200. * @param int $count
  201. * Maximum rows to return; defaults to 10,000
  202. * @return array
  203. * Array of map row objects with needs_update==1.
  204. */
  205. public function getRowsNeedingUpdate($count) {
  206. $rows = array();
  207. $result = db_select($this->mapTable, 'map')
  208. ->fields('map')
  209. ->condition('needs_update', MigrateMap::STATUS_NEEDS_UPDATE)
  210. ->range(0, $count)
  211. ->execute();
  212. foreach ($result as $row) {
  213. $rows[] = $row;
  214. }
  215. return $rows;
  216. }
  217. /**
  218. * Given a (possibly multi-field) destination key, return the (possibly multi-field)
  219. * source key mapped to it.
  220. *
  221. * @param array $destination_id
  222. * Array of destination key values.
  223. * @return array
  224. * Array of source key values, or NULL on failure.
  225. */
  226. public function lookupSourceID(array $destination_id) {
  227. migrate_instrument_start('lookupSourceID');
  228. $query = $this->connection->select($this->mapTable, 'map')
  229. ->fields('map', $this->sourceKeyMap);
  230. foreach ($this->destinationKeyMap as $key_name) {
  231. $query = $query->condition("map.$key_name", array_shift($destination_id), '=');
  232. }
  233. $result = $query->execute();
  234. $source_id = $result->fetchAssoc();
  235. migrate_instrument_stop('lookupSourceID');
  236. return $source_id;
  237. }
  238. /**
  239. * Given a (possibly multi-field) source key, return the (possibly multi-field)
  240. * destination key it is mapped to.
  241. *
  242. * @param array $source_id
  243. * Array of source key values.
  244. * @return array
  245. * Array of destination key values, or NULL on failure.
  246. */
  247. public function lookupDestinationID(array $source_id) {
  248. migrate_instrument_start('lookupDestinationID');
  249. $query = $this->connection->select($this->mapTable, 'map')
  250. ->fields('map', $this->destinationKeyMap);
  251. foreach ($this->sourceKeyMap as $key_name) {
  252. $query = $query->condition("map.$key_name", array_shift($source_id), '=');
  253. }
  254. $result = $query->execute();
  255. $destination_id = $result->fetchAssoc();
  256. migrate_instrument_stop('lookupDestinationID');
  257. return $destination_id;
  258. }
  259. /**
  260. * Called upon successful import of one record, we record a mapping from
  261. * the source key to the destination key. Also may be called, setting the
  262. * third parameter to NEEDS_UPDATE, to signal an existing record should be remigrated.
  263. *
  264. * @param stdClass $source_row
  265. * The raw source data. We use the key map derived from the source object
  266. * to get the source key values.
  267. * @param array $dest_ids
  268. * The destination key values.
  269. * @param int $needs_update
  270. * Status of the source row in the map. Defaults to STATUS_IMPORTED.
  271. */
  272. public function saveIDMapping(stdClass $source_row, array $dest_ids, $needs_update = MigrateMap::STATUS_IMPORTED) {
  273. migrate_instrument_start('saveIDMapping');
  274. // Construct the source key
  275. $keys = array();
  276. foreach ($this->sourceKeyMap as $field_name => $key_name) {
  277. // A NULL key value will fail.
  278. if (is_null($source_row->$field_name)) {
  279. Migration::displayMessage(t(
  280. 'Could not save to map table due to NULL value for key field !field',
  281. array('!field' => $field_name)));
  282. migrate_instrument_stop('saveIDMapping');
  283. return;
  284. }
  285. $keys[$key_name] = $source_row->$field_name;
  286. }
  287. $fields = array('needs_update' => (int)$needs_update);
  288. $count = 1;
  289. foreach ($dest_ids as $dest_id) {
  290. $fields['destid' . $count++] = $dest_id;
  291. }
  292. if ($this->trackLastImported) {
  293. $fields['last_imported'] = time();
  294. }
  295. $this->connection->merge($this->mapTable)
  296. ->key($keys)
  297. ->fields($fields)
  298. ->execute();
  299. migrate_instrument_stop('saveIDMapping');
  300. }
  301. /**
  302. * Record a message in the migration's message table.
  303. *
  304. * @param array $source_key
  305. * Source ID of the record in error
  306. * @param string $message
  307. * The message to record.
  308. * @param int $level
  309. * Optional message severity (defaults to MESSAGE_ERROR).
  310. */
  311. public function saveMessage($source_key, $message, $level = Migration::MESSAGE_ERROR) {
  312. // Source IDs as arguments
  313. $count = 1;
  314. if (is_array($source_key)) {
  315. foreach ($source_key as $key_value) {
  316. $fields['sourceid' . $count++] = $key_value;
  317. // If any key value is empty, we can't save - print out and abort
  318. if (empty($key_value)) {
  319. print($message);
  320. return;
  321. }
  322. }
  323. $fields['level'] = $level;
  324. $fields['message'] = $message;
  325. $this->connection->insert($this->messageTable)
  326. ->fields($fields)
  327. ->execute();
  328. }
  329. else {
  330. // TODO: What else can we do?
  331. Migration::displayMessage($message);
  332. }
  333. }
  334. /**
  335. * Prepares this migration to run as an update - that is, in addition to
  336. * unmigrated content (source records not in the map table) being imported,
  337. * previously-migrated content will also be updated in place.
  338. */
  339. public function prepareUpdate() {
  340. $this->connection->update($this->mapTable)
  341. ->fields(array('needs_update' => MigrateMap::STATUS_NEEDS_UPDATE))
  342. ->execute();
  343. }
  344. /**
  345. * Returns a count of records in the map table (i.e., the number of
  346. * source records which have been processed for this migration).
  347. *
  348. * @return int
  349. */
  350. public function processedCount() {
  351. $query = $this->connection->select($this->mapTable);
  352. $query->addExpression('COUNT(*)', 'count');
  353. $count = $query->execute()->fetchField();
  354. return $count;
  355. }
  356. /**
  357. * Returns a count of imported records in the map table.
  358. *
  359. * @return int
  360. */
  361. public function importedCount() {
  362. $query = $this->connection->select($this->mapTable);
  363. $query->addExpression('COUNT(*)', 'count');
  364. $query->condition('needs_update', array(MigrateMap::STATUS_IMPORTED, MigrateMap::STATUS_NEEDS_UPDATE), 'IN');
  365. $count = $query->execute()->fetchField();
  366. return $count;
  367. }
  368. /**
  369. * Returns a count of records which are marked as needing update.
  370. *
  371. * @return int
  372. */
  373. public function updateCount() {
  374. $query = $this->connection->select($this->mapTable);
  375. $query->addExpression('COUNT(*)', 'count');
  376. $query->condition('needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
  377. $count = $query->execute()->fetchField();
  378. return $count;
  379. }
  380. /**
  381. * Get the number of source records which failed to import.
  382. *
  383. * @return int
  384. * Number of records errored out.
  385. */
  386. public function errorCount() {
  387. $query = $this->connection->select($this->mapTable);
  388. $query->addExpression('COUNT(*)', 'count');
  389. $query->condition('needs_update', MigrateMap::STATUS_FAILED);
  390. $count = $query->execute()->fetchField();
  391. return $count;
  392. }
  393. /**
  394. * Get the number of messages saved.
  395. *
  396. * @return int
  397. * Number of messages.
  398. */
  399. public function messageCount() {
  400. $query = $this->connection->select($this->messageTable);
  401. $query->addExpression('COUNT(*)', 'count');
  402. $count = $query->execute()->fetchField();
  403. return $count;
  404. }
  405. /**
  406. * Delete the map entry and any message table entries for the specified source row.
  407. *
  408. * @param array $source_key
  409. */
  410. public function delete(array $source_key, $messages_only = FALSE) {
  411. if (!$messages_only) {
  412. $map_query = $this->connection->delete($this->mapTable);
  413. }
  414. $message_query = $this->connection->delete($this->messageTable);
  415. $count = 1;
  416. foreach ($source_key as $key_value) {
  417. if (!$messages_only) {
  418. $map_query->condition('sourceid' . $count, $key_value);
  419. }
  420. $message_query->condition('sourceid' . $count, $key_value);
  421. $count++;
  422. }
  423. if (!$messages_only) {
  424. $map_query->execute();
  425. }
  426. $message_query->execute();
  427. }
  428. /**
  429. * Delete the map entry and any message table entries for the specified destination row.
  430. *
  431. * @param array $destination_key
  432. */
  433. public function deleteDestination(array $destination_key) {
  434. $map_query = $this->connection->delete($this->mapTable);
  435. $message_query = $this->connection->delete($this->messageTable);
  436. $source_key = $this->lookupSourceID($destination_key);
  437. if (!empty($source_key)) {
  438. $count = 1;
  439. foreach ($destination_key as $key_value) {
  440. $map_query->condition('destid' . $count, $key_value);
  441. $count++;
  442. }
  443. $map_query->execute();
  444. $count = 1;
  445. foreach ($source_key as $key_value) {
  446. $message_query->condition('sourceid' . $count, $key_value);
  447. $count++;
  448. }
  449. $message_query->execute();
  450. }
  451. }
  452. /**
  453. * Set the specified row to be updated, if it exists.
  454. */
  455. public function setUpdate(array $source_key) {
  456. $query = $this->connection->update($this->mapTable)
  457. ->fields(array('needs_update' => MigrateMap::STATUS_NEEDS_UPDATE));
  458. $count = 1;
  459. foreach ($source_key as $key_value) {
  460. $query->condition('sourceid' . $count++, $key_value);
  461. }
  462. $query->execute();
  463. }
  464. /**
  465. * Delete all map and message table entries specified.
  466. *
  467. * @param array $source_keys
  468. * Each array member is an array of key fields for one source row.
  469. */
  470. public function deleteBulk(array $source_keys) {
  471. // If we have a single-column key, we can shortcut it
  472. if (count($this->sourceKey) == 1) {
  473. $sourceids = array();
  474. foreach ($source_keys as $source_key) {
  475. $sourceids[] = $source_key;
  476. }
  477. $this->connection->delete($this->mapTable)
  478. ->condition('sourceid1', $sourceids, 'IN')
  479. ->execute();
  480. $this->connection->delete($this->messageTable)
  481. ->condition('sourceid1', $sourceids, 'IN')
  482. ->execute();
  483. }
  484. else {
  485. foreach ($source_keys as $source_key) {
  486. $map_query = $this->connection->delete($this->mapTable);
  487. $message_query = $this->connection->delete($this->messageTable);
  488. $count = 1;
  489. foreach ($source_key as $key_value) {
  490. $map_query->condition('sourceid' . $count, $key_value);
  491. $message_query->condition('sourceid' . $count++, $key_value);
  492. }
  493. $map_query->execute();
  494. $message_query->execute();
  495. }
  496. }
  497. }
  498. /**
  499. * Clear all messages from the message table.
  500. */
  501. public function clearMessages() {
  502. $this->connection->truncate($this->messageTable)
  503. ->execute();
  504. }
  505. /**
  506. * Remove the associated map and message tables.
  507. */
  508. public function destroy() {
  509. $this->connection->schema()->dropTable($this->mapTable);
  510. $this->connection->schema()->dropTable($this->messageTable);
  511. }
  512. protected $result = NULL;
  513. protected $currentRow = NULL;
  514. protected $currentKey = array();
  515. public function getCurrentKey() {
  516. return $this->currentKey;
  517. }
  518. /**
  519. * Implementation of Iterator::rewind() - called before beginning a foreach loop.
  520. * TODO: Support idlist, itemlimit
  521. */
  522. public function rewind() {
  523. $this->currentRow = NULL;
  524. $fields = array();
  525. foreach ($this->sourceKeyMap as $field) {
  526. $fields[] = $field;
  527. }
  528. foreach ($this->destinationKeyMap as $field) {
  529. $fields[] = $field;
  530. }
  531. /* TODO
  532. if (isset($this->options['itemlimit'])) {
  533. $query = $query->range(0, $this->options['itemlimit']);
  534. }
  535. */
  536. $this->result = $this->connection->select($this->mapTable, 'map')
  537. ->fields('map', $fields)
  538. ->execute();
  539. $this->next();
  540. }
  541. /**
  542. * Implementation of Iterator::current() - called when entering a loop
  543. * iteration, returning the current row
  544. */
  545. public function current() {
  546. return $this->currentRow;
  547. }
  548. /**
  549. * Implementation of Iterator::key - called when entering a loop iteration, returning
  550. * the key of the current row. It must be a scalar - we will serialize
  551. * to fulfill the requirement, but using getCurrentKey() is preferable.
  552. */
  553. public function key() {
  554. return serialize($this->currentKey);
  555. }
  556. /**
  557. * Implementation of Iterator::next() - called at the bottom of the loop implicitly,
  558. * as well as explicitly from rewind().
  559. */
  560. public function next() {
  561. $this->currentRow = $this->result->fetchObject();
  562. $this->currentKey = array();
  563. if (!is_object($this->currentRow)) {
  564. $this->currentRow = NULL;
  565. }
  566. else {
  567. foreach ($this->sourceKeyMap as $map_field) {
  568. $this->currentKey[$map_field] = $this->currentRow->$map_field;
  569. // Leave only destination fields
  570. unset($this->currentRow->$map_field);
  571. }
  572. }
  573. }
  574. /**
  575. * Implementation of Iterator::valid() - called at the top of the loop, returning
  576. * TRUE to process the loop and FALSE to terminate it
  577. */
  578. public function valid() {
  579. // TODO: Check numProcessed against itemlimit
  580. return !is_null($this->currentRow);
  581. }
  582. }