* @author Nicolas Grekas
* @author Chris Corbyn
*
* @internal
*/
abstract class AbstractStream
{
protected $stream;
protected $in;
protected $out;
private string $debug = '';
public function write(string $bytes, bool $debug = true): void
{
if ($debug) {
foreach (explode("\n", trim($bytes)) as $line) {
$this->debug .= sprintf("> %s\n", $line);
}
}
$bytesToWrite = \strlen($bytes);
$totalBytesWritten = 0;
while ($totalBytesWritten < $bytesToWrite) {
$bytesWritten = @fwrite($this->in, substr($bytes, $totalBytesWritten));
if (false === $bytesWritten || 0 === $bytesWritten) {
throw new TransportException('Unable to write bytes on the wire.');
}
$totalBytesWritten += $bytesWritten;
}
}
/**
* Flushes the contents of the stream (empty it) and set the internal pointer to the beginning.
*/
public function flush(): void
{
fflush($this->in);
}
/**
* Performs any initialization needed.
*/
abstract public function initialize(): void;
public function terminate(): void
{
$this->stream = $this->out = $this->in = null;
}
public function readLine(): string
{
if (feof($this->out)) {
return '';
}
$line = fgets($this->out);
if ('' === $line || false === $line) {
$metas = stream_get_meta_data($this->out);
if ($metas['timed_out']) {
throw new TransportException(sprintf('Connection to "%s" timed out.', $this->getReadConnectionDescription()));
}
if ($metas['eof']) {
throw new TransportException(sprintf('Connection to "%s" has been closed unexpectedly.', $this->getReadConnectionDescription()));
}
}
$this->debug .= sprintf('< %s', $line);
return $line;
}
public function getDebug(): string
{
$debug = $this->debug;
$this->debug = '';
return $debug;
}
public static function replace(string $from, string $to, iterable $chunks): \Generator
{
if ('' === $from) {
yield from $chunks;
return;
}
$carry = '';
$fromLen = \strlen($from);
foreach ($chunks as $chunk) {
if ('' === $chunk = $carry.$chunk) {
continue;
}
if (str_contains($chunk, $from)) {
$chunk = explode($from, $chunk);
$carry = array_pop($chunk);
yield implode($to, $chunk).$to;
} else {
$carry = $chunk;
}
if (\strlen($carry) > $fromLen) {
yield substr($carry, 0, -$fromLen);
$carry = substr($carry, -$fromLen);
}
}
if ('' !== $carry) {
yield $carry;
}
}
abstract protected function getReadConnectionDescription(): string;
}