CallbackFilter.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. <?php
  2. namespace Clue\StreamFilter;
  3. use php_user_filter;
  4. use InvalidArgumentException;
  5. use ReflectionFunction;
  6. use Exception;
  7. /**
  8. *
  9. * @internal
  10. * @see append()
  11. * @see prepend()
  12. */
  13. class CallbackFilter extends php_user_filter
  14. {
  15. private $callback;
  16. private $closed = true;
  17. private $supportsClose = false;
  18. public function onCreate()
  19. {
  20. $this->closed = false;
  21. if (!is_callable($this->params)) {
  22. throw new InvalidArgumentException('No valid callback parameter given to stream_filter_(append|prepend)');
  23. }
  24. $this->callback = $this->params;
  25. // callback supports end event if it accepts invocation without arguments
  26. $ref = new ReflectionFunction($this->callback);
  27. $this->supportsClose = ($ref->getNumberOfRequiredParameters() === 0);
  28. return true;
  29. }
  30. public function onClose()
  31. {
  32. $this->closed = true;
  33. // callback supports closing and is not already closed
  34. if ($this->supportsClose) {
  35. $this->supportsClose = false;
  36. // invoke without argument to signal end and discard resulting buffer
  37. try {
  38. call_user_func($this->callback);
  39. } catch (Exception $ignored) {
  40. // this might be called during engine shutdown, so it's not safe
  41. // to raise any errors or exceptions here
  42. // trigger_error('Error closing filter: ' . $ignored->getMessage(), E_USER_WARNING);
  43. }
  44. }
  45. $this->callback = null;
  46. }
  47. public function filter($in, $out, &$consumed, $closing)
  48. {
  49. // concatenate whole buffer from input brigade
  50. $data = '';
  51. while ($bucket = stream_bucket_make_writeable($in)) {
  52. $consumed += $bucket->datalen;
  53. $data .= $bucket->data;
  54. }
  55. // skip processing callback that already ended
  56. if ($this->closed) {
  57. return PSFS_FEED_ME;
  58. }
  59. // only invoke filter function if buffer is not empty
  60. // this may skip flushing a closing filter
  61. if ($data !== '') {
  62. try {
  63. $data = call_user_func($this->callback, $data);
  64. } catch (Exception $e) {
  65. // exception should mark filter as closed
  66. $this->onClose();
  67. trigger_error('Error invoking filter: ' . $e->getMessage(), E_USER_WARNING);
  68. return PSFS_ERR_FATAL;
  69. }
  70. }
  71. // mark filter as closed after processing closing chunk
  72. if ($closing) {
  73. $this->closed = true;
  74. // callback supports closing and is not already closed
  75. if ($this->supportsClose) {
  76. $this->supportsClose = false;
  77. // invoke without argument to signal end and append resulting buffer
  78. try {
  79. $data .= call_user_func($this->callback);
  80. } catch (Exception $e) {
  81. trigger_error('Error ending filter: ' . $e->getMessage(), E_USER_WARNING);
  82. return PSFS_ERR_FATAL;
  83. }
  84. }
  85. }
  86. if ($data !== '') {
  87. // create a new bucket for writing the resulting buffer to the output brigade
  88. // reusing an existing bucket turned out to be bugged in some environments (ancient PHP versions and HHVM)
  89. $bucket = @stream_bucket_new($this->stream, $data);
  90. // legacy PHP versions (PHP < 5.4) do not support passing data from the event signal handler
  91. // because closing the stream invalidates the stream and its stream bucket brigade before
  92. // invoking the filter close handler.
  93. if ($bucket !== false) {
  94. stream_bucket_append($out, $bucket);
  95. }
  96. }
  97. return PSFS_PASS_ON;
  98. }
  99. }