Changeset 2847


Ignore:
Timestamp:
12/01/2011 00:11:53 (17 months ago)
Author:
chris
Message:

Add an implementation of a stream wrapper that limits reading rate, to
control bandwidth usage.

Location:
box/trunk/lib/common
Files:
2 copied

Legend:

Unmodified
Added
Removed
  • box/trunk/lib/common/RateLimitingStream.cpp

    r2732 r2847  
    22// 
    33// File 
    4 //              Name:    BufferedWriteStream.cpp 
    5 //              Purpose: Buffering write-only wrapper around IOStreams 
    6 //              Created: 2010/09/13 
     4//              Name:    RateLimitingStream.cpp 
     5//              Purpose: Rate-limiting write-only wrapper around IOStreams 
     6//              Created: 2011/01/11 
    77// 
    88// -------------------------------------------------------------------------- 
    99 
    1010#include "Box.h" 
    11 #include "BufferedWriteStream.h" 
     11#include "RateLimitingStream.h" 
    1212#include "CommonException.h" 
    1313 
     
    1919// 
    2020// Function 
    21 //              Name:    BufferedWriteStream::BufferedWriteStream(const char *, int, int) 
     21//              Name:    RateLimitingStream::RateLimitingStream(const char *, int, int) 
    2222//              Purpose: Constructor, set up buffer 
    23 //              Created: 2007/01/16 
     23//              Created: 2011/01/11 
    2424// 
    2525// -------------------------------------------------------------------------- 
    26 BufferedWriteStream::BufferedWriteStream(IOStream& rSink) 
    27 : mrSink(rSink), mBufferPosition(0) 
     26RateLimitingStream::RateLimitingStream(IOStream& rSink, size_t targetBytesPerSecond) 
     27: mrSink(rSink), mStartTime(GetCurrentBoxTime()), mTotalBytesRead(0), 
     28  mTargetBytesPerSecond(targetBytesPerSecond) 
    2829{ } 
    29  
    3030 
    3131// -------------------------------------------------------------------------- 
    3232// 
    3333// Function 
    34 //              Name:    BufferedWriteStream::Read(void *, int) 
    35 //              Purpose: Reads bytes from the file - throws exception 
    36 //              Created: 2007/01/16 
     34//              Name:    RateLimitingStream::Read(void *pBuffer, int NBytes, 
     35//                       int Timeout) 
     36//              Purpose: Reads bytes to the underlying stream at no more than 
     37//                       a fixed rate 
     38//              Created: 2011/01/11 
    3739// 
    3840// -------------------------------------------------------------------------- 
    39 int BufferedWriteStream::Read(void *pBuffer, int NBytes, int Timeout) 
     41int RateLimitingStream::Read(void *pBuffer, int NBytes, int Timeout) 
    4042{ 
    41         THROW_EXCEPTION(CommonException, NotSupported); 
     43        if(NBytes > 0 && (size_t)NBytes > mTargetBytesPerSecond) 
     44        { 
     45                // Limit to one second's worth of data for performance 
     46                BOX_TRACE("Reducing read size from " << NBytes << " to " << 
     47                        mTargetBytesPerSecond << " to smooth upload rate"); 
     48                NBytes = mTargetBytesPerSecond; 
     49        } 
     50 
     51        int bytesReadThisTime = mrSink.Read(pBuffer, NBytes, Timeout); 
     52 
     53        // How many bytes we will have written after this write finishes? 
     54        mTotalBytesRead += bytesReadThisTime; 
     55 
     56        // When should it be completed by? 
     57        box_time_t desiredFinishTime = mStartTime + 
     58                SecondsToBoxTime(mTotalBytesRead / mTargetBytesPerSecond); 
     59 
     60        // How long do we have to wait? 
     61        box_time_t currentTime = GetCurrentBoxTime(); 
     62        int64_t waitTime = desiredFinishTime - currentTime; 
     63 
     64        // How are we doing so far? (for logging only) 
     65        box_time_t currentDuration = currentTime - mStartTime; 
     66        uint64_t effectiveRateSoFar = (mTotalBytesRead * MICRO_SEC_IN_SEC_LL) 
     67                / currentDuration; 
     68 
     69        if(waitTime > 0) 
     70        { 
     71                BOX_TRACE("Current rate " << effectiveRateSoFar << 
     72                        " higher than desired rate " << mTargetBytesPerSecond << 
     73                        ", sleeping for " << BoxTimeToMilliSeconds(waitTime) << 
     74                        " ms"); 
     75                ShortSleep(waitTime, false); 
     76        } 
     77        else 
     78        { 
     79                BOX_TRACE("Current rate " << effectiveRateSoFar << 
     80                        " lower than desired rate " << mTargetBytesPerSecond << 
     81                        ", sending immediately (would have sent " << 
     82                        (BoxTimeToMilliSeconds(-waitTime)) << " ms ago)"); 
     83        } 
     84 
     85        return bytesReadThisTime; 
    4286} 
    4387 
    44  
    45 // -------------------------------------------------------------------------- 
    46 // 
    47 // Function 
    48 //              Name:    BufferedWriteStream::BytesLeftToRead() 
    49 //              Purpose: Returns number of bytes to read (may not be most efficient function ever) 
    50 //              Created: 2007/01/16 
    51 // 
    52 // -------------------------------------------------------------------------- 
    53 IOStream::pos_type BufferedWriteStream::BytesLeftToRead() 
    54 { 
    55         THROW_EXCEPTION(CommonException, NotSupported); 
    56 } 
    57  
    58  
    59 // -------------------------------------------------------------------------- 
    60 // 
    61 // Function 
    62 //              Name:    BufferedWriteStream::Write(void *, int) 
    63 //              Purpose: Writes bytes to the underlying stream (not supported) 
    64 //              Created: 2003/07/31 
    65 // 
    66 // -------------------------------------------------------------------------- 
    67 void BufferedWriteStream::Write(const void *pBuffer, int NBytes) 
    68 { 
    69         int numBytesRemain = NBytes; 
    70  
    71         do 
    72         { 
    73                 int maxWritable = sizeof(mBuffer) - mBufferPosition; 
    74                 int numBytesToWrite = (numBytesRemain < maxWritable) ? 
    75                         numBytesRemain : maxWritable; 
    76  
    77                 if(numBytesToWrite > 0) 
    78                 { 
    79                         memcpy(mBuffer + mBufferPosition, pBuffer, 
    80                                 numBytesToWrite); 
    81                         mBufferPosition += numBytesToWrite; 
    82                         pBuffer = ((const char *)pBuffer) + numBytesToWrite; 
    83                         numBytesRemain -= numBytesToWrite; 
    84                 } 
    85  
    86                 if(numBytesRemain > 0) 
    87                 { 
    88                         Flush(); 
    89                 } 
    90         } 
    91         while(numBytesRemain > 0); 
    92 } 
    93  
    94 // -------------------------------------------------------------------------- 
    95 // 
    96 // Function 
    97 //              Name:    BufferedWriteStream::GetPosition() 
    98 //              Purpose: Get position in stream 
    99 //              Created: 2003/08/21 
    100 // 
    101 // -------------------------------------------------------------------------- 
    102 IOStream::pos_type BufferedWriteStream::GetPosition() const 
    103 { 
    104         return mrSink.GetPosition() + mBufferPosition; 
    105 } 
    106  
    107  
    108 // -------------------------------------------------------------------------- 
    109 // 
    110 // Function 
    111 //              Name:    BufferedWriteStream::Seek(pos_type, int) 
    112 //              Purpose: Seeks within file, as lseek, invalidate buffer 
    113 //              Created: 2003/07/31 
    114 // 
    115 // -------------------------------------------------------------------------- 
    116 void BufferedWriteStream::Seek(IOStream::pos_type Offset, int SeekType) 
    117 { 
    118         // Always flush the buffer before seeking 
    119         Flush(); 
    120  
    121         mrSink.Seek(Offset, SeekType); 
    122 } 
    123  
    124 // -------------------------------------------------------------------------- 
    125 // 
    126 // Function 
    127 //              Name:    BufferedWriteStream::Flush(); 
    128 //              Purpose: Write out current buffer contents and invalidate 
    129 //              Created: 2010/09/13 
    130 // 
    131 // -------------------------------------------------------------------------- 
    132 void BufferedWriteStream::Flush(int Timeout) 
    133 { 
    134         if(mBufferPosition > 0) 
    135         { 
    136                 mrSink.Write(mBuffer, mBufferPosition); 
    137         } 
    138  
    139         mBufferPosition = 0; 
    140 } 
    141  
    142 // -------------------------------------------------------------------------- 
    143 // 
    144 // Function 
    145 //              Name:    BufferedWriteStream::Close() 
    146 //              Purpose: Closes the underlying stream (not needed) 
    147 //              Created: 2003/07/31 
    148 // 
    149 // -------------------------------------------------------------------------- 
    150 void BufferedWriteStream::Close() 
    151 { 
    152         Flush(); 
    153         mrSink.Close(); 
    154 } 
    155  
    156 // -------------------------------------------------------------------------- 
    157 // 
    158 // Function 
    159 //              Name:    BufferedWriteStream::StreamDataLeft() 
    160 //              Purpose: Any data left to write? 
    161 //              Created: 2003/08/02 
    162 // 
    163 // -------------------------------------------------------------------------- 
    164 bool BufferedWriteStream::StreamDataLeft() 
    165 { 
    166         THROW_EXCEPTION(CommonException, NotSupported); 
    167 } 
    168  
    169 // -------------------------------------------------------------------------- 
    170 // 
    171 // Function 
    172 //              Name:    BufferedWriteStream::StreamClosed() 
    173 //              Purpose: Is the stream closed? 
    174 //              Created: 2003/08/02 
    175 // 
    176 // -------------------------------------------------------------------------- 
    177 bool BufferedWriteStream::StreamClosed() 
    178 { 
    179         return mrSink.StreamClosed(); 
    180 } 
    181  
  • box/trunk/lib/common/RateLimitingStream.h

    r2728 r2847  
    22// 
    33// File 
    4 //              Name:    BufferedWriteStream.h 
    5 //              Purpose: Buffering write-only wrapper around IOStreams 
    6 //              Created: 2010/09/13 
     4//              Name:    RateLimitingStream.h 
     5//              Purpose: Rate-limiting write-only wrapper around IOStreams 
     6//              Created: 2011/01/11 
    77// 
    88// -------------------------------------------------------------------------- 
    99 
    10 #ifndef BUFFEREDWRITESTREAM__H 
    11 #define BUFFEREDWRITESTREAM__H 
     10#ifndef RATELIMITINGSTREAM__H 
     11#define RATELIMITINGSTREAM__H 
    1212 
     13#include "BoxTime.h" 
    1314#include "IOStream.h" 
    1415 
    15 class BufferedWriteStream : public IOStream 
     16class RateLimitingStream : public IOStream 
    1617{ 
    1718private: 
    1819        IOStream& mrSink; 
    19         char mBuffer[4096]; 
    20         int  mBufferPosition; 
     20        box_time_t mStartTime; 
     21        uint64_t mTotalBytesRead; 
     22        size_t mTargetBytesPerSecond; 
    2123 
    2224public: 
    23         BufferedWriteStream(IOStream& rSource); 
    24         virtual ~BufferedWriteStream() { Close(); } 
    25          
    26         virtual int Read(void *pBuffer, int NBytes, int Timeout = IOStream::TimeOutInfinite); 
    27         virtual pos_type BytesLeftToRead(); 
    28         virtual void Write(const void *pBuffer, int NBytes); 
    29         virtual pos_type GetPosition() const; 
    30         virtual void Seek(IOStream::pos_type Offset, int SeekType); 
    31         virtual void Flush(int Timeout = IOStream::TimeOutInfinite); 
    32         virtual void Close(); 
    33          
    34         virtual bool StreamDataLeft(); 
    35         virtual bool StreamClosed(); 
     25        RateLimitingStream(IOStream& rSink, size_t targetBytesPerSecond); 
     26        virtual ~RateLimitingStream() { } 
     27 
     28        // This is the only magic 
     29        virtual int Read(void *pBuffer, int NBytes, 
     30                int Timeout = IOStream::TimeOutInfinite); 
     31 
     32        // Everything else is delegated to the sink 
     33        virtual void Write(const void *pBuffer, int NBytes) 
     34        { 
     35                Write(pBuffer, NBytes); 
     36        } 
     37        virtual pos_type BytesLeftToRead() 
     38        { 
     39                return mrSink.BytesLeftToRead(); 
     40        } 
     41        virtual pos_type GetPosition() const 
     42        { 
     43                return mrSink.GetPosition(); 
     44        } 
     45        virtual void Seek(IOStream::pos_type Offset, int SeekType) 
     46        { 
     47                mrSink.Seek(Offset, SeekType); 
     48        } 
     49        virtual void Flush(int Timeout = IOStream::TimeOutInfinite) 
     50        { 
     51                mrSink.Flush(Timeout); 
     52        } 
     53        virtual void Close() 
     54        { 
     55                mrSink.Close(); 
     56        } 
     57        virtual bool StreamDataLeft() 
     58        { 
     59                return mrSink.StreamDataLeft(); 
     60        } 
     61        virtual bool StreamClosed() 
     62        { 
     63                return mrSink.StreamClosed(); 
     64        } 
    3665 
    3766private: 
    38         BufferedWriteStream(const BufferedWriteStream &rToCopy)  
     67        RateLimitingStream(const RateLimitingStream &rToCopy)  
    3968        : mrSink(rToCopy.mrSink) { /* do not call */ } 
    4069}; 
    4170 
    42 #endif // BUFFEREDWRITESTREAM__H 
    43  
    44  
     71#endif // RATELIMITINGSTREAM__H 
Note: See TracChangeset for help on using the changeset viewer.