@@ -31,14 +31,14 @@ class BufferDataSource : public DataSource {
3131
3232 const uint8_t * get_buffer (size_t size) override
3333 {
34- (void ) size;
34+ (void )size;
3535 assert (_pos + size <= _size);
3636 return _data + _pos;
3737 }
3838
3939 void release_buffer (const uint8_t * buffer, size_t size) override
4040 {
41- (void ) buffer;
41+ (void )buffer;
4242 assert (buffer == _data + _pos);
4343 _pos += size;
4444 }
@@ -66,28 +66,65 @@ class BufferedStreamDataSource : public DataSource {
6666 const uint8_t * get_buffer (size_t size) override
6767 {
6868 assert (_pos + size <= _size);
69- if (_bufferSize < size) {
70- _buffer.reset (new uint8_t [size]);
71- _bufferSize = size;
69+
70+ // Data that was already read from the stream but not released (e.g. if tcp_write error occured). Otherwise this should be 0.
71+ const size_t stream_read = _streamPos - _pos;
72+
73+ // Min required buffer size: max(requested size, previous stream data already in buffer)
74+ const size_t min_buffer_size = size > stream_read ? size : stream_read;
75+
76+ // Buffer too small?
77+ if (_bufferSize < min_buffer_size) {
78+ uint8_t *new_buffer = new uint8_t [min_buffer_size];
79+ // If stream reading is ahead, than some data is already in the old buffer and needs to be copied to new resized buffer
80+ if (_buffer && stream_read > 0 ) {
81+ memcpy (new_buffer, _buffer.get (), stream_read);
82+ }
83+ _buffer.reset (new_buffer);
84+ _bufferSize = min_buffer_size;
85+ }
86+
87+ // Fetch remaining data from stream
88+ // If error in tcp_write in ClientContext::_write_some() occured earlier and therefore release_buffer was not called last time, than the requested stream data is already in the buffer.
89+ if (size > stream_read) {
90+ // Remaining bytes to read from stream
91+ const size_t stream_rem = size - stream_read;
92+ const size_t cb = _stream.readBytes (reinterpret_cast <char *>(_buffer.get () + stream_read), stream_rem);
93+ assert (cb == stream_rem);
94+ (void )cb;
95+ _streamPos += stream_rem;
7296 }
73- size_t cb = _stream.readBytes (reinterpret_cast <char *>(_buffer.get ()), size);
74- assert (cb == size);
75- (void ) cb;
7697 return _buffer.get ();
98+
7799 }
78100
79101 void release_buffer (const uint8_t * buffer, size_t size) override
80102 {
81- (void ) buffer;
82- _pos += size;
103+ if (size == 0 ) {
104+ return ;
105+ }
106+
107+ (void )buffer;
108+ _pos += size;
109+
110+ // Cannot release more than acquired through get_buffer
111+ assert (_pos <= _streamPos);
112+
113+ // Release less than requested with get_buffer?
114+ if (_pos < _streamPos) {
115+ // Move unreleased stream data in buffer to front
116+ assert (_buffer);
117+ memmove (_buffer.get (), _buffer.get () + size, _streamPos - _pos);
118+ }
83119 }
84120
85121protected:
86- TStream& _stream;
122+ TStream & _stream;
87123 std::unique_ptr<uint8_t []> _buffer;
88124 size_t _size;
89125 size_t _pos = 0 ;
90126 size_t _bufferSize = 0 ;
127+ size_t _streamPos = 0 ;
91128};
92129
93130class ProgmemStream
@@ -104,7 +141,7 @@ class ProgmemStream
104141 size_t will_read = (_left < size) ? _left : size;
105142 memcpy_P ((void *)dst, (PGM_VOID_P)_buf, will_read);
106143 _left -= will_read;
107- _buf += will_read;
144+ _buf += will_read;
108145 return will_read;
109146 }
110147
0 commit comments