123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568 |
|
/*******************************************************************************
copyright: Copyright (c) 2004 Kris Bell. All rights reserved
license: BSD style: $(LICENSE)
version: Mar 2004: Initial release
author: Kris
*******************************************************************************/
module tango.io.device.Conduit;
private import tango.core.Thread,
tango.core.Exception;
public import tango.io.model.IConduit;
/*******************************************************************************
Conduit abstract base-class, implementing interface IConduit.
Only the conduit-specific read(), write(), detach() and
bufferSize() need to be implemented for a concrete conduit
implementation. See File for an example.
Conduits provide virtualized access to external content, and
represent things like files or Internet connections. Conduits
expose a pair of streams, are modelled by tango.io.model.IConduit,
and are implemented via classes such as File & SocketConduit.
Additional kinds of conduit are easy to construct: one either
subclasses tango.io.device.Conduit, or implements tango.io.model.IConduit.
A conduit typically reads and writes from/to a Buffer in large
chunks, typically the entire buffer. Alternatively, one can invoke
input.read(dst[]) and/or output.write(src[]) directly.
*******************************************************************************/
class Conduit : IConduit
{
protected Fiber.Scheduler scheduler; // optional scheduler
private uint duration = -1; // scheduling timeout
/***********************************************************************
Test for asynchronous capability. This will be eligable
for scheduling where (a) it is created within a fiber and
(b) there is a scheduler attached to the fiber at the time
this is invoked.
Note that fibers may schedule just one outstanding I/O
request at a time
***********************************************************************/
this ()
{
auto f = Fiber.getThis;
if (f)
scheduler = f.event.scheduler;
}
/***********************************************************************
Clean up when collected. See method detach()
***********************************************************************/
~this ()
{
detach;
}
/***********************************************************************
Return the name of this conduit
***********************************************************************/
abstract char[] toString ();
/***********************************************************************
Return a preferred size for buffering conduit I/O
***********************************************************************/
abstract size_t bufferSize ();
/***********************************************************************
Read from conduit into a target array. The provided dst
will be populated with content from the conduit.
Returns the number of bytes read, which may be less than
requested in dst. Eof is returned whenever an end-of-flow
condition arises.
***********************************************************************/
abstract size_t read (void[] dst);
/***********************************************************************
Write to conduit from a source array. The provided src
content will be written to the conduit.
Returns the number of bytes written from src, which may
be less than the quantity provided. Eof is returned when
an end-of-flow condition arises.
***********************************************************************/
abstract size_t write (void [] src);
/***********************************************************************
Disconnect this conduit. Note that this may be invoked
both explicitly by the user, and implicitly by the GC.
Be sure to manage multiple detachment requests correctly:
set a flag, or sentinel value as necessary
***********************************************************************/
abstract void detach ();
/***********************************************************************
Set the active timeout period for IO calls (in milliseconds)
***********************************************************************/
final void timeout (uint millisec)
{
duration = millisec;
}
/***********************************************************************
Get the active timeout period for IO calls (in milliseconds)
***********************************************************************/
final uint timeout ()
{
return duration;
}
/***********************************************************************
Is the conduit alive? Default behaviour returns true
***********************************************************************/
bool isAlive ()
{
return true;
}
/***********************************************************************
Return the host. This is part of the Stream interface
***********************************************************************/
final IConduit conduit ()
{
return this;
}
/***********************************************************************
Emit buffered output or reset buffered input
***********************************************************************/
IOStream flush ()
{
return this;
}
/***********************************************************************
Close this conduit
Both input and output are detached, and are no longer usable
***********************************************************************/
void close ()
{
this.detach;
}
/***********************************************************************
Return the input stream
***********************************************************************/
final InputStream input ()
{
return this;
}
/***********************************************************************
Return the output stream
***********************************************************************/
final OutputStream output ()
{
return this;
}
/***********************************************************************
Throw an IOException, with the provided message
***********************************************************************/
final void error (char[] msg)
{
throw new IOException (msg);
}
/***********************************************************************
Transfer the content of another conduit to this one. Returns
the dst OutputStream, or throws IOException on failure.
***********************************************************************/
OutputStream copy (InputStream src, size_t max = -1)
{
transfer (src, this, max);
return this;
}
/***********************************************************************
Load the bits from a stream, and return them all in an
array. The dst array can be provided as an option, which
will be expanded as necessary to consume the input.
Returns an array representing the content, and throws
IOException on error
***********************************************************************/
void[] load (size_t max = -1)
{
return load (this, max);
}
/***********************************************************************
Seek on this stream. Source conduits that don't support
seeking will throw an IOException
***********************************************************************/
long seek (long offset, Anchor anchor = Anchor.Begin)
{
error (this.toString ~ " does not support seek requests");
return 0;
}
/***********************************************************************
Load the bits from a stream, and return them all in an
array. The dst array can be provided as an option, which
will be expanded as necessary to consume input.
Returns an array representing the content, and throws
IOException on error
***********************************************************************/
static void[] load (InputStream src, size_t max=-1)
{
void[] dst;
size_t i,
len,
chunk;
if (max != -1)
chunk = max;
else
chunk = src.conduit.bufferSize;
while (len < max)
{
if (dst.length - len is 0)
dst.length = len + chunk;
if ((i = src.read (dst[len .. $])) is Eof)
break;
len += i;
}
return dst [0 .. len];
}
/***********************************************************************
Low-level data transfer, where max represents the maximum
number of bytes to transfer.
Returns Eof on failure, number of bytes copied on success
***********************************************************************/
static size_t transfer (InputStream src, OutputStream dst, size_t max=-1)
{
byte[8192] tmp;
size_t done;
while (max)
{
auto len = max;
if (len > tmp.length)
len = tmp.length;
if ((len = src.read(tmp[0 .. len])) is Eof)
max = 0;
else
{
max -= len;
done += len;
auto p = tmp.ptr;
for (auto j=0; len > 0; len -= j, p += j)
if ((j = dst.write (p[0 .. len])) is Eof)
return Eof;
}
}
return done;
}
}
/*******************************************************************************
Base class for input stream filtering
*******************************************************************************/
class InputFilter : InputStream
{
protected InputStream source;
/***********************************************************************
Attach to the provided stream. The provided source stream
should generally never be null, though some filters have a
need to set this lazily
***********************************************************************/
this (InputStream source)
{
this.source = source;
}
/***********************************************************************
Return the hosting conduit
***********************************************************************/
IConduit conduit ()
{
return source.conduit;
}
/***********************************************************************
Read from conduit into a target array. The provided dst
will be populated with content from the conduit.
Returns the number of bytes read, which may be less than
requested in dst. Eof is returned whenever an end-of-flow
condition arises.
***********************************************************************/
size_t read (void[] dst)
{
return source.read (dst);
}
/***********************************************************************
Load the bits from a stream, and return them all in an
array. The dst array can be provided as an option, which
will be expanded as necessary to consume the input.
Returns an array representing the content, and throws
IOException on error
***********************************************************************/
void[] load (size_t max = -1)
{
return Conduit.load (this, max);
}
/***********************************************************************
Clear any buffered content
***********************************************************************/
IOStream flush ()
{
source.flush;
return this;
}
/***********************************************************************
Seek on this stream. Target conduits that don't support
seeking will throw an IOException
***********************************************************************/
long seek (long offset, Anchor anchor = Anchor.Begin)
{
return source.seek (offset, anchor);
}
/***********************************************************************
Return the upstream host of this filter
***********************************************************************/
InputStream input ()
{
return source;
}
/***********************************************************************
Close the input
***********************************************************************/
void close ()
{
source.close;
}
}
/*******************************************************************************
Base class for output stream filtering. The provided sink stream
should generally never be null, though some filters have a
need to set this lazily
*******************************************************************************/
class OutputFilter : OutputStream
{
protected OutputStream sink;
/***********************************************************************
Attach to the provided stream
***********************************************************************/
this (OutputStream sink)
{
this.sink = sink;
}
/***********************************************************************
Return the hosting conduit
***********************************************************************/
IConduit conduit ()
{
return sink.conduit;
}
/***********************************************************************
Write to conduit from a source array. The provided src
content will be written to the conduit.
Returns the number of bytes written from src, which may
be less than the quantity provided. Eof is returned when
an end-of-flow condition arises.
***********************************************************************/
size_t write (void[] src)
{
return sink.write (src);
}
/***********************************************************************
Transfer the content of another conduit to this one. Returns
a reference to this class, or throws IOException on failure.
***********************************************************************/
OutputStream copy (InputStream src, size_t max = -1)
{
Conduit.transfer (src, this, max);
return this;
}
/***********************************************************************
Emit/purge buffered content
***********************************************************************/
IOStream flush ()
{
sink.flush;
return this;
}
/***********************************************************************
Seek on this stream. Target conduits that don't support
seeking will throw an IOException
***********************************************************************/
long seek (long offset, Anchor anchor = Anchor.Begin)
{
return sink.seek (offset, anchor);
}
/***********************************************************************
Return the upstream host of this filter
***********************************************************************/
OutputStream output ()
{
return sink;
}
/***********************************************************************
Close the output
***********************************************************************/
void close ()
{
sink.close;
}
}
|