index.js 976 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. var Stream = require('stream');
  2. function prop(propName) {
  3. return function (data) {
  4. return data[propName];
  5. };
  6. }
  7. module.exports = unique;
  8. function unique(propName) {
  9. var keyfn = JSON.stringify;
  10. if (typeof propName === 'string') {
  11. keyfn = prop(propName);
  12. } else if (typeof propName === 'function') {
  13. keyfn = propName;
  14. }
  15. var seen = {};
  16. var s = new Stream();
  17. s.readable = true;
  18. s.writable = true;
  19. var pipes = 0;
  20. s.write = function (data) {
  21. var key = keyfn(data);
  22. if (seen[key] === undefined) {
  23. seen[key] = true;
  24. s.emit('data', data);
  25. }
  26. };
  27. var ended = 0;
  28. s.end = function (data) {
  29. if (arguments.length) s.write(data);
  30. ended++;
  31. if (ended === pipes || pipes === 0) {
  32. s.writable = false;
  33. s.emit('end');
  34. }
  35. };
  36. s.destroy = function (data) {
  37. s.writable = false;
  38. };
  39. s.on('pipe', function () {
  40. pipes++;
  41. });
  42. s.on('unpipe', function () {
  43. pipes--;
  44. });
  45. return s;
  46. }