sql.inc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. <?php
  2. /**
  3. * @file
  4. * Define a MigrateSource for importing from Drupal connections
  5. */
  6. /**
  7. * Implementation of MigrateSource, to handle imports from Drupal connections.
  8. */
  9. class MigrateSourceSQL extends MigrateSource {
  10. /**
  11. * The SQL query objects from which to obtain data, and counts of data
  12. *
  13. * @var SelectQueryInterface
  14. */
  15. protected $originalQuery, $query, $countQuery, $alteredQuery;
  16. /**
  17. * Return a reference to the base query, in particular so Migration classes
  18. * can add conditions/joins/etc to the query for a source defined in a
  19. * base class.
  20. *
  21. * @return SelectQueryInterface
  22. */
  public function &query() {
    // Returned by reference so callers (e.g. Migration subclasses) can keep
    // adding conditions/joins to the base query this source was built from.
    return $this->originalQuery;
  }
  26. /**
  27. * The result object from executing the query - traversed to process the
  28. * incoming data.
  29. *
  30. * @var DatabaseStatementInterface
  31. */
  32. protected $result;
  33. /**
  34. * Number of eligible rows processed so far (used for itemlimit checking)
  35. *
  36. * @var int
  37. */
  38. protected $numProcessed = 0;
  39. /**
  40. * Current data batch.
  41. *
  42. * @var int
  43. */
  44. protected $batch = 0;
  45. /**
  46. * Number of records to fetch from the database during each batch. A value
  47. * of zero indicates no batching is to be done.
  48. *
  49. * @var int
  50. */
  51. protected $batchSize = 0;
  52. /**
  53. * List of available source fields.
  54. *
  55. * @var array
  56. */
  57. protected $fields = array();
  58. /**
  59. * If the map is a MigrateSQLMap, and the table is compatible with the
  60. * source query, we can join directly to the map and make things much faster
  61. * and simpler.
  62. *
  63. * @var boolean
  64. */
  65. protected $mapJoinable = FALSE;
  66. // Dynamically set whether the map is joinable - not really for production use,
  67. // this is primarily to support simpletests
  public function setMapJoinable($map_joinable) {
    // Overrides whatever the constructor decided; performRewind() consults
    // this flag when deciding whether to join the map table.
    $this->mapJoinable = $map_joinable;
  }
  71. /**
  72. * Whether this source is configured to use a highwater mark, and there is
  73. * a highwater mark present to use.
  74. *
  75. * @var boolean
  76. */
  77. protected $usingHighwater = FALSE;
  78. /**
  79. * Whether, in the current iteration, we have reached the highwater mark.
  80. *
  81. * @var boolean
  82. */
  83. protected $highwaterSeen = FALSE;
  84. /**
  85. * Return an options array for PDO sources.
  86. *
  87. * @param boolean $map_joinable
  88. * Indicates whether the map table can be joined directly to the source query.
  89. * @param boolean $cache_counts
  90. * Indicates whether to cache counts of source records.
  91. */
  public static function options($map_joinable, $cache_counts) {
    // Package the two flags under the option keys the constructor reads.
    return array(
      'map_joinable' => $map_joinable,
      'cache_counts' => $cache_counts,
    );
  }
  95. /**
  96. * Simple initialization.
  97. *
  98. * @param SelectQueryInterface $query
  99. * The query we are iterating over.
  100. * @param array $fields
  101. * Optional - keys are field names, values are descriptions. Use to override
  102. * the default descriptions, or to add additional source fields which the
  103. * migration will add via other means (e.g., prepareRow()).
  104. * @param SelectQueryInterface $count_query
  105. * Optional - an explicit count query, primarily used when counting the
  106. * primary query is slow.
  107. * @param array $options
  108. * Options applied to this source.
  109. */
  public function __construct(SelectQueryInterface $query, array $fields = array(),
      SelectQueryInterface $count_query = NULL, array $options = array()) {
    parent::__construct($options);

    // Keep the caller's query untouched and iterate over a private copy.
    $this->originalQuery = $query;
    $this->query = clone $query;
    $this->fields = $fields;

    // Fall back to a count query derived from the main query when the caller
    // did not supply a dedicated one.
    $this->countQuery = is_null($count_query) ? clone $query->countQuery() : $count_query;

    $batched = isset($options['batch_size']);
    if ($batched) {
      $this->batchSize = $options['batch_size'];
    }

    if ($batched || !empty($options['track_changes'])) {
      // Joining to the map table is incompatible with batching. It is also
      // incompatible with change tracking, where every row must be fetched to
      // see whether it changed. Either option forces the join off, regardless
      // of any explicit map_joinable setting.
      $this->mapJoinable = FALSE;
    }
    elseif (isset($options['map_joinable'])) {
      $this->mapJoinable = $options['map_joinable'];
    }
    else {
      // TODO: Automatically determine whether the map table can be joined
      // directly to the query (a MigrateSQLMap on the same or a compatible
      // connection). This won't work unless/until
      // http://drupal.org/node/802514 is committed, so assume joinable for now.
      $this->mapJoinable = TRUE;
    }
  }
  165. /**
  166. * Return a string representing the source query.
  167. *
  168. * @return string
  169. */
  public function __toString() {
    // Work on a copy so extending the query for rendering cannot affect the
    // query we iterate over.
    $printable = clone $this->query;
    $extended = $printable->extend('MigrateConnectionQuery');
    return $extended->getString();
  }
  175. /**
  176. * Returns a list of fields available to be mapped from the source query.
  177. *
  178. * @return array
  179. * Keys: machine names of the fields (to be passed to addFieldMapping)
  180. * Values: Human-friendly descriptions of the fields.
  181. */
  public function fields() {
    $available = array();

    $selected = $this->query->getFields();
    if ($selected) {
      // Explicitly selected columns: without manual intervention the best
      // description we can offer is "table.column".
      foreach ($selected as $name => $info) {
        $available[$name] = $info['table'] . '.' . $info['field'];
      }
    }
    else {
      // No explicit field list: fetch a single row from a copy of the query
      // and describe each column by its sample value.
      $probe = clone $this->query;
      $sample = $probe->range(0, 1)->execute()->fetchAssoc();
      if (is_array($sample)) {
        foreach ($sample as $name => $value) {
          $available[$name] = t('Example Content: !value', array('!value' => $value));
        }
      }
    }

    // TODO: Handle queries without explicit field lists by inspecting the
    // schema of each all-fields table; waiting on http://drupal.org/node/814312.

    // Expressions are described by their alias.
    foreach ($this->query->getExpressions() as $name => $info) {
      $available[$name] = $info['alias'];
    }

    // Caller-specified fields win over extracted fields of the same name;
    // any other caller-specified fields are added.
    if ($this->fields) {
      $available = $this->fields + $available;
    }

    return $available;
  }
  232. /**
  233. * Return a count of all available source records.
  234. */
  public function computeCount() {
    // Run the count query (explicit or derived in the constructor) and return
    // the first column of its first row.
    $statement = $this->countQuery->execute();
    return $statement->fetchField();
  }
  239. /**
  240. * Implementation of MigrateSource::performRewind().
  241. *
  242. * We could simply execute the query and be functionally correct, but
  243. * we will take advantage of the PDO-based API to optimize the query up-front.
  244. */
  public function performRewind() {
    // Start every pass from a fresh copy of the base query so conditions
    // added by a previous rewind do not accumulate.
    $this->result = NULL;
    $this->query = clone $this->originalQuery;
    $this->batch = 0;

    // Get the (alias-qualified) source key names, for potential use in
    // joining to the map table, or enforcing idlist.
    $keys = array();
    foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
      if (isset($field_schema['alias'])) {
        $field_name = $field_schema['alias'] . '.' . $field_name;
      }
      $keys[] = $field_name;
    }

    // The rules for determining what conditions to add to the query are as
    // follows (applying first applicable rule)
    // 1. If idlist is provided, then only process items in that list. Plain
    //    ids are matched against the first key with IN; ids containing the
    //    multi-key separator are split and matched key-by-key.
    if ($this->idList) {
      $simple_ids = array();
      $compound_ids = array();
      $key_count = count($keys);
      foreach ($this->idList as $id) {
        // Look for multi-key separator. If there is only 1 key, ignore.
        if (strpos($id, $this->multikeySeparator) === FALSE || $key_count == 1) {
          $simple_ids[] = $id;
          continue;
        }
        $compound_ids[] = explode($this->multikeySeparator, $id);
      }
      // Check for compound ids. If present, OR them together (along with the
      // IN condition for any simple ids).
      if (!empty($compound_ids)) {
        $condition = db_or();
        if (!empty($simple_ids)) {
          $condition->condition($keys[0], $simple_ids, 'IN');
        }
        foreach ($compound_ids as $values) {
          $temp_and = db_and();
          foreach ($values as $pos => $value) {
            $temp_and->condition($keys[$pos], $value);
          }
          $condition->condition($temp_and);
        }
        $this->query->condition($condition);
      }
      else {
        $this->query->condition($keys[0], $simple_ids, 'IN');
      }
    }
    else {
      // 2. If the map is joinable, join it. We will want to accept all rows
      //    which are either not in the map, or marked in the map as
      //    NEEDS_UPDATE. Note that if highwater fields are in play, we want to
      //    accept all rows above the highwater mark in addition to those
      //    selected by the map conditions, so we need to OR them together (but
      //    AND with any existing conditions in the query). So, ultimately the
      //    SQL condition will look like (original conditions) AND (map IS NULL
      //    OR map needs update OR above highwater).
      $conditions = db_or();
      $condition_added = FALSE;
      if ($this->mapJoinable) {
        // Build the join to the map table. Because the source key could have
        // multiple fields, pair each key with sourceid1, sourceid2, ...
        $join_clauses = array();
        foreach ($keys as $index => $key) {
          $join_clauses[] = "$key = map.sourceid" . ($index + 1);
        }
        $map_join = implode(' AND ', $join_clauses);
        $alias = $this->query->leftJoin($this->activeMap->getQualifiedMapTable(),
          'map', $map_join);
        $conditions->isNull($alias . '.sourceid1');
        $conditions->condition($alias . '.needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
        $condition_added = TRUE;

        // And as long as we have the map table, add its data to the row.
        foreach (array_keys($keys) as $index) {
          $map_key = 'sourceid' . ($index + 1);
          $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        }
        $count = 1;
        foreach ($this->activeMap->getDestinationKey() as $field_name => $field_schema) {
          $map_key = 'destid' . $count++;
          $this->query->addField($alias, $map_key, "migrate_map_$map_key");
        }
        $this->query->addField($alias, 'needs_update', 'migrate_map_needs_update');
      }

      // 3. If we are using highwater marks, also include rows above the mark.
      //    But, include all rows if the highwater mark is not set.
      if (isset($this->highwaterField['name']) && $this->activeMigration->getHighwater() !== '') {
        // But, if there are any existing items marked as needing update which
        // fall below the highwater mark, and map_joinable is FALSE, those
        // items will be skipped. Thus, in that case do not add the highwater
        // optimization to the query.
        $add_highwater_condition = TRUE;
        if (!$this->mapJoinable) {
          // Use the same status constant as the joined-map condition above,
          // passed as a bound placeholder rather than a literal.
          $count_needs_update = db_query('SELECT COUNT(*) FROM {' .
            $this->activeMap->getQualifiedMapTable() . '} WHERE needs_update = :status',
            array(':status' => MigrateMap::STATUS_NEEDS_UPDATE))
            ->fetchField();
          if ($count_needs_update > 0) {
            $add_highwater_condition = FALSE;
          }
        }
        if ($add_highwater_condition) {
          if (isset($this->highwaterField['alias'])) {
            $highwater = $this->highwaterField['alias'] . '.' . $this->highwaterField['name'];
          }
          else {
            $highwater = $this->highwaterField['name'];
          }
          $conditions->condition($highwater, $this->activeMigration->getHighwater(), '>');
          $condition_added = TRUE;
        }
      }
      if ($condition_added) {
        $this->query->condition($conditions);
      }
    }

    // 4. Download data in batches for performance. This must apply to the
    //    idlist case as well: getNextRow() pages through later batches by
    //    offset whenever a batch size is set, so an unbounded first query
    //    would cause rows past the first batch to be returned a second time.
    if ($this->batchSize > 0) {
      $this->query->range($this->batch * $this->batchSize, $this->batchSize);
    }

    // Save our fixed-up query so getNextBatch() matches it.
    $this->alteredQuery = clone $this->query;
    migrate_instrument_start('MigrateSourceSQL execute');
    $this->result = $this->query->execute();
    migrate_instrument_stop('MigrateSourceSQL execute');
  }
  380. /**
  381. * Implementation of MigrateSource::getNextRow().
  382. *
  383. * @return object
  384. */
  public function getNextRow() {
    $record = $this->result->fetchObject();

    // An empty fetch means either the data is exhausted or only the current
    // batch is. When batching, pull the next batch and try once more.
    if ($this->batchSize > 0 && !is_object($record)) {
      $this->getNextBatch();
      $record = $this->result->fetchObject();
    }

    return is_object($record) ? $record : NULL;
  }
  400. /**
  401. * Downloads the next set of data from the source database.
  402. */
  protected function getNextBatch() {
    // Advance the batch counter, then re-run a copy of the query saved by
    // performRewind() with its range moved to the new batch's offset.
    $this->batch++;
    $offset = $this->batch * $this->batchSize;
    $batch_query = clone $this->alteredQuery;
    $batch_query->range($offset, $this->batchSize);
    $this->result = $batch_query->execute();
  }
  409. }
  410. /**
  411. * Query extender for retrieving the connection used on the query.
  412. */
  class MigrateConnectionQuery extends SelectQueryExtender {

    /**
     * Wrap a select query and record its connection as query metadata.
     *
     * @param SelectQueryInterface $query
     *   The query being extended.
     * @param DatabaseConnection $connection
     *   The connection the query runs against.
     */
    public function __construct(SelectQueryInterface $query, DatabaseConnection $connection) {
      parent::__construct($query, $connection);
      // Add the connection as metadata if anything else wants to access it.
      $query->addMetaData('connection', $connection);
    }

    /**
     * Return the query as an SQL string with its arguments substituted in.
     *
     * This is copied from devel module's dpq() function.
     *
     * @param bool $prefix
     *   If the tables should be prefixed. If FALSE will return tables names in
     *   the query like {tablename}.
     *
     * @return string
     *   The SQL query.
     */
    public function getString($prefix = TRUE) {
      if (method_exists($this, 'preExecute')) {
        $this->preExecute();
      }

      // Render the statement before reading the arguments, keeping the
      // original evaluation order.
      // NOTE(review): presumably the string cast is what compiles the query
      // so that arguments() is populated - confirm before reordering.
      $statement = (string) $this;

      // Map each placeholder to its quoted value and substitute them all in
      // one pass.
      $replacements = array();
      foreach ((array) $this->arguments() as $placeholder => $argument) {
        $replacements[$placeholder] = $this->connection->quote($argument);
      }
      $statement = strtr($statement, $replacements);

      return $prefix ? $this->connection->prefixTables($statement) : $statement;
    }
  }