queue.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /**
  2. * @module vow-queue
  3. * @author Filatov Dmitry <dfilatov@yandex-team.ru>
  4. * @version 0.3.1
  5. * @license
  6. * Dual licensed under the MIT and GPL licenses:
  7. * * http://www.opensource.org/licenses/mit-license.php
  8. * * http://www.gnu.org/licenses/gpl.html
  9. */
  10. (function() {
  11. function getModule(vow, nextTick) {
  12. var extend = function() {
  13. var res = {};
  14. for(var i = 0, len = arguments.length; i < len; i++) {
  15. var obj = arguments[i];
  16. if(obj) {
  17. for(var key in obj) {
  18. obj.hasOwnProperty(key) && (res[key] = obj[key]);
  19. }
  20. }
  21. }
  22. return res;
  23. },
  24. DEFAULT_QUEUE_PARAMS = {
  25. weightLimit : 100
  26. },
  27. DEFAULT_TASK_PARAMS = {
  28. weight : 1,
  29. priority : 1
  30. };
  31. /**
  32. * @class Queue
  33. * @exports vow-queue
  34. */
  35. /**
  36. * @constructor
  37. * @param {Object} [params]
  38. * @param {Number} [params.weightLimit=100]
  39. */
  40. function Queue(params) {
  41. this._pendingTasks = [];
  42. this._params = extend(DEFAULT_QUEUE_PARAMS, params);
  43. this._curWeight = 0;
  44. this._isRunScheduled = false;
  45. this._isStopped = true;
  46. this._processedBuffer = [];
  47. this._stats = {
  48. pendingTasksCount : 0,
  49. processingTasksCount : 0,
  50. processedTasksCount : 0
  51. };
  52. }
  53. Queue.prototype = /** @lends Queue.prototype */ {
  54. /**
  55. * Adds task to queue
  56. *
  57. * @param {Function} taskFn
  58. * @param {Object} [taskParams]
  59. * @param {Number} [taskParams.weight=1]
  60. * @param {Number} [taskParams.priority=1]
  61. * @returns {vow:promise}
  62. */
  63. enqueue : function(taskFn, taskParams) {
  64. var task = this._buildTask(taskFn, taskParams);
  65. if(task.params.weight > this._params.weightLimit) {
  66. throw Error('task with weight of ' +
  67. task.params.weight +
  68. ' can\'t be performed in queue with limit of ' +
  69. this._params.weightLimit);
  70. }
  71. this._enqueueTask(task);
  72. this._isStopped || this._scheduleRun();
  73. task.defer.promise().always(
  74. function() {
  75. this._stats.processingTasksCount--;
  76. this._stats.processedTasksCount++;
  77. },
  78. this);
  79. return task.defer.promise();
  80. },
  81. /**
  82. * Starts processing of queue
  83. */
  84. start : function() {
  85. if(!this._isStopped) {
  86. return;
  87. }
  88. this._isStopped = false;
  89. var processedBuffer = this._processedBuffer;
  90. if(processedBuffer.length) {
  91. this._processedBuffer = [];
  92. nextTick(function() {
  93. while(processedBuffer.length) {
  94. processedBuffer.shift()();
  95. }
  96. });
  97. }
  98. this._hasPendingTasks() && this._scheduleRun();
  99. },
  100. /**
  101. * Stops processing of queue
  102. */
  103. stop : function() {
  104. this._isStopped = true;
  105. },
  106. /**
  107. * Checks whether the queue is started
  108. * @returns {Boolean}
  109. */
  110. isStarted : function() {
  111. return !this._isStopped;
  112. },
  113. /**
  114. * Sets params of queue
  115. *
  116. * @param {Object} params
  117. * @param {Number} [params.weightLimit]
  118. */
  119. params : function(params) {
  120. if(typeof params.weightLimit !== 'undefined') {
  121. this._params.weightLimit = params.weightLimit;
  122. this._scheduleRun();
  123. }
  124. },
  125. getStats : function() {
  126. return this._stats;
  127. },
  128. _buildTask : function(taskFn, taskParams) {
  129. return {
  130. fn : taskFn,
  131. params : extend(DEFAULT_TASK_PARAMS, taskParams),
  132. defer : vow.defer()
  133. };
  134. },
  135. _enqueueTask : function(task) {
  136. var pendingTasks = this._pendingTasks,
  137. i = pendingTasks.length;
  138. this._stats.pendingTasksCount++;
  139. while(i) {
  140. if(pendingTasks[i - 1].params.priority >= task.params.priority) {
  141. i === pendingTasks.length?
  142. pendingTasks.push(task) :
  143. pendingTasks.splice(i, 0, task);
  144. return;
  145. }
  146. i--;
  147. }
  148. pendingTasks.push(task);
  149. },
  150. _scheduleRun : function() {
  151. if(!this._isRunScheduled) {
  152. this._isRunScheduled = true;
  153. nextTick(this._run.bind(this));
  154. }
  155. },
  156. _run : function() {
  157. this._isRunScheduled = false;
  158. while(this._hasPendingTasks() && this._allowRunTask(this._pendingTasks[0])) {
  159. this._runTask(this._pendingTasks.shift());
  160. }
  161. },
  162. _hasPendingTasks : function() {
  163. return !!this._pendingTasks.length;
  164. },
  165. _allowRunTask : function(task) {
  166. return this._curWeight + task.params.weight <= this._params.weightLimit;
  167. },
  168. _runTask : function(task) {
  169. this._curWeight += task.params.weight;
  170. this._stats.pendingTasksCount--;
  171. this._stats.processingTasksCount++;
  172. var taskRes = vow.invoke(task.fn);
  173. taskRes
  174. .progress(
  175. task.defer.notify,
  176. task.defer)
  177. .always(
  178. function() {
  179. this._curWeight -= task.params.weight;
  180. if(this._isStopped) {
  181. this._processedBuffer.push(function() {
  182. task.defer.resolve(taskRes);
  183. });
  184. }
  185. else {
  186. task.defer.resolve(taskRes);
  187. this._scheduleRun();
  188. }
  189. },
  190. this);
  191. }
  192. };
  193. return Queue;
  194. }
  195. var nextTick = typeof setImmediate !== 'undefined'?
  196. setImmediate :
  197. typeof process === 'object' && process.nextTick?
  198. process.nextTick :
  199. function(fn) {
  200. setTimeout(fn, 0);
  201. };
  202. if(typeof modules !== 'undefined') {
  203. /* global modules */
  204. modules.define('vow-queue', ['vow'], function(provide, vow) {
  205. provide(getModule(vow, nextTick));
  206. });
  207. }
  208. else if(typeof exports === 'object') {
  209. module.exports = getModule(require('vow'), nextTick);
  210. }
  211. })();