FeedsSource.inc 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871
  1. <?php
  2. /**
  3. * @file
  4. * Definition of FeedsSourceInterface and FeedsSource class.
  5. */
  /**
   * Exception type used to tell lock handling failures apart from other errors.
   */
  class FeedsLockException extends Exception {
  }
  /**
   * Keys denoting an import or clearing stage. Used for multi page processing.
   *
   * FEEDS_START is the key under which the start timestamp of an import is
   * kept; the remaining constants identify a stage's state object.
   */
  define('FEEDS_START', 'start_time');
  define('FEEDS_FETCH', 'fetch');
  define('FEEDS_PARSE', 'parse');
  define('FEEDS_PROCESS', 'process');
  define('FEEDS_PROCESS_CLEAR', 'process_clear');
  define('FEEDS_PROCESS_EXPIRE', 'process_expire');
  /**
   * Interface for classes that provide default values and a form description
   * for the per-source configuration stored on a FeedsSource.
   */
  interface FeedsSourceInterface {

    /**
     * Tells whether the implementer actually handles source configuration.
     *
     * Crutch: for ease of use, FeedsSourceInterface is implemented for every
     * plugin, so a handle is needed to find out which plugin really implements
     * a source.
     *
     * @see FeedsPlugin class.
     *
     * @return
     *   TRUE if a plugin handles source specific configuration, FALSE
     *   otherwise.
     */
    public function hasSourceConfig();

    /**
     * Returns an associative array of default values.
     */
    public function sourceDefaults();

    /**
     * Returns a Form API form array for configuring the source values.
     *
     * Keys correspond to the keys of the return value of sourceDefaults().
     *
     * @param $source_config
     *   The configuration currently stored for the implementer.
     */
    public function sourceForm($source_config);

    /**
     * Validates user entered values submitted by sourceForm().
     *
     * @param $source_config
     *   The submitted values, passed by reference.
     */
    public function sourceFormValidate(&$source_config);

    /**
     * Reacts to a source being saved.
     *
     * @param FeedsSource $source
     *   The source that is being saved.
     */
    public function sourceSave(FeedsSource $source);

    /**
     * Reacts to a source being deleted.
     *
     * @param FeedsSource $source
     *   The source that is being deleted.
     */
    public function sourceDelete(FeedsSource $source);

  }
  /**
   * Tracks the status of one stage of an import or clearing operation.
   */
  class FeedsState {

    /**
     * Progress made as a floating point number: 0.0 means no progress,
     * 1.0 (FEEDS_BATCH_COMPLETE) means finished.
     */
    public $progress;

    /**
     * Pointer remembering where processing left off. Must be serializable.
     */
    public $pointer;

    /**
     * Natural numbers giving more detail about the progress being made.
     */
    public $total;
    public $created;
    public $updated;
    public $deleted;
    public $unpublished;
    public $blocked;
    public $skipped;
    public $failed;

    /**
     * IDs of entities to be removed.
     */
    public $removeList;

    /**
     * Constructor: a fresh state counts as complete and all counters are zero.
     */
    public function __construct() {
      $this->progress = FEEDS_BATCH_COMPLETE;
      $this->failed = 0;
      $this->skipped = 0;
      $this->blocked = 0;
      $this->unpublished = 0;
      $this->deleted = 0;
      $this->updated = 0;
      $this->created = 0;
      $this->total = 0;
    }

    /**
     * Safely reports progress.
     *
     * When $total == $progress, the task tracked by this state is regarded as
     * complete. The following cases are handled gracefully:
     * - $total is 0: the state is complete.
     * - $progress is larger than $total: the state is complete.
     * - $progress approximates $total so that the quotient rounds to 1.0: the
     *   state is reported as 0.99 so it is not mistaken for complete.
     *
     * @param $total
     *   A natural number that is the total to be worked off.
     * @param $progress
     *   A natural number that is the progress made on $total.
     */
    public function progress($total, $progress) {
      // Nothing to do at all, or more done than expected: finished.
      if ($progress > $total || !$total) {
        $this->progress = FEEDS_BATCH_COMPLETE;
        return;
      }

      $ratio = (float) $progress / $total;
      if ($ratio == FEEDS_BATCH_COMPLETE && $total != $progress) {
        // Rounding made an unfinished task look finished.
        $ratio = 0.99;
      }
      $this->progress = $ratio;
    }

  }
  131. /**
  132. * This class encapsulates a source of a feed. It stores where the feed can be
  133. * found and how to import it.
  134. *
  135. * Information on how to import a feed is encapsulated in a FeedsImporter object
  136. * which is identified by the common id of the FeedsSource and the
  137. * FeedsImporter. More than one FeedsSource can use the same FeedsImporter
  138. * therefore a FeedsImporter never holds a pointer to a FeedsSource object, nor
  139. * does it hold any other information for a particular FeedsSource object.
  140. *
  141. * Classes extending FeedsPlugin can implement a sourceForm to expose
  142. * configuration for a FeedsSource object. This is for instance how FeedsFetcher
  143. * exposes a text field for a feed URL or how FeedsCSVParser exposes a select
  144. * field for choosing between colon or semicolon delimiters.
  145. *
  146. * It is important that a FeedsPlugin does not directly hold information about
  147. * a source but leave all storage up to FeedsSource. An instance of a
  148. * FeedsPlugin class only exists once per FeedsImporter configuration, while an
  149. * instance of a FeedsSource class exists once per feed_nid to be imported.
  150. *
  151. * As with FeedsImporter, the idea with FeedsSource is that it can be used
  152. * without actually saving the object to the database.
  153. */
  154. class FeedsSource extends FeedsConfigurable {
  155. // Contains the node id of the feed this source info object is attached to.
  156. // Equals 0 if not attached to any node - i. e. if used on a
  157. // standalone import form within Feeds or by other API users.
  158. protected $feed_nid;
  159. // The FeedsImporter object that this source is expected to be used with.
  160. protected $importer;
  161. // An array of FeedsState objects, keyed by stage, holding the current
  162. // import/clearing state of this source.
  163. protected $state;
  164. // Fetcher result, used to cache fetcher result when batching.
  165. protected $fetcher_result;
  166. // Timestamp when this source was imported the last time.
  167. protected $imported;
  168. // Holds an exception object in case an exception occurs during importing.
  169. protected $exception;
  /**
   * Returns the unique source object per class/id/feed_nid.
   *
   * Don't use directly, use feeds_source() instead.
   *
   * @param $importer_id
   *   The id of the importer the source belongs to.
   * @param $feed_nid
   *   The node id of the feed node, 0 for standalone sources.
   *
   * @return
   *   The statically cached source object.
   */
  public static function instance($importer_id, $feed_nid) {
    $class = variable_get('feeds_source_class', 'FeedsSource');
    $instances = &drupal_static(__METHOD__, array());

    if (isset($instances[$class][$importer_id][$feed_nid])) {
      return $instances[$class][$importer_id][$feed_nid];
    }

    // First request for this combination: build and remember the object.
    $source = new $class($importer_id, $feed_nid);
    $instances[$class][$importer_id][$feed_nid] = $source;
    return $source;
  }
  /**
   * Constructor. Protected: objects are obtained through instance().
   *
   * @param $importer_id
   *   The id of the importer this source is used with.
   * @param $feed_nid
   *   The node id of the feed node this source is attached to.
   */
  protected function __construct($importer_id, $feed_nid) {
    // Feed node and importer are assigned before the parent constructor runs.
    $this->feed_nid = $feed_nid;
    $this->importer = feeds_importer($importer_id);

    parent::__construct($importer_id);

    // Pull in whatever is stored in the database for this source.
    $this->load();
  }
  /**
   * Gets the importer of this source.
   *
   * @return
   *   The FeedsImporter object that this source is expected to be used with.
   */
  public function importer() {
    $importer = $this->importer;
    return $importer;
  }
  /**
   * Previews a feed: fetches and parses it without processing.
   *
   * Invokes hook_feeds_after_parse() with the parser result.
   *
   * @return
   *   FeedsParserResult object.
   *
   * @throws
   *   Throws Exception if an error occurs when fetching or parsing.
   */
  public function preview() {
    $fetcher_result = $this->importer->fetcher->fetch($this);
    $parser_result = $this->importer->parser->parse($this, $fetcher_result);
    module_invoke_all('feeds_after_parse', $this, $parser_result);
    return $parser_result;
  }
  /**
   * Starts importing a source.
   *
   * Depending on the 'process_in_background' setting of this source's
   * importer, either a background job with Job Scheduler or a Batch API job
   * is created.
   *
   * @throws Exception
   *   If processing in background is enabled, the first batch chunk of the
   *   import is executed on the current page request. This means that this
   *   method may throw the same exceptions as FeedsSource::import().
   */
  public function startImport() {
    $config = $this->importer->getConfig();
    if (!$config['process_in_background']) {
      $this->startBatchAPIJob(t('Importing'), 'import');
      return;
    }
    $this->startBackgroundJob('import');
  }
  /**
   * Starts deleting all imported items of a source.
   *
   * Depending on the 'process_in_background' setting of this source's
   * importer, either a background job with Job Scheduler or a Batch API job
   * is created.
   *
   * @throws Exception
   *   If processing in background is enabled, the first batch chunk of the
   *   clear task is executed on the current page request. This means that
   *   this method may throw the same exceptions as FeedsSource::clear().
   */
  public function startClear() {
    $config = $this->importer->getConfig();
    if (!$config['process_in_background']) {
      $this->startBatchAPIJob(t('Deleting'), 'clear');
      return;
    }
    $this->startBackgroundJob('clear');
  }
  /**
   * Schedules all periodic tasks for this source.
   *
   * Delegates to scheduleImport() first and scheduleExpire() second.
   */
  public function schedule() {
    // Import scheduling first, expiry scheduling second.
    $this->scheduleImport();
    $this->scheduleExpire();
  }
  /**
   * Schedules periodic or background import tasks.
   *
   * An unfinished import is put straight back into the queue. A finished one
   * is either rescheduled or, if the period is FEEDS_SCHEDULE_NEVER, removed
   * from the schedule.
   */
  public function scheduleImport() {
    // A numeric period reported by the fetcher overrides the importer's.
    $period = $this->importer->config['import_period'];
    $override = $this->importer->fetcher->importPeriod($this);
    if (is_numeric($override)) {
      $period = $override;
    }

    $job = array(
      'type' => $this->id,
      'id' => $this->feed_nid,
      'period' => $period,
      'periodic' => TRUE,
    );

    if ($this->progressImporting() !== FEEDS_BATCH_COMPLETE) {
      // Feed is not fully imported yet, so we put this job back in the queue
      // immediately for further processing.
      DrupalQueue::get('feeds_source_import')->createItem($job);
      return;
    }

    if ($period == FEEDS_SCHEDULE_NEVER) {
      JobScheduler::get('feeds_source_import')->remove($job);
    }
    else {
      JobScheduler::get('feeds_source_import')->set($job);
    }
  }
  /**
   * Schedules background expire tasks.
   *
   * The job is removed when the processor never expires items; otherwise it
   * is set with a period of one hour, or 0 while an expiry batch is active.
   */
  public function scheduleExpire() {
    // Schedule as soon as possible if a batch is active.
    $batch_done = $this->progressExpiring() === FEEDS_BATCH_COMPLETE;
    $job = array(
      'type' => $this->id,
      'id' => $this->feed_nid,
      'period' => $batch_done ? 3600 : 0,
      'periodic' => TRUE,
    );

    $never_expires = $this->importer->processor->expiryTime() == FEEDS_EXPIRE_NEVER;
    $scheduler = JobScheduler::get('feeds_source_expire');
    if ($never_expires) {
      $scheduler->remove($job);
    }
    else {
      $scheduler->set($job);
    }
  }
  /**
   * Schedules background clearing tasks.
   *
   * Removes the clear job once clearing is complete, otherwise (re)sets it
   * with a period of 0 so that it runs as soon as possible.
   */
  public function scheduleClear() {
    $job = array(
      'type' => $this->id,
      'id' => $this->feed_nid,
      'period' => 0,
      'periodic' => TRUE,
    );

    $finished = $this->progressClearing() === FEEDS_BATCH_COMPLETE;
    $scheduler = JobScheduler::get('feeds_source_clear');
    if (!$finished) {
      // Batch is not complete: schedule as soon as possible.
      $scheduler->set($job);
      return;
    }
    // Batch is complete: the job is no longer needed.
    $scheduler->remove($job);
  }
  /**
   * Imports a source: executes the fetching, parsing and processing stage.
   *
   * This method only executes the current batch chunk, then returns. If you are
   * looking to import an entire source, use FeedsSource::startImport() instead.
   *
   * The source lock is held from the start until the state has been saved.
   * Exceptions raised by a stage are recorded on the source, reported to
   * hook_feeds_after_import() and re-thrown after cleanup.
   *
   * @return
   *   FEEDS_BATCH_COMPLETE if the import process finished. A float below
   *   FEEDS_BATCH_COMPLETE if the import is still in progress.
   *
   * @throws
   *   Throws Exception if an error occurs when importing.
   */
  public function import() {
    $this->acquireLock();
    try {
      // If fetcher result is empty, we are starting a new import, log.
      if (empty($this->fetcher_result)) {
        module_invoke_all('feeds_before_import', $this);
        $this->state[FEEDS_START] = time();
      }

      // Fetch.
      if (empty($this->fetcher_result) || FEEDS_BATCH_COMPLETE == $this->progressParsing()) {
        $this->fetcher_result = $this->importer->fetcher->fetch($this);
        // Clean the parser's state, we are parsing an entirely new file.
        unset($this->state[FEEDS_PARSE]);
      }

      // Parse.
      $parser_result = $this->importer->parser->parse($this, $this->fetcher_result);
      module_invoke_all('feeds_after_parse', $this, $parser_result);

      // Process.
      $this->importer->processor->process($this, $parser_result);

      // Import finished without exceptions, so unset any potentially previously
      // recorded exceptions.
      unset($this->exception);
    }
    catch (Exception $e) {
      // $e is stored and re-thrown once we've had a chance to log our progress.
      // Set the exception so that other modules can check if an exception
      // occurred in hook_feeds_after_import().
      $this->exception = $e;
    }

    // Clean up.
    $result = $this->progressImporting();
    if ($result == FEEDS_BATCH_COMPLETE || isset($e)) {
      $this->imported = time();
      // The start time is missing when the import failed before it could be
      // recorded (e.g. hook_feeds_before_import() threw). Fall back to the
      // end time so that 0 seconds is logged instead of a bogus duration.
      $started = isset($this->state[FEEDS_START]) ? $this->state[FEEDS_START] : $this->imported;
      $this->log('import', 'Imported in @s seconds.', array('@s' => $this->imported - $started), WATCHDOG_INFO);
      module_invoke_all('feeds_after_import', $this);
      unset($this->fetcher_result, $this->state);
    }
    $this->save();

    $this->releaseLock();

    if (isset($e)) {
      throw $e;
    }

    return $result;
  }
  /**
   * Imports a fetcher result all at once in memory.
   *
   * Parsing and processing are repeated until the parser reports completion.
   * Afterwards hook_feeds_after_import() is invoked, the duration is logged,
   * the batch state is dropped and the source is saved.
   *
   * @param FeedsFetcherResult $fetcher_result
   *   The fetcher result to process.
   *
   * @throws Exception
   *   Thrown if an error occurs when importing. A RuntimeException is thrown
   *   if an import of this source is already in progress.
   */
  public function pushImport(FeedsFetcherResult $fetcher_result) {
    // Since locks only work during a request, check if an import is active.
    if (!empty($this->fetcher_result) || !empty($this->state)) {
      throw new RuntimeException('The feed is currently importing.');
    }

    $this->acquireLock();
    $start = time();

    try {
      module_invoke_all('feeds_before_import', $this);

      // Parse.
      do {
        $parser_result = $this->importer->parser->parse($this, $fetcher_result);
        module_invoke_all('feeds_after_parse', $this, $parser_result);

        // Process.
        $this->importer->processor->process($this, $parser_result);

      } while ($this->progressParsing() !== FEEDS_BATCH_COMPLETE);

      // Import finished without exceptions, so unset any potentially previously
      // recorded exception, as import() does. Otherwise implementations of
      // hook_feeds_after_import() would see a stale exception.
      unset($this->exception);
    }
    catch (Exception $e) {
      // $e is stored and re-thrown once we've had a chance to log our progress.
      // Set the exception so that other modules can check if an exception
      // occurred in hook_feeds_after_import().
      $this->exception = $e;
    }

    module_invoke_all('feeds_after_import', $this);

    $this->imported = time();
    $this->log('import', 'Imported in @s seconds.', array('@s' => $this->imported - $start), WATCHDOG_INFO);

    unset($this->fetcher_result, $this->state);

    $this->save();

    $this->releaseLock();

    if (isset($e)) {
      throw $e;
    }
  }
  /**
   * Removes all items from a feed.
   *
   * This method only executes the current batch chunk, then returns. If you are
   * looking to delete all items of a source, use FeedsSource::startClear()
   * instead.
   *
   * The source lock is held until the state has been saved, matching
   * FeedsSource::import(), so that another request cannot acquire the lock and
   * load state that is about to be overwritten.
   *
   * @return
   *   FEEDS_BATCH_COMPLETE if the clearing process finished. A float below
   *   FEEDS_BATCH_COMPLETE if clearing is still in progress.
   *
   * @throws
   *   Throws Exception if an error occurs when clearing.
   */
  public function clear() {
    $this->acquireLock();
    try {
      $this->importer->fetcher->clear($this);
      $this->importer->parser->clear($this);
      $this->importer->processor->clear($this);
    }
    catch (Exception $e) {
      // $e is stored and re-thrown once we've had a chance to log our progress.
    }

    // Clean up.
    $result = $this->progressClearing();
    if ($result == FEEDS_BATCH_COMPLETE || isset($e)) {
      module_invoke_all('feeds_after_clear', $this);
      unset($this->state);
    }
    $this->save();

    // Release the lock only after the state has been persisted.
    $this->releaseLock();

    if (isset($e)) {
      throw $e;
    }

    return $result;
  }
  /**
   * Removes all expired items from a feed.
   *
   * @return
   *   Whatever the processor's expire() method returns.
   *
   * @throws Exception
   *   Re-throws any exception raised by the processor, after the lock has been
   *   released.
   */
  public function expire() {
    $this->acquireLock();

    $failure = NULL;
    try {
      $result = $this->importer->processor->expire($this);
    }
    catch (Exception $caught) {
      // Will throw after the lock is released.
      $failure = $caught;
    }

    $this->releaseLock();

    if ($failure !== NULL) {
      throw $failure;
    }
    return $result;
  }
  /**
   * Reports parsing progress as float between 0 and 1.
   *
   * @return
   *   The progress of the FEEDS_PARSE state. 1 = FEEDS_BATCH_COMPLETE.
   */
  public function progressParsing() {
    $parse_state = $this->state(FEEDS_PARSE);
    return $parse_state->progress;
  }
  /**
   * Reports import progress as float between 0 and 1.
   *
   * Combines the progress of the fetching and the parsing stage.
   *
   * @return
   *   FEEDS_BATCH_COMPLETE (1) only if both fetching and parsing are complete,
   *   otherwise the combined progress; a combined value that happens to equal
   *   FEEDS_BATCH_COMPLETE is reported as 0.99.
   */
  public function progressImporting() {
    $fetch_state = $this->state(FEEDS_FETCH);
    $parse_state = $this->state(FEEDS_PARSE);

    if ($fetch_state->progress == FEEDS_BATCH_COMPLETE && $parse_state->progress == FEEDS_BATCH_COMPLETE) {
      return FEEDS_BATCH_COMPLETE;
    }

    // Fetching envelops parsing.
    // @todo: this assumes all fetchers neatly use total. May not be the case.
    $share = $fetch_state->total ? 1.0 / $fetch_state->total : 1.0;
    $combined = $fetch_state->progress - $share + $parse_state->progress * $share;

    return $combined == FEEDS_BATCH_COMPLETE ? 0.99 : $combined;
  }
  /**
   * Reports progress on clearing.
   *
   * @return
   *   The progress of the FEEDS_PROCESS_CLEAR state.
   */
  public function progressClearing() {
    $clear_state = $this->state(FEEDS_PROCESS_CLEAR);
    return $clear_state->progress;
  }
  /**
   * Reports progress on expiry.
   *
   * @return
   *   The progress of the FEEDS_PROCESS_EXPIRE state.
   */
  public function progressExpiring() {
    $expire_state = $this->state(FEEDS_PROCESS_EXPIRE);
    return $expire_state->progress;
  }
  /**
   * Returns the state object for a given stage, creating it on first use.
   *
   * @todo Rename getConfigFor() accordingly to config().
   *
   * @param $stage
   *   One of FEEDS_FETCH, FEEDS_PARSE, FEEDS_PROCESS or FEEDS_PROCESS_CLEAR.
   *
   * @return
   *   The FeedsState object for the given stage.
   */
  public function state($stage) {
    // Anything that is not an array is replaced by an empty set of states.
    $states = is_array($this->state) ? $this->state : array();
    if (!isset($states[$stage])) {
      $states[$stage] = new FeedsState();
    }
    $this->state = $states;
    return $this->state[$stage];
  }
  /**
   * Counts items imported by this source.
   *
   * @return
   *   The item count reported by the importer's processor for this source.
   */
  public function itemCount() {
    $processor = $this->importer->processor;
    return $processor->itemCount($this);
  }
  /**
   * Saves configuration, batch state and fetcher result to {feeds_source}.
   *
   * Every plugin of the importer is notified through sourceSave() first. An
   * existing row for this id/feed_nid is updated, otherwise a row is inserted.
   */
  public function save() {
    // Alert implementers of FeedsSourceInterface to the fact that we're saving.
    foreach ($this->importer->plugin_types as $plugin_type) {
      $this->importer->$plugin_type->sourceSave($this);
    }
    $config = $this->getConfig();

    // Store the source property of the fetcher in a separate column so that we
    // can do fast lookups on it.
    $fetcher_class = get_class($this->importer->fetcher);
    $source = isset($config[$fetcher_class]['source']) ? $config[$fetcher_class]['source'] : '';

    $record = array(
      'id' => $this->id,
      'feed_nid' => $this->feed_nid,
      'imported' => $this->imported,
      'config' => $config,
      'source' => $source,
      'state' => isset($this->state) ? $this->state : FALSE,
      'fetcher_result' => isset($this->fetcher_result) ? $this->fetcher_result : FALSE,
    );

    // Passing primary keys makes drupal_write_record() update instead of
    // insert.
    $exists = db_query_range("SELECT 1 FROM {feeds_source} WHERE id = :id AND feed_nid = :nid", 0, 1, array(':id' => $this->id, ':nid' => $this->feed_nid))->fetchField();
    $primary_keys = $exists ? array('id', 'feed_nid') : array();
    drupal_write_record('feeds_source', $record, $primary_keys);
  }
  /**
   * Loads configuration, batch state and fetcher result and unpacks them.
   *
   * Does nothing if no row exists in {feeds_source} for this id/feed_nid.
   *
   * @todo Patch CTools to move constants from export.inc to ctools.module.
   */
  public function load() {
    $record = db_query("SELECT imported, config, state, fetcher_result FROM {feeds_source} WHERE id = :id AND feed_nid = :nid", array(':id' => $this->id, ':nid' => $this->feed_nid))->fetchObject();
    if (!$record) {
      return;
    }

    // While FeedsSource cannot be exported, we still use CTool's export.inc
    // export definitions.
    ctools_include('export');
    $this->export_type = EXPORT_IN_DATABASE;
    $this->imported = $record->imported;
    $this->config = unserialize($record->config);

    // The state must always end up being an array.
    $state = empty($record->state) ? $this->state : unserialize($record->state);
    $this->state = is_array($state) ? $state : array();

    if (!empty($record->fetcher_result)) {
      $this->fetcher_result = unserialize($record->fetcher_result);
    }
  }
  /**
   * Deletes configuration.
   *
   * Removes the stored source information from the database and unschedules
   * all jobs of this source; does not delete the configuration object itself.
   * Every plugin of the importer is notified through sourceDelete() first.
   */
  public function delete() {
    // Alert implementers of FeedsSourceInterface to the fact that we're
    // deleting.
    foreach ($this->importer->plugin_types as $type) {
      $this->importer->$type->sourceDelete($this);
    }
    db_delete('feeds_source')
      ->condition('id', $this->id)
      ->condition('feed_nid', $this->feed_nid)
      ->execute();

    // Remove from schedule. This includes the clear job, which scheduleClear()
    // and startBackgroundJob('clear') may have created and which would
    // otherwise be left behind for a source that no longer exists.
    $job = array(
      'type' => $this->id,
      'id' => $this->feed_nid,
    );
    JobScheduler::get('feeds_source_import')->remove($job);
    JobScheduler::get('feeds_source_expire')->remove($job);
    JobScheduler::get('feeds_source_clear')->remove($job);
  }
  /**
   * Only returns the source if its configuration is persistent and valid.
   *
   * A source is valid if it either has neither a feed nid nor an importer
   * content type, or has both.
   *
   * @see FeedsConfigurable::existing().
   *
   * @throws FeedsNotExistingException
   *   If feed nid and importer content type do not match up.
   */
  public function existing() {
    $no_feed_nid = empty($this->feed_nid);
    $no_content_type = empty($this->importer->config['content_type']);

    // If there is no feed nid given, there must be no content type specified.
    // If there is a feed nid given, there must be a content type specified.
    if ($no_feed_nid !== $no_content_type) {
      throw new FeedsNotExistingException(t('Source configuration not valid.'));
    }

    // Ensure that importer is persistent (= defined in code or DB).
    $this->importer->existing();
    // Ensure that source is persistent (= defined in DB).
    return parent::existing();
  }
  /**
   * Returns the configuration for a specific client class.
   *
   * @param FeedsSourceInterface $client
   *   An object that is an implementer of FeedsSourceInterface.
   *
   * @return
   *   The array stored under the class name of $client, or the client's
   *   source defaults if nothing is stored.
   */
  public function getConfigFor(FeedsSourceInterface $client) {
    $class = get_class($client);
    if (isset($this->config[$class])) {
      return $this->config[$class];
    }
    return $client->sourceDefaults();
  }
  /**
   * Sets the configuration for a specific client class.
   *
   * The configuration is stored under the class name of $client. Nothing is
   * returned.
   *
   * @param FeedsSourceInterface $client
   *   An object that is an implementer of FeedsSourceInterface.
   * @param $config
   *   The configuration for $client.
   */
  public function setConfigFor(FeedsSourceInterface $client, $config) {
    $class = get_class($client);
    $this->config[$class] = $config;
  }
  /**
   * Returns defaults for feed configuration.
   *
   * @return
   *   The source defaults of every importer plugin that handles source
   *   configuration, keyed by the plugin's class name.
   */
  public function configDefaults() {
    $defaults = array();
    foreach ($this->importer->plugin_types as $plugin_type) {
      $plugin = $this->importer->$plugin_type;
      // Plugins without source configuration contribute nothing.
      if (!$plugin->hasSourceConfig()) {
        continue;
      }
      $defaults[get_class($plugin)] = $plugin->sourceDefaults();
    }
    return $defaults;
  }
  /**
   * Overrides parent::configForm().
   *
   * @return
   *   The source forms of every importer plugin that handles source
   *   configuration, keyed by the plugin's class name, each marked as #tree.
   */
  public function configForm(&$form_state) {
    $form = array();
    foreach ($this->importer->plugin_types as $plugin_type) {
      $plugin = $this->importer->$plugin_type;
      if (!$plugin->hasSourceConfig()) {
        continue;
      }
      $class = get_class($plugin);
      // Fall back to an empty configuration if nothing is stored yet.
      $stored = isset($this->config[$class]) ? $this->config[$class] : array();
      $element = $plugin->sourceForm($stored);
      $element['#tree'] = TRUE;
      $form[$class] = $element;
    }
    return $form;
  }
  /**
   * Overrides parent::configFormValidate().
   *
   * Hands the values submitted for each plugin to that plugin's
   * sourceFormValidate(), by reference.
   */
  public function configFormValidate(&$values) {
    foreach ($this->importer->plugin_types as $plugin_type) {
      $plugin = $this->importer->$plugin_type;
      $class = get_class($plugin);
      // Only plugins with submitted values and source configuration validate.
      if (!isset($values[$class]) || !$plugin->hasSourceConfig()) {
        continue;
      }
      $plugin->sourceFormValidate($values[$class]);
    }
  }
  /**
   * Writes to feeds log.
   *
   * Forwards all arguments to feeds_log(), together with the id and feed nid
   * of this source.
   *
   * @param $type
   *   The type passed on to feeds_log().
   * @param $message
   *   The message passed on to feeds_log().
   * @param $variables
   *   The message variables passed on to feeds_log().
   * @param $severity
   *   The severity passed on to feeds_log(). Defaults to WATCHDOG_NOTICE.
   */
  public function log($type, $message, $variables = array(), $severity = WATCHDOG_NOTICE) {
    $importer_id = $this->id;
    $feed_nid = $this->feed_nid;
    feeds_log($importer_id, $feed_nid, $type, $message, $variables, $severity);
  }
  /**
   * Background job helper. Starts a background job using Job Scheduler.
   *
   * Executes the first batch chunk of a background job on the current page
   * load and, if that did not complete the job, hands the rest over to a Job
   * Scheduler job.
   *
   * Executing the first batch chunk is important, otherwise, when a user
   * submits a source for import or clearing, we will leave her without any
   * visual indicators of an ongoing job.
   *
   * @see FeedsSource::startImport().
   * @see FeedsSource::startClear().
   *
   * @param $method
   *   Method to execute on importer; one of 'import' or 'clear'.
   *
   * @throws Exception $e
   */
  protected function startBackgroundJob($method) {
    $progress = $this->$method();
    if ($progress == FEEDS_BATCH_COMPLETE) {
      // The first chunk finished the job, nothing left to schedule.
      return;
    }

    $job = array(
      'type' => $this->id,
      'id' => $this->feed_nid,
      'period' => 0,
      'periodic' => FALSE,
    );
    $scheduler = JobScheduler::get("feeds_source_{$method}");
    $scheduler->set($job);
  }
  /**
   * Batch API helper. Starts a Batch API job.
   *
   * Registers a batch with a single feeds_batch() operation for this source.
   *
   * @see FeedsSource::startImport().
   * @see FeedsSource::startClear().
   * @see feeds_batch()
   *
   * @param $title
   *   Title to show to user when executing batch.
   * @param $method
   *   Method to execute on importer; one of 'import' or 'clear'.
   */
  protected function startBatchAPIJob($title, $method) {
    $arguments = array($method, $this->id, $this->feed_nid);
    $operations = array(
      array('feeds_batch', $arguments),
    );
    batch_set(array(
      'title' => $title,
      'operations' => $operations,
      'progress_message' => '',
    ));
  }
  /**
   * Acquires a lock for this source, with a timeout of 60 seconds.
   *
   * @throws FeedsLockException
   *   If a lock for the requested job could not be acquired.
   */
  protected function acquireLock() {
    $lock_name = "feeds_source_{$this->id}_{$this->feed_nid}";
    if (lock_acquire($lock_name, 60.0)) {
      return;
    }
    $arguments = array('@id' => $this->id, '@feed_nid' => $this->feed_nid);
    throw new FeedsLockException(t('Cannot acquire lock for source @id / @feed_nid.', $arguments));
  }
  /**
   * Releases the lock for this source.
   *
   * Uses the same lock name as acquireLock().
   */
  protected function releaseLock() {
    $lock_name = "feeds_source_{$this->id}_{$this->feed_nid}";
    lock_release($lock_name);
  }
  /**
   * Implements FeedsConfigurable::dependencies().
   *
   * @return
   *   The parent's dependencies merged with those of this source's importer.
   */
  public function dependencies() {
    $own = parent::dependencies();
    $from_importer = $this->importer()->dependencies();
    return array_merge($own, $from_importer);
  }
  786. }