StreamBuffer.php 8.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. <?php
  2. /*
  3. * This file is part of SwiftMailer.
  4. * (c) 2004-2009 Chris Corbyn
  5. *
  6. * For the full copyright and license information, please view the LICENSE
  7. * file that was distributed with this source code.
  8. */
  9. /**
  10. * A generic IoBuffer implementation supporting remote sockets and local processes.
  11. * @package Swift
  12. * @subpackage Transport
  13. * @author Chris Corbyn
  14. */
  15. class Swift_Transport_StreamBuffer extends Swift_ByteStream_AbstractFilterableInputStream implements Swift_Transport_IoBuffer
  16. {
  17. /** A primary socket */
  18. private $_stream;
  19. /** The input stream */
  20. private $_in;
  21. /** The output stream */
  22. private $_out;
  23. /** Buffer initialization parameters */
  24. private $_params = array();
  25. /** The ReplacementFilterFactory */
  26. private $_replacementFactory;
  27. /** Translations performed on data being streamed into the buffer */
  28. private $_translations = array();
  29. /**
  30. * Create a new StreamBuffer using $replacementFactory for transformations.
  31. * @param Swift_ReplacementFilterFactory $replacementFactory
  32. */
  33. public function __construct(Swift_ReplacementFilterFactory $replacementFactory)
  34. {
  35. $this->_replacementFactory = $replacementFactory;
  36. }
  37. /**
  38. * Perform any initialization needed, using the given $params.
  39. * Parameters will vary depending upon the type of IoBuffer used.
  40. * @param array $params
  41. */
  42. public function initialize(array $params)
  43. {
  44. $this->_params = $params;
  45. switch ($params['type']) {
  46. case self::TYPE_PROCESS:
  47. $this->_establishProcessConnection();
  48. break;
  49. case self::TYPE_SOCKET:
  50. default:
  51. $this->_establishSocketConnection();
  52. break;
  53. }
  54. }
  55. /**
  56. * Set an individual param on the buffer (e.g. switching to SSL).
  57. * @param string $param
  58. * @param mixed $value
  59. */
  60. public function setParam($param, $value)
  61. {
  62. if (isset($this->_stream)) {
  63. switch ($param) {
  64. case 'timeout':
  65. if ($this->_stream) {
  66. stream_set_timeout($this->_stream, $value);
  67. }
  68. break;
  69. case 'blocking':
  70. if ($this->_stream) {
  71. stream_set_blocking($this->_stream, 1);
  72. }
  73. }
  74. }
  75. $this->_params[$param] = $value;
  76. }
  77. public function startTLS()
  78. {
  79. return stream_socket_enable_crypto($this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT);
  80. }
  81. /**
  82. * Perform any shutdown logic needed.
  83. */
  84. public function terminate()
  85. {
  86. if (isset($this->_stream)) {
  87. switch ($this->_params['type']) {
  88. case self::TYPE_PROCESS:
  89. fclose($this->_in);
  90. fclose($this->_out);
  91. proc_close($this->_stream);
  92. break;
  93. case self::TYPE_SOCKET:
  94. default:
  95. fclose($this->_stream);
  96. break;
  97. }
  98. }
  99. $this->_stream = null;
  100. $this->_out = null;
  101. $this->_in = null;
  102. }
  103. /**
  104. * Set an array of string replacements which should be made on data written
  105. * to the buffer. This could replace LF with CRLF for example.
  106. * @param string[] $replacements
  107. */
  108. public function setWriteTranslations(array $replacements)
  109. {
  110. foreach ($this->_translations as $search => $replace) {
  111. if (!isset($replacements[$search])) {
  112. $this->removeFilter($search);
  113. unset($this->_translations[$search]);
  114. }
  115. }
  116. foreach ($replacements as $search => $replace) {
  117. if (!isset($this->_translations[$search])) {
  118. $this->addFilter(
  119. $this->_replacementFactory->createFilter($search, $replace), $search
  120. );
  121. $this->_translations[$search] = true;
  122. }
  123. }
  124. }
  125. /**
  126. * Get a line of output (including any CRLF).
  127. * The $sequence number comes from any writes and may or may not be used
  128. * depending upon the implementation.
  129. * @param int $sequence of last write to scan from
  130. * @return string
  131. */
  132. public function readLine($sequence)
  133. {
  134. if (isset($this->_out) && !feof($this->_out)) {
  135. $line = fgets($this->_out);
  136. if (strlen($line)==0) {
  137. $metas = stream_get_meta_data($this->_out);
  138. if ($metas['timed_out']) {
  139. throw new Swift_IoException(
  140. 'Connection to ' .
  141. $this->_getReadConnectionDescription() .
  142. ' Timed Out'
  143. );
  144. }
  145. }
  146. return $line;
  147. }
  148. }
  149. /**
  150. * Reads $length bytes from the stream into a string and moves the pointer
  151. * through the stream by $length. If less bytes exist than are requested the
  152. * remaining bytes are given instead. If no bytes are remaining at all, boolean
  153. * false is returned.
  154. * @param int $length
  155. * @return string
  156. */
  157. public function read($length)
  158. {
  159. if (isset($this->_out) && !feof($this->_out)) {
  160. $ret = fread($this->_out, $length);
  161. if (strlen($ret)==0) {
  162. $metas = stream_get_meta_data($this->_out);
  163. if ($metas['timed_out']) {
  164. throw new Swift_IoException(
  165. 'Connection to ' .
  166. $this->_getReadConnectionDescription() .
  167. ' Timed Out'
  168. );
  169. }
  170. }
  171. return $ret;
  172. }
  173. }
  174. /** Not implemented */
  175. public function setReadPointer($byteOffset)
  176. {
  177. }
  178. // -- Protected methods
  179. /** Flush the stream contents */
  180. protected function _flush()
  181. {
  182. if (isset($this->_in)) {
  183. fflush($this->_in);
  184. }
  185. }
  186. /** Write this bytes to the stream */
  187. protected function _commit($bytes)
  188. {
  189. if (isset($this->_in)
  190. && fwrite($this->_in, $bytes))
  191. {
  192. return ++$this->_sequence;
  193. }
  194. }
  195. // -- Private methods
  196. /**
  197. * Establishes a connection to a remote server.
  198. * @access private
  199. */
  200. private function _establishSocketConnection()
  201. {
  202. $host = $this->_params['host'];
  203. if (!empty($this->_params['protocol'])) {
  204. $host = $this->_params['protocol'] . '://' . $host;
  205. }
  206. $timeout = 15;
  207. if (!empty($this->_params['timeout'])) {
  208. $timeout = $this->_params['timeout'];
  209. }
  210. $options = array();
  211. if (!empty($this->_params['sourceIp'])) {
  212. $options['socket']['bindto']=$this->_params['sourceIp'].':0';
  213. }
  214. $this->_stream = @stream_socket_client($host.':'.$this->_params['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options));
  215. if (false === $this->_stream) {
  216. throw new Swift_TransportException(
  217. 'Connection could not be established with host ' . $this->_params['host'] .
  218. ' [' . $errstr . ' #' . $errno . ']'
  219. );
  220. }
  221. if (!empty($this->_params['blocking'])) {
  222. stream_set_blocking($this->_stream, 1);
  223. } else {
  224. stream_set_blocking($this->_stream, 0);
  225. }
  226. stream_set_timeout($this->_stream, $timeout);
  227. $this->_in =& $this->_stream;
  228. $this->_out =& $this->_stream;
  229. }
  230. /**
  231. * Opens a process for input/output.
  232. * @access private
  233. */
  234. private function _establishProcessConnection()
  235. {
  236. $command = $this->_params['command'];
  237. $descriptorSpec = array(
  238. 0 => array('pipe', 'r'),
  239. 1 => array('pipe', 'w'),
  240. 2 => array('pipe', 'w')
  241. );
  242. $this->_stream = proc_open($command, $descriptorSpec, $pipes);
  243. stream_set_blocking($pipes[2], 0);
  244. if ($err = stream_get_contents($pipes[2])) {
  245. throw new Swift_TransportException(
  246. 'Process could not be started [' . $err . ']'
  247. );
  248. }
  249. $this->_in =& $pipes[0];
  250. $this->_out =& $pipes[1];
  251. }
  252. private function _getReadConnectionDescription()
  253. {
  254. switch ($this->_params['type']) {
  255. case self::TYPE_PROCESS:
  256. return 'Process '.$this->_params['command'];
  257. break;
  258. case self::TYPE_SOCKET:
  259. default:
  260. $host = $this->_params['host'];
  261. if (!empty($this->_params['protocol'])) {
  262. $host = $this->_params['protocol'] . '://' . $host;
  263. }
  264. $host.=':'.$this->_params['port'];
  265. return $host;
  266. break;
  267. }
  268. }
  269. }