/*******************************************************************************

        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Mar 2004: Initial release

        author:         Kris

*******************************************************************************/

module tango.io.device.Conduit;

private import tango.core.Thread,
               tango.core.Exception;

public  import tango.io.model.IConduit;

/*******************************************************************************

        Conduit abstract base-class, implementing interface IConduit.
        Only the conduit-specific read(), write(), detach() and 
        bufferSize() need to be implemented for a concrete conduit 
        implementation. See File for an example.

        Conduits provide virtualized access to external content, and
        represent things like files or Internet connections. Conduits
        expose a pair of streams, are modelled by tango.io.model.IConduit, 
        and are implemented via classes such as File & SocketConduit. 

        Additional kinds of conduit are easy to construct: one either
        subclasses tango.io.device.Conduit, or implements tango.io.model.IConduit.
        A conduit typically reads and writes from/to a Buffer in large
        chunks, typically the entire buffer. Alternatively, one can invoke
        input.read(dst[]) and/or output.write(src[]) directly.

*******************************************************************************/

class Conduit : IConduit
{
        protected Fiber.Scheduler scheduler;            // optional scheduler
        private   uint            duration = -1;        // scheduling timeout (ms); -1 wraps to uint.max

        /***********************************************************************

                Test for asynchronous capability. This will be eligible
                for scheduling where (a) it is created within a fiber and
                (b) there is a scheduler attached to the fiber at the time
                this is invoked.

                When Fiber.getThis yields no fiber, the scheduler is left 
                at its default (unset) value.

                Note that fibers may schedule just one outstanding I/O 
                request at a time

        ***********************************************************************/

        this ()
        {
                auto f = Fiber.getThis;
                if (f)
                    scheduler = f.event.scheduler;
        }

        /***********************************************************************

                Clean up when collected. See method detach()

        ***********************************************************************/

        ~this ()
        {
                detach;
        }

        /***********************************************************************
        
                Return the name of this conduit

        ***********************************************************************/

        abstract char[] toString (); 
                     
        /***********************************************************************

                Return a preferred size for buffering conduit I/O

        ***********************************************************************/

        abstract size_t bufferSize ();

        /***********************************************************************

                Read from conduit into a target array. The provided dst 
                will be populated with content from the conduit. 

                Returns the number of bytes read, which may be less than
                requested in dst. Eof is returned whenever an end-of-flow 
                condition arises.

        ***********************************************************************/

        abstract size_t read (void[] dst);

        /***********************************************************************

                Write to conduit from a source array. The provided src
                content will be written to the conduit.

                Returns the number of bytes written from src, which may
                be less than the quantity provided. Eof is returned when 
                an end-of-flow condition arises.

        ***********************************************************************/

        abstract size_t write (void [] src);

        /***********************************************************************

                Disconnect this conduit. Note that this may be invoked
                both explicitly by the user (see close()), and implicitly 
                via the destructor. Be sure to manage multiple detachment 
                requests correctly: set a flag, or sentinel value as 
                necessary

        ***********************************************************************/

        abstract void detach ();

        /***********************************************************************

                Set the active timeout period for IO calls (in milliseconds)

        ***********************************************************************/

        final void timeout (uint millisec)
        {
                duration = millisec;
        }

        /***********************************************************************

                Get the active timeout period for IO calls (in milliseconds)

        ***********************************************************************/

        final uint timeout ()
        {
                return duration;
        }

        /***********************************************************************

                Is the conduit alive? Default behaviour returns true

        ***********************************************************************/

        bool isAlive ()
        {
                return true;
        }

        /***********************************************************************
        
                Return the host. This is part of the Stream interface

        ***********************************************************************/

        final IConduit conduit ()
        {
                return this;
        }
                            
        /***********************************************************************

                Emit buffered output or reset buffered input. The default
                implementation does nothing other than return this 

        ***********************************************************************/

        IOStream flush () 
        {
                return this;
        }

        /***********************************************************************

                Close this conduit, by way of detach()
                
                Both input and output are detached, and are no longer usable

        ***********************************************************************/

        void close ()
        {
                this.detach;
        }

        /***********************************************************************

                Return the input stream, which is this conduit itself

        ***********************************************************************/
        
        final InputStream input ()
        {
                return this;
        }

        /***********************************************************************

                Return the output stream, which is this conduit itself

        ***********************************************************************/
        
        final OutputStream output ()
        {
                return this;
        }

        /***********************************************************************

                Throw an IOException, with the provided message

        ***********************************************************************/

        final void error (char[] msg)
        {
                throw new IOException (msg);
        }

        /***********************************************************************

                Transfer the content of another stream to this one, up to 
                a limit of max bytes. Returns this conduit as the dst 
                OutputStream.

                Note that the value returned by transfer() is discarded
                here, so an Eof reported while writing is not surfaced 
                to the caller by this method itself.

        ***********************************************************************/

        OutputStream copy (InputStream src, size_t max = -1)
        {
                transfer (src, this, max);
                return this;
        }

        /***********************************************************************

                Load the bits from this conduit, and return them all in 
                an array. At most max bytes are loaded.

                Returns an array representing the content. See the static
                load() for details

        ***********************************************************************/

        void[] load (size_t max = -1)
        {
                return load (this, max);
        }

        /***********************************************************************
        
                Seek on this stream. The default implementation does not 
                support seeking, and always throws an IOException

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                error (this.toString ~ " does not support seek requests");
                return 0;
        }

        /***********************************************************************

                Load the bits from a stream, and return them all in an
                array. The array is allocated here and is expanded as 
                necessary to consume input.

                When max is provided, that is used as the allocation size
                and no more than max bytes are loaded. Otherwise the array
                grows in steps of src.conduit.bufferSize until Eof.

                Returns a slice of the array covering the bytes actually 
                read

        ***********************************************************************/

