sql.inc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. <?php
  2. /**
  3. * @file
  4. * Define a MigrateSource for importing from Drupal connections
  5. */
  6. /**
  7. * Implementation of MigrateSource, to handle imports from Drupal connections.
  8. */
  9. class MigrateSourceSQL extends MigrateSource {
  10. /**
  11. * The SQL query objects from which to obtain data, and counts of data
  12. *
  13. * @var SelectQueryInterface
  14. */
  15. protected $originalQuery, $query, $countQuery;
  16. /**
  17. * The result object from executing the query - traversed to process the
  18. * incoming data.
  19. *
  20. * @var DatabaseStatementInterface
  21. */
  22. protected $result;
  23. /**
  24. * Number of eligible rows processed so far (used for itemlimit checking)
  25. *
  26. * @var int
  27. */
  28. protected $numProcessed = 0;
  29. /**
  30. * List of available source fields.
  31. *
  32. * @var array
  33. */
  34. protected $fields = array();
  35. /**
  36. * If the map is a MigrateSQLMap, and the table is compatible with the
  37. * source query, we can join directly to the map and make things much faster
  38. * and simpler.
  39. *
  40. * @var boolean
  41. */
  42. protected $mapJoinable = FALSE;
  43. // Dynamically set whether the map is joinable - not really for production use,
  44. // this is primarily to support simpletests
  45. public function setMapJoinable($map_joinable) {
  46. $this->mapJoinable = $map_joinable;
  47. }
  48. /**
  49. * Whether this source is configured to use a highwater mark, and there is
  50. * a highwater mark present to use.
  51. *
  52. * @var boolean
  53. */
  54. protected $usingHighwater = FALSE;
  55. /**
  56. * Whether, in the current iteration, we have reached the highwater mark.
  57. *
  58. * @var boolen
  59. */
  60. protected $highwaterSeen = FALSE;
  61. /**
  62. * Return an options array for PDO sources.
  63. *
  64. * @param boolean $map_joinable
  65. * Indicates whether the map table can be joined directly to the source query.
  66. * @param boolean $cache_counts
  67. * Indicates whether to cache counts of source records.
  68. */
  69. static public function options($map_joinable, $cache_counts) {
  70. return compact('map_joinable', 'cache_counts');
  71. }
  72. /**
  73. * Simple initialization.
  74. *
  75. * @param SelectQueryInterface $query
  76. * The query we are iterating over.
  77. * @param array $fields
  78. * Optional - keys are field names, values are descriptions. Use to override
  79. * the default descriptions, or to add additional source fields which the
  80. * migration will add via other means (e.g., prepareRow()).
  81. * @param SelectQueryInterface $count_query
  82. * Optional - an explicit count query, primarily used when counting the
  83. * primary query is slow.
  84. * @param boolean $options
  85. * Options applied to this source.
  86. */
  87. public function __construct(SelectQueryInterface $query, array $fields = array(),
  88. SelectQueryInterface $count_query = NULL, array $options = array()) {
  89. parent::__construct($options);
  90. $this->originalQuery = $query;
  91. $this->query = clone $query;
  92. $this->fields = $fields;
  93. if (is_null($count_query)) {
  94. $this->countQuery = clone $query->countQuery();
  95. }
  96. else {
  97. $this->countQuery = $count_query;
  98. }
  99. if (isset($options['map_joinable'])) {
  100. $this->mapJoinable = $options['map_joinable'];
  101. }
  102. else {
  103. // TODO: We want to automatically determine if the map table can be joined
  104. // directly to the query, but this won't work unless/until
  105. // http://drupal.org/node/802514 is committed, assume joinable for now
  106. $this->mapJoinable = TRUE;
  107. /* // To be able to join the map directly, it must be a PDO map on the same
  108. // connection, or a compatible connection
  109. $map = $migration->getMap();
  110. if (is_a($map, 'MigrateSQLMap')) {
  111. $map_options = $map->getConnection()->getConnectionOptions();
  112. $query_options = $this->query->connection()->getConnectionOptions();
  113. // Identical options means it will work
  114. if ($map_options == $query_options) {
  115. $this->mapJoinable = TRUE;
  116. }
  117. else {
  118. // Otherwise, the one scenario we know will work is if it's MySQL and
  119. // the credentials match (SQLite too?)
  120. if ($map_options['driver'] == 'mysql' && $query_options['driver'] == 'mysql') {
  121. if ($map_options['host'] == $query_options['host'] &&
  122. $map_options['port'] == $query_options['port'] &&
  123. $map_options['username'] == $query_options['username'] &&
  124. $map_options['password'] == $query_options['password']) {
  125. $this->mapJoinable = TRUE;
  126. }
  127. }
  128. }
  129. }*/
  130. }
  131. }
  132. /**
  133. * Return a string representing the source query.
  134. *
  135. * @return string
  136. */
  137. public function __toString() {
  138. return (string) $this->query;
  139. }
  140. /**
  141. * Returns a list of fields available to be mapped from the source query.
  142. *
  143. * @return array
  144. * Keys: machine names of the fields (to be passed to addFieldMapping)
  145. * Values: Human-friendly descriptions of the fields.
  146. */
  147. public function fields() {
  148. $fields = array();
  149. $queryFields = $this->query->getFields();
  150. if ($queryFields) {
  151. // Not much we can do in terms of describing the fields without manual intervention
  152. foreach ($queryFields as $field_name => $field_info) {
  153. // Lower case, because Drupal forces lowercase on fetch
  154. $fields[drupal_strtolower($field_name)] = drupal_strtolower(
  155. $field_info['table'] . '.' . $field_info['field']);
  156. }
  157. }
  158. else {
  159. // Detect available fields
  160. $detection_query = clone $this->query;
  161. $result = $detection_query->range(0, 1)->execute();
  162. $row = $result->fetchAssoc();
  163. if (is_array($row)) {
  164. foreach ($row as $field_name => $field_value) {
  165. // Lower case, because Drupal forces lowercase on fetch
  166. $fields[drupal_strtolower($field_name)] = t('Example Content: !value',
  167. array('!value' => $field_value));
  168. }
  169. }
  170. }
  171. /*
  172. * Handle queries without explicit field lists
  173. * TODO: Waiting on http://drupal.org/node/814312
  174. $info = Database::getConnectionInfo($query->getConnection());
  175. $database = $info['default']['database'];
  176. foreach ($this->query->getTables() as $table) {
  177. if (isset($table['all_fields']) && $table['all_fields']) {
  178. $database = 'plants';
  179. $table = $table['table'];
  180. $sql = 'SELECT column_name
  181. FROM information_schema.columns
  182. WHERE table_schema=:database AND table_name = :table
  183. ORDER BY ordinal_position';
  184. $result = dbtng_query($sql, array(':database' => $database, ':table' => $table));
  185. foreach ($result as $row) {
  186. $fields[drupal_strtolower($row->column_name)] = drupal_strtolower(
  187. $table . '.' . $row->column_name);
  188. }
  189. }
  190. }*/
  191. $expressionFields = $this->query->getExpressions();
  192. foreach ($expressionFields as $field_name => $field_info) {
  193. // Lower case, because Drupal forces lowercase on fetch
  194. $fields[drupal_strtolower($field_name)] = drupal_strtolower($field_info['alias']);
  195. }
  196. // Any caller-specified fields with the same names as extracted fields will
  197. // override them; any others will be added
  198. if ($this->fields) {
  199. $fields = $this->fields + $fields;
  200. }
  201. return $fields;
  202. }
  203. /**
  204. * Return a count of all available source records.
  205. */
  206. public function computeCount() {
  207. $count = $this->countQuery->execute()->fetchField();
  208. return $count;
  209. }
  210. /**
  211. * Implementation of MigrateSource::performRewind().
  212. *
  213. * We could simply execute the query and be functionally correct, but
  214. * we will take advantage of the PDO-based API to optimize the query up-front.
  215. */
  216. public function performRewind() {
  217. $this->result = NULL;
  218. $this->query = clone $this->originalQuery;
  219. // Get the key values, for potential use in joining to the map table, or
  220. // enforcing idlist.
  221. $keys = array();
  222. foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
  223. if (isset($field_schema['alias'])) {
  224. $field_name = $field_schema['alias'] . '.' . $field_name;
  225. }
  226. $keys[] = $field_name;
  227. }
  228. // The rules for determining what conditions to add to the query are as
  229. // follows (applying first applicable rule)
  230. // 1. If idlist is provided, then only process items in that list (AND key
  231. // IN (idlist)). Only applicable with single-value keys.
  232. if ($this->idList) {
  233. $this->query->condition($keys[0], $this->idList, 'IN');
  234. }
  235. else {
  236. // 2. If the map is joinable, join it. We will want to accept all rows
  237. // which are either not in the map, or marked in the map as NEEDS_UPDATE.
  238. // Note that if highwater fields are in play, we want to accept all rows
  239. // above the highwater mark in addition to those selected by the map
  240. // conditions, so we need to OR them together (but AND with any existing
  241. // conditions in the query). So, ultimately the SQL condition will look
  242. // like (original conditions) AND (map IS NULL OR map needs update
  243. // OR above highwater).
  244. $conditions = db_or();
  245. $condition_added = FALSE;
  246. if ($this->mapJoinable) {
  247. // Build the join to the map table. Because the source key could have
  248. // multiple fields, we need to build things up.
  249. $count = 1;
  250. foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
  251. if (isset($field_schema['alias'])) {
  252. $field_name = $field_schema['alias'] . '.' . $field_name;
  253. }
  254. $map_key = 'sourceid' . $count++;
  255. if (!isset($map_join)) {
  256. $map_join = '';
  257. }
  258. else {
  259. $map_join .= ' AND ';
  260. }
  261. $map_join .= "$field_name = map.$map_key";
  262. }
  263. $alias = $this->query->leftJoin($this->activeMap->getQualifiedMapTable(),
  264. 'map', $map_join);
  265. $conditions->isNull($alias . '.sourceid1');
  266. $conditions->condition($alias . '.needs_update', MigrateMap::STATUS_NEEDS_UPDATE);
  267. $condition_added = TRUE;
  268. // And as long as we have the map table, add its data to the row.
  269. $count = 1;
  270. foreach ($this->activeMap->getSourceKey() as $field_name => $field_schema) {
  271. $map_key = 'sourceid' . $count++;
  272. $this->query->addField($alias, $map_key, "migrate_map_$map_key");
  273. }
  274. $count = 1;
  275. foreach ($this->activeMap->getDestinationKey() as $field_name => $field_schema) {
  276. $map_key = 'destid' . $count++;
  277. $this->query->addField($alias, $map_key, "migrate_map_$map_key");
  278. }
  279. $this->query->addField($alias, 'needs_update', 'migrate_map_needs_update');
  280. }
  281. // 3. If we are using highwater marks, also include rows above the mark.
  282. if (isset($this->highwaterField['name'])) {
  283. if (isset($this->highwaterField['alias'])) {
  284. $highwater = $this->highwaterField['alias'] . '.' . $this->highwaterField['name'];
  285. }
  286. else {
  287. $highwater = $this->highwaterField['name'];
  288. }
  289. $conditions->condition($highwater, $this->activeMigration->getHighwater(), '>');
  290. $condition_added = TRUE;
  291. }
  292. if ($condition_added) {
  293. $this->query->condition($conditions);
  294. }
  295. }
  296. migrate_instrument_start('MigrateSourceSQL execute');
  297. $this->result = $this->query->execute();
  298. migrate_instrument_stop('MigrateSourceSQL execute');
  299. }
  300. /**
  301. * Implementation of MigrateSource::getNextRow().
  302. *
  303. * @return object
  304. */
  305. public function getNextRow() {
  306. return $this->result->fetchObject();
  307. }
  308. }