Changeset 2847
- Timestamp:
- 12/01/2011 00:11:53 (17 months ago)
- Location:
- box/trunk/lib/common
- Files:
-
- 2 copied
-
RateLimitingStream.cpp (copied) (copied from box/trunk/lib/common/BufferedWriteStream.cpp) (2 diffs)
-
RateLimitingStream.h (copied) (copied from box/trunk/lib/common/BufferedWriteStream.h) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
box/trunk/lib/common/RateLimitingStream.cpp
r2732 r2847 2 2 // 3 3 // File 4 // Name: BufferedWriteStream.cpp5 // Purpose: Buffering write-only wrapper around IOStreams6 // Created: 201 0/09/134 // Name: RateLimitingStream.cpp 5 // Purpose: Rate-limiting write-only wrapper around IOStreams 6 // Created: 2011/01/11 7 7 // 8 8 // -------------------------------------------------------------------------- 9 9 10 10 #include "Box.h" 11 #include " BufferedWriteStream.h"11 #include "RateLimitingStream.h" 12 12 #include "CommonException.h" 13 13 … … 19 19 // 20 20 // Function 21 // Name: BufferedWriteStream::BufferedWriteStream(const char *, int, int)21 // Name: RateLimitingStream::RateLimitingStream(const char *, int, int) 22 22 // Purpose: Constructor, set up buffer 23 // Created: 20 07/01/1623 // Created: 2011/01/11 24 24 // 25 25 // -------------------------------------------------------------------------- 26 BufferedWriteStream::BufferedWriteStream(IOStream& rSink) 27 : mrSink(rSink), mBufferPosition(0) 26 RateLimitingStream::RateLimitingStream(IOStream& rSink, size_t targetBytesPerSecond) 27 : mrSink(rSink), mStartTime(GetCurrentBoxTime()), mTotalBytesRead(0), 28 mTargetBytesPerSecond(targetBytesPerSecond) 28 29 { } 29 30 30 31 31 // -------------------------------------------------------------------------- 32 32 // 33 33 // Function 34 // Name: BufferedWriteStream::Read(void *, int) 35 // Purpose: Reads bytes from the file - throws exception 36 // Created: 2007/01/16 34 // Name: RateLimitingStream::Read(void *pBuffer, int NBytes, 35 // int Timeout) 36 // Purpose: Reads bytes to the underlying stream at no more than 37 // a fixed rate 38 // Created: 2011/01/11 37 39 // 38 40 // -------------------------------------------------------------------------- 39 int BufferedWriteStream::Read(void *pBuffer, int NBytes, int Timeout)41 int RateLimitingStream::Read(void *pBuffer, int NBytes, int Timeout) 40 42 { 41 THROW_EXCEPTION(CommonException, NotSupported); 43 if(NBytes > 0 && (size_t)NBytes > mTargetBytesPerSecond) 44 { 45 // Limit to one second's worth of data for performance 46 BOX_TRACE("Reducing read size from " << NBytes << " to " << 47 mTargetBytesPerSecond << " to smooth upload rate"); 48 NBytes = mTargetBytesPerSecond; 49 } 50 51 int bytesReadThisTime = mrSink.Read(pBuffer, NBytes, Timeout); 52 53 // How many bytes we will have written after this write finishes? 54 mTotalBytesRead += bytesReadThisTime; 55 56 // When should it be completed by? 57 box_time_t desiredFinishTime = mStartTime + 58 SecondsToBoxTime(mTotalBytesRead / mTargetBytesPerSecond); 59 60 // How long do we have to wait? 61 box_time_t currentTime = GetCurrentBoxTime(); 62 int64_t waitTime = desiredFinishTime - currentTime; 63 64 // How are we doing so far? (for logging only) 65 box_time_t currentDuration = currentTime - mStartTime; 66 uint64_t effectiveRateSoFar = (mTotalBytesRead * MICRO_SEC_IN_SEC_LL) 67 / currentDuration; 68 69 if(waitTime > 0) 70 { 71 BOX_TRACE("Current rate " << effectiveRateSoFar << 72 " higher than desired rate " << mTargetBytesPerSecond << 73 ", sleeping for " << BoxTimeToMilliSeconds(waitTime) << 74 " ms"); 75 ShortSleep(waitTime, false); 76 } 77 else 78 { 79 BOX_TRACE("Current rate " << effectiveRateSoFar << 80 " lower than desired rate " << mTargetBytesPerSecond << 81 ", sending immediately (would have sent " << 82 (BoxTimeToMilliSeconds(-waitTime)) << " ms ago)"); 83 } 84 85 return bytesReadThisTime; 42 86 } 43 87 44 45 // --------------------------------------------------------------------------46 //47 // Function48 // Name: BufferedWriteStream::BytesLeftToRead()49 // Purpose: Returns number of bytes to read (may not be most efficient function ever)50 // Created: 2007/01/1651 //52 // --------------------------------------------------------------------------53 IOStream::pos_type BufferedWriteStream::BytesLeftToRead()54 {55 THROW_EXCEPTION(CommonException, NotSupported);56 }57 58 59 // --------------------------------------------------------------------------60 //61 // Function62 // Name: BufferedWriteStream::Write(void *, int)63 // Purpose: Writes bytes to the underlying stream (not supported)64 // Created: 2003/07/3165 //66 // --------------------------------------------------------------------------67 void BufferedWriteStream::Write(const void *pBuffer, int NBytes)68 {69 int numBytesRemain = NBytes;70 71 do72 {73 int maxWritable = sizeof(mBuffer) - mBufferPosition;74 int numBytesToWrite = (numBytesRemain < maxWritable) ?75 numBytesRemain : maxWritable;76 77 if(numBytesToWrite > 0)78 {79 memcpy(mBuffer + mBufferPosition, pBuffer,80 numBytesToWrite);81 mBufferPosition += numBytesToWrite;82 pBuffer = ((const char *)pBuffer) + numBytesToWrite;83 numBytesRemain -= numBytesToWrite;84 }85 86 if(numBytesRemain > 0)87 {88 Flush();89 }90 }91 while(numBytesRemain > 0);92 }93 94 // --------------------------------------------------------------------------95 //96 // Function97 // Name: BufferedWriteStream::GetPosition()98 // Purpose: Get position in stream99 // Created: 2003/08/21100 //101 // --------------------------------------------------------------------------102 IOStream::pos_type BufferedWriteStream::GetPosition() const103 {104 return mrSink.GetPosition() + mBufferPosition;105 }106 107 108 // --------------------------------------------------------------------------109 //110 // Function111 // Name: BufferedWriteStream::Seek(pos_type, int)112 // Purpose: Seeks within file, as lseek, invalidate buffer113 // Created: 2003/07/31114 //115 // --------------------------------------------------------------------------116 void BufferedWriteStream::Seek(IOStream::pos_type Offset, int SeekType)117 {118 // Always flush the buffer before seeking119 Flush();120 121 mrSink.Seek(Offset, SeekType);122 }123 124 // --------------------------------------------------------------------------125 //126 // Function127 // Name: BufferedWriteStream::Flush();128 // Purpose: Write out current buffer contents and invalidate129 // Created: 2010/09/13130 //131 // --------------------------------------------------------------------------132 void BufferedWriteStream::Flush(int Timeout)133 {134 if(mBufferPosition > 0)135 {136 mrSink.Write(mBuffer, mBufferPosition);137 }138 139 mBufferPosition = 0;140 }141 142 // --------------------------------------------------------------------------143 //144 // Function145 // Name: BufferedWriteStream::Close()146 // Purpose: Closes the underlying stream (not needed)147 // Created: 2003/07/31148 //149 // --------------------------------------------------------------------------150 void BufferedWriteStream::Close()151 {152 Flush();153 mrSink.Close();154 }155 156 // --------------------------------------------------------------------------157 //158 // Function159 // Name: BufferedWriteStream::StreamDataLeft()160 // Purpose: Any data left to write?161 // Created: 2003/08/02162 //163 // --------------------------------------------------------------------------164 bool BufferedWriteStream::StreamDataLeft()165 {166 THROW_EXCEPTION(CommonException, NotSupported);167 }168 169 // --------------------------------------------------------------------------170 //171 // Function172 // Name: BufferedWriteStream::StreamClosed()173 // Purpose: Is the stream closed?174 // Created: 2003/08/02175 //176 // --------------------------------------------------------------------------177 bool BufferedWriteStream::StreamClosed()178 {179 return mrSink.StreamClosed();180 }181 -
box/trunk/lib/common/RateLimitingStream.h
r2728 r2847 2 2 // 3 3 // File 4 // Name: BufferedWriteStream.h5 // Purpose: Buffering write-only wrapper around IOStreams6 // Created: 201 0/09/134 // Name: RateLimitingStream.h 5 // Purpose: Rate-limiting write-only wrapper around IOStreams 6 // Created: 2011/01/11 7 7 // 8 8 // -------------------------------------------------------------------------- 9 9 10 #ifndef BUFFEREDWRITESTREAM__H11 #define BUFFEREDWRITESTREAM__H10 #ifndef RATELIMITINGSTREAM__H 11 #define RATELIMITINGSTREAM__H 12 12 13 #include "BoxTime.h" 13 14 #include "IOStream.h" 14 15 15 class BufferedWriteStream : public IOStream16 class RateLimitingStream : public IOStream 16 17 { 17 18 private: 18 19 IOStream& mrSink; 19 char mBuffer[4096]; 20 int mBufferPosition; 20 box_time_t mStartTime; 21 uint64_t mTotalBytesRead; 22 size_t mTargetBytesPerSecond; 21 23 22 24 public: 23 BufferedWriteStream(IOStream& rSource); 24 virtual ~BufferedWriteStream() { Close(); } 25 26 virtual int Read(void *pBuffer, int NBytes, int Timeout = IOStream::TimeOutInfinite); 27 virtual pos_type BytesLeftToRead(); 28 virtual void Write(const void *pBuffer, int NBytes); 29 virtual pos_type GetPosition() const; 30 virtual void Seek(IOStream::pos_type Offset, int SeekType); 31 virtual void Flush(int Timeout = IOStream::TimeOutInfinite); 32 virtual void Close(); 33 34 virtual bool StreamDataLeft(); 35 virtual bool StreamClosed(); 25 RateLimitingStream(IOStream& rSink, size_t targetBytesPerSecond); 26 virtual ~RateLimitingStream() { } 27 28 // This is the only magic 29 virtual int Read(void *pBuffer, int NBytes, 30 int Timeout = IOStream::TimeOutInfinite); 31 32 // Everything else is delegated to the sink 33 virtual void Write(const void *pBuffer, int NBytes) 34 { 35 Write(pBuffer, NBytes); 36 } 37 virtual pos_type BytesLeftToRead() 38 { 39 return mrSink.BytesLeftToRead(); 40 } 41 virtual pos_type GetPosition() const 42 { 43 return mrSink.GetPosition(); 44 } 45 virtual void Seek(IOStream::pos_type Offset, int SeekType) 46 { 47 mrSink.Seek(Offset, SeekType); 48 } 49 virtual void Flush(int Timeout = IOStream::TimeOutInfinite) 50 { 51 mrSink.Flush(Timeout); 52 } 53 virtual void Close() 54 { 55 mrSink.Close(); 56 } 57 virtual bool StreamDataLeft() 58 { 59 return mrSink.StreamDataLeft(); 60 } 61 virtual bool StreamClosed() 62 { 63 return mrSink.StreamClosed(); 64 } 36 65 37 66 private: 38 BufferedWriteStream(const BufferedWriteStream &rToCopy)67 RateLimitingStream(const RateLimitingStream &rToCopy) 39 68 : mrSink(rToCopy.mrSink) { /* do not call */ } 40 69 }; 41 70 42 #endif // BUFFEREDWRITESTREAM__H 43 44 71 #endif // RATELIMITINGSTREAM__H
Note: See TracChangeset
for help on using the changeset viewer.