        static void[] load (InputStream src, size_t max=-1)
        {
                void[]  dst;
                size_t  i,
                        len,
                        chunk;

                if (max != -1)
                    chunk = max;
                else
                   chunk = src.conduit.bufferSize;

                while (len < max)
                      {
                      // grow only when the array has been filled
                      if (dst.length - len is 0)
                          dst.length = len + chunk;

                      if ((i = src.read (dst[len .. $])) is Eof)
                           break;
                      len += i;
                      }

                return dst [0 .. len];
        }

        /***********************************************************************
                
                Low-level data transfer, where max represents the maximum
                number of bytes to transfer. Content is moved through a
                fixed 8192 byte intermediate array, and each chunk read
                is written in full (looping over partial writes) before
                the next read is made.

                Returns Eof where a write to dst reports Eof, otherwise 
                the number of bytes read from src. An Eof from src simply 
                ends the transfer.

        ***********************************************************************/

        static size_t transfer (InputStream src, OutputStream dst, size_t max=-1)
        {
                byte[8192] tmp;
                size_t     done;

                while (max)
                      {
                      auto len = max;
                      if (len > tmp.length)
                          len = tmp.length;

                      if ((len = src.read(tmp[0 .. len])) is Eof)
                           max = 0;
                      else
                         {
                         max -= len;
                         done += len;
                         auto p = tmp.ptr;

                         // j must be a size_t: it holds the result of
                         // write() and is compared against Eof in the 
                         // same way as the read() result above
                         for (size_t j=0; len > 0; len -= j, p += j)
                              if ((j = dst.write (p[0 .. len])) is Eof)
                                   return Eof;
                         }
                      }

                return done;
        }
}


/*******************************************************************************

        Base class for input stream filtering

*******************************************************************************/

class InputFilter : InputStream
{
        // the upstream stream to which every request is delegated
        protected InputStream source;

        /***********************************************************************

                Construct a filter upon the given upstream source. 

                A null source is not rejected here, since some filters 
                assign it lazily; none of the methods below check for 
                null before delegating

        ***********************************************************************/

        this (InputStream source)
        {
                this.source = source;
        }

        /***********************************************************************

                Return the conduit hosting the upstream source

        ***********************************************************************/

        IConduit conduit ()
        {
                return source.conduit ();
        }

        /***********************************************************************

                Populate dst with content from the upstream source.

                The result is whatever the source read() returns: the 
                number of bytes read (possibly fewer than dst.length), 
                or Eof upon an end-of-flow condition

        ***********************************************************************/

        size_t read (void[] dst)
        {
                auto count = source.read (dst);
                return count;
        }

        /***********************************************************************

                Load content via this filter, and return it in an array.
                At most max bytes are loaded.

                This delegates to Conduit.load() with this filter as the
                input, so content passes through this.read()
                              
        ***********************************************************************/

        void[] load (size_t max = -1)
        {
                auto content = Conduit.load (this, max);
                return content;
        }

        /***********************************************************************

                Flush the upstream source, and return this filter

        ***********************************************************************/

        IOStream flush ()
        {
                source.flush ();
                return this;
        }

        /***********************************************************************
        
                Forward a seek request to the upstream source, returning 
                its result

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                auto position = source.seek (offset, anchor);
                return position;
        }

        /***********************************************************************

                Return the upstream source of this filter
                        
        ***********************************************************************/

        InputStream input ()
        {
                return this.source;
        }            

        /***********************************************************************

                Close the upstream source

        ***********************************************************************/

        void close ()
        {
                source.close ();
        }
}


/*******************************************************************************

        Base class for output stream filtering. The provided sink stream
        should generally never be null, though some filters have a need
        to set this lazily

*******************************************************************************/

class OutputFilter : OutputStream
{
        // the downstream stream to which every request is delegated
        protected OutputStream sink;

        /***********************************************************************

                Construct a filter upon the given downstream sink. No 
                null check is made, here or in the delegating methods

        ***********************************************************************/

        this (OutputStream sink)
        {
                this.sink = sink;
        }

        /***********************************************************************

                Return the conduit hosting the downstream sink

        ***********************************************************************/

        IConduit conduit ()
        {
                return sink.conduit ();
        }

        /***********************************************************************

                Write the src content to the downstream sink.

                The result is whatever the sink write() returns: the 
                number of bytes written (possibly fewer than src.length), 
                or Eof upon an end-of-flow condition

        ***********************************************************************/

        size_t write (void[] src)
        {
                auto count = sink.write (src);
                return count;
        }

        /***********************************************************************

                Transfer up to max bytes from src through this filter, 
                via Conduit.transfer(), and return this filter.

                The value returned by Conduit.transfer() is discarded

        ***********************************************************************/

        OutputStream copy (InputStream src, size_t max = -1)
        {
                Conduit.transfer (src, this, max);
                return this;
        }

        /***********************************************************************

                Flush the downstream sink, and return this filter

        ***********************************************************************/

        IOStream flush ()
        {
                sink.flush ();
                return this;
        }

        /***********************************************************************
        
                Forward a seek request to the downstream sink, returning 
                its result

        ***********************************************************************/

        long seek (long offset, Anchor anchor = Anchor.Begin)
        {
                auto position = sink.seek (offset, anchor);
                return position;
        }

        /***********************************************************************
        
                Return the downstream sink of this filter
                        
        ***********************************************************************/

        OutputStream output ()
        {
                return this.sink;
        }              

        /***********************************************************************

                Close the downstream sink

        ***********************************************************************/

        void close ()
        {
                sink.close ();
        }
}