// runTask.js

  /*jshint node:true */
  "use strict";
  var eos = require('end-of-stream');
  var consume = require('stream-consume');
  5. module.exports = function (task, done) {
  6. var that = this, finish, cb, isDone = false, start, r;
  7. finish = function (err, runMethod) {
  8. var hrDuration = process.hrtime(start);
  9. if (isDone && !err) {
  10. err = new Error('task completion callback called too many times');
  11. }
  12. isDone = true;
  13. var duration = hrDuration[0] + (hrDuration[1] / 1e9); // seconds
  14. done.call(that, err, {
  15. duration: duration, // seconds
  16. hrDuration: hrDuration, // [seconds,nanoseconds]
  17. runMethod: runMethod
  18. });
  19. };
  20. cb = function (err) {
  21. finish(err, 'callback');
  22. };
  23. try {
  24. start = process.hrtime();
  25. r = task(cb);
  26. } catch (err) {
  27. return finish(err, 'catch');
  28. }
  29. if (r && typeof r.then === 'function') {
  30. // wait for promise to resolve
  31. // FRAGILE: ASSUME: Promises/A+, see http://promises-aplus.github.io/promises-spec/
  32. r.then(function () {
  33. finish(null, 'promise');
  34. }, function(err) {
  35. finish(err, 'promise');
  36. });
  37. } else if (r && typeof r.pipe === 'function') {
  38. // wait for stream to end
  39. eos(r, { error: true, readable: r.readable, writable: r.writable && !r.readable }, function(err){
  40. finish(err, 'stream');
  41. });
  42. // Ensure that the stream completes
  43. consume(r);
  44. } else if (task.length === 0) {
  45. // synchronous, function took in args.length parameters, and the callback was extra
  46. finish(null, 'sync');
  47. //} else {
  48. // FRAGILE: ASSUME: callback
  49. }
  50. };