StreamBuffer.php 8.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. <?php
  2. /*
  3. * This file is part of SwiftMailer.
  4. * (c) 2004-2009 Chris Corbyn
  5. *
  6. * For the full copyright and license information, please view the LICENSE
  7. * file that was distributed with this source code.
  8. */
  9. /**
  10. * A generic IoBuffer implementation supporting remote sockets and local processes.
  11. * @package Swift
  12. * @subpackage Transport
  13. * @author Chris Corbyn
  14. */
  15. class Swift_Transport_StreamBuffer
  16. extends Swift_ByteStream_AbstractFilterableInputStream
  17. implements Swift_Transport_IoBuffer
  18. {
  19. /** A primary socket */
  20. private $_stream;
  21. /** The input stream */
  22. private $_in;
  23. /** The output stream */
  24. private $_out;
  25. /** Buffer initialization parameters */
  26. private $_params = array();
  27. /** The ReplacementFilterFactory */
  28. private $_replacementFactory;
  29. /** Translations performed on data being streamed into the buffer */
  30. private $_translations = array();
  31. /**
  32. * Create a new StreamBuffer using $replacementFactory for transformations.
  33. * @param Swift_ReplacementFilterFactory $replacementFactory
  34. */
  35. public function __construct(
  36. Swift_ReplacementFilterFactory $replacementFactory)
  37. {
  38. $this->_replacementFactory = $replacementFactory;
  39. }
  40. /**
  41. * Perform any initialization needed, using the given $params.
  42. * Parameters will vary depending upon the type of IoBuffer used.
  43. * @param array $params
  44. */
  45. public function initialize(array $params)
  46. {
  47. $this->_params = $params;
  48. switch ($params['type'])
  49. {
  50. case self::TYPE_PROCESS:
  51. $this->_establishProcessConnection();
  52. break;
  53. case self::TYPE_SOCKET:
  54. default:
  55. $this->_establishSocketConnection();
  56. break;
  57. }
  58. }
  59. /**
  60. * Set an individual param on the buffer (e.g. switching to SSL).
  61. * @param string $param
  62. * @param mixed $value
  63. */
  64. public function setParam($param, $value)
  65. {
  66. if (isset($this->_stream))
  67. {
  68. switch ($param)
  69. {
  70. case 'protocol':
  71. if (!array_key_exists('protocol', $this->_params)
  72. || $value != $this->_params['protocol'])
  73. {
  74. if ('tls' == $value)
  75. {
  76. stream_socket_enable_crypto(
  77. $this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT
  78. );
  79. }
  80. }
  81. break;
  82. case 'timeout':
  83. if ($this->_stream)
  84. {
  85. stream_set_timeout($this->_stream, $value);
  86. }
  87. break;
  88. case 'blocking':
  89. if ($this->_stream)
  90. {
  91. stream_set_blocking($this->_stream, 1);
  92. }
  93. }
  94. }
  95. $this->_params[$param] = $value;
  96. }
  97. /**
  98. * Perform any shutdown logic needed.
  99. */
  100. public function terminate()
  101. {
  102. if (isset($this->_stream))
  103. {
  104. switch ($this->_params['type'])
  105. {
  106. case self::TYPE_PROCESS:
  107. fclose($this->_in);
  108. fclose($this->_out);
  109. proc_close($this->_stream);
  110. break;
  111. case self::TYPE_SOCKET:
  112. default:
  113. fclose($this->_stream);
  114. break;
  115. }
  116. }
  117. $this->_stream = null;
  118. $this->_out = null;
  119. $this->_in = null;
  120. }
  121. /**
  122. * Set an array of string replacements which should be made on data written
  123. * to the buffer. This could replace LF with CRLF for example.
  124. * @param string[] $replacements
  125. */
  126. public function setWriteTranslations(array $replacements)
  127. {
  128. foreach ($this->_translations as $search => $replace)
  129. {
  130. if (!isset($replacements[$search]))
  131. {
  132. $this->removeFilter($search);
  133. unset($this->_translations[$search]);
  134. }
  135. }
  136. foreach ($replacements as $search => $replace)
  137. {
  138. if (!isset($this->_translations[$search]))
  139. {
  140. $this->addFilter(
  141. $this->_replacementFactory->createFilter($search, $replace), $search
  142. );
  143. $this->_translations[$search] = true;
  144. }
  145. }
  146. }
  147. /**
  148. * Get a line of output (including any CRLF).
  149. * The $sequence number comes from any writes and may or may not be used
  150. * depending upon the implementation.
  151. * @param int $sequence of last write to scan from
  152. * @return string
  153. */
  154. public function readLine($sequence)
  155. {
  156. if (isset($this->_out) && !feof($this->_out))
  157. {
  158. $line = fgets($this->_out);
  159. if (strlen($line)==0)
  160. {
  161. $metas = stream_get_meta_data($this->_out);
  162. if ($metas['timed_out']) {
  163. throw new Swift_IoException(
  164. 'Connection to ' .
  165. $this->_getReadConnectionDescription() .
  166. ' Timed Out'
  167. );
  168. }
  169. }
  170. return $line;
  171. }
  172. }
  173. /**
  174. * Reads $length bytes from the stream into a string and moves the pointer
  175. * through the stream by $length. If less bytes exist than are requested the
  176. * remaining bytes are given instead. If no bytes are remaining at all, boolean
  177. * false is returned.
  178. * @param int $length
  179. * @return string
  180. */
  181. public function read($length)
  182. {
  183. if (isset($this->_out) && !feof($this->_out))
  184. {
  185. $ret = fread($this->_out, $length);
  186. if (strlen($ret)==0)
  187. {
  188. $metas = stream_get_meta_data($this->_out);
  189. if ($metas['timed_out'])
  190. {
  191. throw new Swift_IoException(
  192. 'Connection to ' .
  193. $this->_getReadConnectionDescription() .
  194. ' Timed Out'
  195. );
  196. }
  197. }
  198. return $ret;
  199. }
  200. }
  201. /** Not implemented */
  202. public function setReadPointer($byteOffset)
  203. {
  204. }
  205. // -- Protected methods
  206. /** Flush the stream contents */
  207. protected function _flush()
  208. {
  209. if (isset($this->_in))
  210. {
  211. fflush($this->_in);
  212. }
  213. }
  214. /** Write this bytes to the stream */
  215. protected function _commit($bytes)
  216. {
  217. if (isset($this->_in)
  218. && fwrite($this->_in, $bytes))
  219. {
  220. return ++$this->_sequence;
  221. }
  222. }
  223. // -- Private methods
  224. /**
  225. * Establishes a connection to a remote server.
  226. * @access private
  227. */
  228. private function _establishSocketConnection()
  229. {
  230. $host = $this->_params['host'];
  231. if (!empty($this->_params['protocol']))
  232. {
  233. $host = $this->_params['protocol'] . '://' . $host;
  234. }
  235. $timeout = 15;
  236. if (!empty($this->_params['timeout']))
  237. {
  238. $timeout = $this->_params['timeout'];
  239. }
  240. $options = array();
  241. if (!empty($this->_params['sourceIp']))
  242. {
  243. $options['socket']['bindto']=$this->_params['sourceIp'].':0';
  244. }
  245. if (!$this->_stream = stream_socket_client($host.':'.$this->_params['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options)))
  246. {
  247. throw new Swift_TransportException(
  248. 'Connection could not be established with host ' . $this->_params['host'] .
  249. ' [' . $errstr . ' #' . $errno . ']'
  250. );
  251. }
  252. if (!empty($this->_params['blocking']))
  253. {
  254. stream_set_blocking($this->_stream, 1);
  255. }
  256. else
  257. {
  258. stream_set_blocking($this->_stream, 0);
  259. }
  260. stream_set_timeout($this->_stream, $timeout);
  261. $this->_in =& $this->_stream;
  262. $this->_out =& $this->_stream;
  263. }
  264. /**
  265. * Opens a process for input/output.
  266. * @access private
  267. */
  268. private function _establishProcessConnection()
  269. {
  270. $command = $this->_params['command'];
  271. $descriptorSpec = array(
  272. 0 => array('pipe', 'r'),
  273. 1 => array('pipe', 'w'),
  274. 2 => array('pipe', 'w')
  275. );
  276. $this->_stream = proc_open($command, $descriptorSpec, $pipes);
  277. stream_set_blocking($pipes[2], 0);
  278. if ($err = stream_get_contents($pipes[2]))
  279. {
  280. throw new Swift_TransportException(
  281. 'Process could not be started [' . $err . ']'
  282. );
  283. }
  284. $this->_in =& $pipes[0];
  285. $this->_out =& $pipes[1];
  286. }
  287. private function _getReadConnectionDescription()
  288. {
  289. switch ($this->_params['type'])
  290. {
  291. case self::TYPE_PROCESS:
  292. return 'Process '.$this->_params['command'];
  293. break;
  294. case self::TYPE_SOCKET:
  295. default:
  296. $host = $this->_params['host'];
  297. if (!empty($this->_params['protocol']))
  298. {
  299. $host = $this->_params['protocol'] . '://' . $host;
  300. }
  301. $host.=':'.$this->_params['port'];
  302. return $host;
  303. break;
  304. }
  305. }
  306. }