StreamBuffer.php 7.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. <?php
  2. /*
  3. * This file is part of SwiftMailer.
  4. * (c) 2004-2009 Chris Corbyn
  5. *
  6. * For the full copyright and license information, please view the LICENSE
  7. * file that was distributed with this source code.
  8. */
  9. /**
  10. * A generic IoBuffer implementation supporting remote sockets and local processes.
  11. * @package Swift
  12. * @subpackage Transport
  13. * @author Chris Corbyn
  14. */
  15. class Swift_Transport_StreamBuffer
  16. extends Swift_ByteStream_AbstractFilterableInputStream
  17. implements Swift_Transport_IoBuffer
  18. {
  19. /** A primary socket */
  20. private $_stream;
  21. /** The input stream */
  22. private $_in;
  23. /** The output stream */
  24. private $_out;
  25. /** Buffer initialization parameters */
  26. private $_params = array();
  27. /** The ReplacementFilterFactory */
  28. private $_replacementFactory;
  29. /** Translations performed on data being streamed into the buffer */
  30. private $_translations = array();
  31. /**
  32. * Create a new StreamBuffer using $replacementFactory for transformations.
  33. * @param Swift_ReplacementFilterFactory $replacementFactory
  34. */
  35. public function __construct(
  36. Swift_ReplacementFilterFactory $replacementFactory)
  37. {
  38. $this->_replacementFactory = $replacementFactory;
  39. }
  40. /**
  41. * Perform any initialization needed, using the given $params.
  42. * Parameters will vary depending upon the type of IoBuffer used.
  43. * @param array $params
  44. */
  45. public function initialize(array $params)
  46. {
  47. $this->_params = $params;
  48. switch ($params['type'])
  49. {
  50. case self::TYPE_PROCESS:
  51. $this->_establishProcessConnection();
  52. break;
  53. case self::TYPE_SOCKET:
  54. default:
  55. $this->_establishSocketConnection();
  56. break;
  57. }
  58. }
  59. /**
  60. * Set an individual param on the buffer (e.g. switching to SSL).
  61. * @param string $param
  62. * @param mixed $value
  63. */
  64. public function setParam($param, $value)
  65. {
  66. if (isset($this->_stream))
  67. {
  68. switch ($param)
  69. {
  70. case 'timeout':
  71. if ($this->_stream)
  72. {
  73. stream_set_timeout($this->_stream, $value);
  74. }
  75. break;
  76. case 'blocking':
  77. if ($this->_stream)
  78. {
  79. stream_set_blocking($this->_stream, 1);
  80. }
  81. }
  82. }
  83. $this->_params[$param] = $value;
  84. }
  85. public function startTLS()
  86. {
  87. return stream_socket_enable_crypto($this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT);
  88. }
  89. /**
  90. * Perform any shutdown logic needed.
  91. */
  92. public function terminate()
  93. {
  94. if (isset($this->_stream))
  95. {
  96. switch ($this->_params['type'])
  97. {
  98. case self::TYPE_PROCESS:
  99. fclose($this->_in);
  100. fclose($this->_out);
  101. proc_close($this->_stream);
  102. break;
  103. case self::TYPE_SOCKET:
  104. default:
  105. fclose($this->_stream);
  106. break;
  107. }
  108. }
  109. $this->_stream = null;
  110. $this->_out = null;
  111. $this->_in = null;
  112. }
  113. /**
  114. * Set an array of string replacements which should be made on data written
  115. * to the buffer. This could replace LF with CRLF for example.
  116. * @param string[] $replacements
  117. */
  118. public function setWriteTranslations(array $replacements)
  119. {
  120. foreach ($this->_translations as $search => $replace)
  121. {
  122. if (!isset($replacements[$search]))
  123. {
  124. $this->removeFilter($search);
  125. unset($this->_translations[$search]);
  126. }
  127. }
  128. foreach ($replacements as $search => $replace)
  129. {
  130. if (!isset($this->_translations[$search]))
  131. {
  132. $this->addFilter(
  133. $this->_replacementFactory->createFilter($search, $replace), $search
  134. );
  135. $this->_translations[$search] = true;
  136. }
  137. }
  138. }
  139. /**
  140. * Get a line of output (including any CRLF).
  141. * The $sequence number comes from any writes and may or may not be used
  142. * depending upon the implementation.
  143. * @param int $sequence of last write to scan from
  144. * @return string
  145. */
  146. public function readLine($sequence)
  147. {
  148. if (isset($this->_out) && !feof($this->_out))
  149. {
  150. $line = fgets($this->_out);
  151. if (strlen($line)==0)
  152. {
  153. $metas = stream_get_meta_data($this->_out);
  154. if ($metas['timed_out']) {
  155. throw new Swift_IoException(
  156. 'Connection to ' .
  157. $this->_getReadConnectionDescription() .
  158. ' Timed Out'
  159. );
  160. }
  161. }
  162. return $line;
  163. }
  164. }
  165. /**
  166. * Reads $length bytes from the stream into a string and moves the pointer
  167. * through the stream by $length. If less bytes exist than are requested the
  168. * remaining bytes are given instead. If no bytes are remaining at all, boolean
  169. * false is returned.
  170. * @param int $length
  171. * @return string
  172. */
  173. public function read($length)
  174. {
  175. if (isset($this->_out) && !feof($this->_out))
  176. {
  177. $ret = fread($this->_out, $length);
  178. if (strlen($ret)==0)
  179. {
  180. $metas = stream_get_meta_data($this->_out);
  181. if ($metas['timed_out'])
  182. {
  183. throw new Swift_IoException(
  184. 'Connection to ' .
  185. $this->_getReadConnectionDescription() .
  186. ' Timed Out'
  187. );
  188. }
  189. }
  190. return $ret;
  191. }
  192. }
  193. /** Not implemented */
  194. public function setReadPointer($byteOffset)
  195. {
  196. }
  197. // -- Protected methods
  198. /** Flush the stream contents */
  199. protected function _flush()
  200. {
  201. if (isset($this->_in))
  202. {
  203. fflush($this->_in);
  204. }
  205. }
  206. /** Write this bytes to the stream */
  207. protected function _commit($bytes)
  208. {
  209. if (isset($this->_in)
  210. && fwrite($this->_in, $bytes))
  211. {
  212. return ++$this->_sequence;
  213. }
  214. }
  215. // -- Private methods
  216. /**
  217. * Establishes a connection to a remote server.
  218. * @access private
  219. */
  220. private function _establishSocketConnection()
  221. {
  222. $host = $this->_params['host'];
  223. if (!empty($this->_params['protocol']))
  224. {
  225. $host = $this->_params['protocol'] . '://' . $host;
  226. }
  227. $timeout = 15;
  228. if (!empty($this->_params['timeout']))
  229. {
  230. $timeout = $this->_params['timeout'];
  231. }
  232. $options = array();
  233. if (!empty($this->_params['sourceIp']))
  234. {
  235. $options['socket']['bindto']=$this->_params['sourceIp'].':0';
  236. }
  237. $this->_stream = @stream_socket_client($host.':'.$this->_params['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options));
  238. if (false === $this->_stream)
  239. {
  240. throw new Swift_TransportException(
  241. 'Connection could not be established with host ' . $this->_params['host'] .
  242. ' [' . $errstr . ' #' . $errno . ']'
  243. );
  244. }
  245. if (!empty($this->_params['blocking']))
  246. {
  247. stream_set_blocking($this->_stream, 1);
  248. }
  249. else
  250. {
  251. stream_set_blocking($this->_stream, 0);
  252. }
  253. stream_set_timeout($this->_stream, $timeout);
  254. $this->_in =& $this->_stream;
  255. $this->_out =& $this->_stream;
  256. }
  257. /**
  258. * Opens a process for input/output.
  259. * @access private
  260. */
  261. private function _establishProcessConnection()
  262. {
  263. $command = $this->_params['command'];
  264. $descriptorSpec = array(
  265. 0 => array('pipe', 'r'),
  266. 1 => array('pipe', 'w'),
  267. 2 => array('pipe', 'w')
  268. );
  269. $this->_stream = proc_open($command, $descriptorSpec, $pipes);
  270. stream_set_blocking($pipes[2], 0);
  271. if ($err = stream_get_contents($pipes[2]))
  272. {
  273. throw new Swift_TransportException(
  274. 'Process could not be started [' . $err . ']'
  275. );
  276. }
  277. $this->_in =& $pipes[0];
  278. $this->_out =& $pipes[1];
  279. }
  280. private function _getReadConnectionDescription()
  281. {
  282. switch ($this->_params['type'])
  283. {
  284. case self::TYPE_PROCESS:
  285. return 'Process '.$this->_params['command'];
  286. break;
  287. case self::TYPE_SOCKET:
  288. default:
  289. $host = $this->_params['host'];
  290. if (!empty($this->_params['protocol']))
  291. {
  292. $host = $this->_params['protocol'] . '://' . $host;
  293. }
  294. $host.=':'.$this->_params['port'];
  295. return $host;
  296. break;
  297. }
  298. }
  299. }