-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
syncstream
385 lines (310 loc) · 12.3 KB
/
syncstream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
// syncstream standard header
// Copyright (c) Microsoft Corporation.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#ifndef _SYNCSTREAM_
#define _SYNCSTREAM_
#include <yvals.h>
#if _STL_COMPILER_PREPROCESSOR
#if !_HAS_CXX20
_EMIT_STL_WARNING(STL4038, "The contents of <syncstream> are available only with C++20 or later.");
#else // ^^^ !_HAS_CXX20 / _HAS_CXX20 vvv
#include <memory>
#include <ostream>
#include <shared_mutex>
#include <streambuf>
#pragma pack(push, _CRT_PACKING)
#pragma warning(push, _STL_WARNING_LEVEL)
#pragma warning(disable : _STL_DISABLED_WARNINGS)
_STL_DISABLE_CLANG_WARNINGS
#pragma push_macro("new")
#pragma push_macro("emit")
#undef new
#undef emit
extern "C" {
_NODISCARD _STD shared_mutex* __stdcall __std_acquire_shared_mutex_for_instance(void* _Ptr) noexcept;
void __stdcall __std_release_shared_mutex_for_instance(void* _Ptr) noexcept;
} // extern "C"
_STD_BEGIN
template <class _Elem, class _Traits>
class _Basic_syncbuf_impl : public basic_streambuf<_Elem, _Traits> {
public:
void set_emit_on_sync(const bool _Val) noexcept {
_Emit_on_sync = _Val;
}
virtual bool _Do_emit() = 0;
#ifdef _ENABLE_STL_INTERNAL_CHECK
_NODISCARD bool _Stl_internal_check_get_emit_on_sync() const noexcept {
return _Emit_on_sync;
}
#endif // _ENABLE_STL_INTERNAL_CHECK
protected:
using _Mysb = basic_streambuf<_Elem, _Traits>;
_Basic_syncbuf_impl() = default;
_Basic_syncbuf_impl(_Basic_syncbuf_impl&& _Right) {
_Swap(_Right);
}
void _Swap(_Basic_syncbuf_impl& _Right) noexcept {
_Mysb::swap(_Right);
_STD swap(_Emit_on_sync, _Right._Emit_on_sync);
_STD swap(_Sync_recorded, _Right._Sync_recorded);
}
bool _Emit_on_sync{false};
bool _Sync_recorded{false};
};
_EXPORT_STD template <class _Elem, class _Traits, class _Alloc>
class basic_syncbuf : public _Basic_syncbuf_impl<_Elem, _Traits> {
public:
using int_type = _Traits::int_type;
using pos_type = _Traits::pos_type;
using off_type = _Traits::off_type;
using allocator_type = _Alloc;
using streambuf_type = basic_streambuf<_Elem, _Traits>;
using _Mybase = _Basic_syncbuf_impl<_Elem, _Traits>;
using _Pointer = allocator_traits<_Alloc>::pointer;
using _Size_type = allocator_traits<_Alloc>::size_type;
using _Mybase::set_emit_on_sync;
basic_syncbuf() = default;
explicit basic_syncbuf(streambuf_type* _Strbuf) : basic_syncbuf(_Strbuf, _Alloc{}) {}
basic_syncbuf(streambuf_type* _Strbuf, const _Alloc& _Al_)
: _Wrapped(_Strbuf), _Mypair{_One_then_variadic_args_t{}, _Al_, nullptr} {
if (_Wrapped) {
auto& _Mutex = _Get_mutex();
_Mutex = __std_acquire_shared_mutex_for_instance(_Wrapped);
if (!_Mutex) {
_Xbad_alloc();
}
}
_Init();
}
basic_syncbuf(basic_syncbuf&& _Right) : _Mypair{_One_then_variadic_args_t{}, _STD move(_Right._Getal()), nullptr} {
_Swap_except_al(_Right);
}
~basic_syncbuf() {
_Emit();
_Tidy();
}
basic_syncbuf& operator=(basic_syncbuf&& _Right) {
emit();
if (this == _STD addressof(_Right)) {
return *this;
}
auto& _Al = _Getal();
auto& _Right_al = _Right._Getal();
constexpr auto _Pocma_val = _Choose_pocma_v<_Alloc>;
if constexpr (_Pocma_val == _Pocma_values::_No_propagate_allocators) {
if (_Al != _Right_al) {
_Tidy();
_Size_type _Right_buf_size = _Right._Get_buffer_size();
const _Size_type _Right_data_size = _Right._Get_data_size();
_Elem* const _New_ptr = _Unfancy(_Allocate_at_least_helper(_Al, _Right_buf_size));
_Traits::copy(_New_ptr, _Right.pbase(), _Right_data_size);
streambuf_type::setp(_New_ptr, _New_ptr + _Right_data_size, _New_ptr + _Right_buf_size);
_STD swap(streambuf_type::_Plocale, _Right._Plocale);
_STD swap(_Mybase::_Emit_on_sync, _Right._Emit_on_sync);
_STD swap(_Mybase::_Sync_recorded, _Right._Sync_recorded);
_STD swap(_Wrapped, _Right._Wrapped);
_STD swap(_Get_mutex(), _Right._Get_mutex());
_Right._Tidy();
return *this;
}
}
_Tidy();
_Pocma(_Al, _Right_al);
_Swap_except_al(_Right);
return *this;
}
void swap(basic_syncbuf& _Right) noexcept /* strengthened */ {
if (this != _STD addressof(_Right)) {
_Pocs(_Getal(), _Right._Getal());
_Swap_except_al(_Right);
}
}
bool emit() {
if (!_Wrapped) {
return false;
}
bool _Result = true;
const _Size_type _Data_size = _Get_data_size();
_Elem* const _Begin_seq_ptr = streambuf_type::pbase();
if (_Data_size > 0 || _Mybase::_Sync_recorded) {
scoped_lock _Guard(*_Get_mutex());
if (_Data_size > 0
&& _Data_size
!= static_cast<_Size_type>(
_Wrapped->sputn(_Begin_seq_ptr, static_cast<streamsize>(_Data_size)))) {
_Result = false;
}
if (_Mybase::_Sync_recorded) {
if (_Wrapped->pubsync() == -1) {
_Result = false;
}
}
}
_Mybase::_Sync_recorded = false;
streambuf_type::setp(_Begin_seq_ptr, streambuf_type::epptr()); // reset written data
return _Result;
}
_NODISCARD streambuf_type* get_wrapped() const noexcept {
return _Wrapped;
}
_NODISCARD allocator_type get_allocator() const noexcept {
return _Mypair._Get_first();
}
protected:
int sync() override {
_Mybase::_Sync_recorded = true;
if (_Mybase::_Emit_on_sync) {
if (!emit()) {
return -1;
}
}
return 0;
}
int_type overflow(int_type _Current_elem) override {
if (!_Wrapped) {
return _Traits::eof();
}
const bool _Chk_eof = _Traits::eq_int_type(_Current_elem, _Traits::eof());
if (_Chk_eof) {
return _Traits::not_eof(_Current_elem);
}
auto& _Al = _Getal();
const _Size_type _Buf_size = _Get_buffer_size();
const _Size_type _Max_allocation = allocator_traits<_Alloc>::max_size(_Al);
if (_Buf_size == _Max_allocation) {
return _Traits::eof();
}
_Size_type _New_capacity = _Calculate_growth(_Buf_size, _Buf_size + 1, _Max_allocation);
_Elem* const _Old_ptr = streambuf_type::pbase();
const _Size_type _Old_data_size = _Get_data_size();
_Elem* const _New_ptr = _Unfancy(_Allocate_at_least_helper(_Al, _New_capacity));
_Traits::copy(_New_ptr, _Old_ptr, _Old_data_size);
if (0 < _Buf_size) {
_Al.deallocate(_Refancy<_Pointer>(_Old_ptr), _Buf_size);
}
streambuf_type::setp(_New_ptr, _New_ptr + _Old_data_size, _New_ptr + _New_capacity);
streambuf_type::sputc(_Traits::to_char_type(_Current_elem));
return _Current_elem;
}
private:
static constexpr _Size_type _Min_size = 32; // constant for minimum buffer size
void _Init() {
_Size_type _New_capacity = _Min_size;
_Elem* const _New_ptr = _Unfancy(_Allocate_at_least_helper(_Getal(), _New_capacity));
streambuf_type::setp(_New_ptr, _New_ptr + _New_capacity);
}
void _Tidy() noexcept {
const _Size_type _Buf_size = _Get_buffer_size();
if (0 < _Buf_size) {
_Getal().deallocate(_Refancy<_Pointer>(streambuf_type::pbase()), _Buf_size);
}
streambuf_type::setp(nullptr, nullptr, nullptr);
if (_Wrapped) {
__std_release_shared_mutex_for_instance(_Wrapped);
_Wrapped = nullptr;
_Get_mutex() = nullptr;
}
}
void _Swap_except_al(basic_syncbuf& _Right) noexcept {
_Mybase::_Swap(_Right);
_STD swap(_Wrapped, _Right._Wrapped);
_STD swap(_Get_mutex(), _Right._Get_mutex());
}
bool _Do_emit() override {
return emit();
}
bool _Emit() noexcept {
_TRY_BEGIN
return emit();
_CATCH_ALL
return false;
_CATCH_END
}
_NODISCARD static constexpr _Size_type _Calculate_growth(
const _Size_type _Oldsize, const _Size_type _Newsize, const _Size_type _Maxsize) {
if (_Oldsize > _Maxsize - _Oldsize / 2) {
return _Maxsize; // geometric growth would overflow
}
const _Size_type _Geometric = _Oldsize + _Oldsize / 2;
if (_Geometric < _Newsize) {
return _Newsize; // geometric growth would be insufficient
}
return _Geometric; // geometric growth is sufficient
}
_NODISCARD _Size_type _Get_data_size() const noexcept {
return static_cast<_Size_type>(streambuf_type::pptr() - streambuf_type::pbase());
}
_NODISCARD _Size_type _Get_buffer_size() const noexcept {
return static_cast<_Size_type>(streambuf_type::epptr() - streambuf_type::pbase());
}
_NODISCARD _Alloc& _Getal() noexcept {
return _Mypair._Get_first();
}
_NODISCARD shared_mutex*& _Get_mutex() noexcept {
return _Mypair._Myval2;
}
streambuf_type* _Wrapped{nullptr};
_Compressed_pair<_Alloc, shared_mutex*> _Mypair{_Zero_then_variadic_args_t{}, nullptr};
};
_EXPORT_STD template <class _Elem, class _Traits, class _Alloc>
void swap(basic_syncbuf<_Elem, _Traits, _Alloc>& _Left, basic_syncbuf<_Elem, _Traits, _Alloc>& _Right) noexcept
/* strengthened */ {
_Left.swap(_Right);
}
_EXPORT_STD template <class _Elem, class _Traits, class _Alloc>
class basic_osyncstream : public basic_ostream<_Elem, _Traits> {
public:
using char_type = _Elem;
using int_type = _Traits::int_type;
using pos_type = _Traits::pos_type;
using off_type = _Traits::off_type;
using traits_type = _Traits;
using allocator_type = _Alloc;
using streambuf_type = basic_streambuf<_Elem, _Traits>;
using syncbuf_type = basic_syncbuf<_Elem, _Traits, _Alloc>;
using _Mybase = basic_ostream<_Elem, _Traits>;
using _Myios = basic_ios<_Elem, _Traits>;
basic_osyncstream(streambuf_type* _Strbuf, const _Alloc& _Al)
: _Mybase(_STD addressof(_Sync_buf)), _Sync_buf(_Strbuf, _Al) {}
explicit basic_osyncstream(streambuf_type* _Strbuf) : basic_osyncstream(_Strbuf, _Alloc{}) {}
basic_osyncstream(basic_ostream<_Elem, _Traits>& _Ostr, const _Alloc& _Al)
: basic_osyncstream(_Ostr.rdbuf(), _Al) {}
explicit basic_osyncstream(basic_ostream<_Elem, _Traits>& _Ostr) : basic_osyncstream(_Ostr, _Alloc{}) {}
basic_osyncstream(basic_osyncstream&& _Right) noexcept
: _Mybase(_STD move(_Right)), _Sync_buf(_STD move(_Right._Sync_buf)) {
_Mybase::set_rdbuf(_STD addressof(_Sync_buf));
}
~basic_osyncstream() = default;
basic_osyncstream& operator=(basic_osyncstream&&) = default;
void emit() {
ios_base::iostate _State = ios_base::goodbit;
const typename _Mybase::sentry _Ok(*this);
if (!_Ok) {
_State |= ios_base::badbit;
} else {
_TRY_IO_BEGIN
if (!_Sync_buf.emit()) {
_State |= ios_base::badbit;
}
_CATCH_IO_END
}
_Mybase::setstate(_State);
}
_NODISCARD streambuf_type* get_wrapped() const noexcept {
return _Sync_buf.get_wrapped();
}
_NODISCARD syncbuf_type* rdbuf() const noexcept {
return const_cast<syncbuf_type*>(_STD addressof(_Sync_buf));
}
private:
syncbuf_type _Sync_buf;
};
_STD_END
#pragma pop_macro("emit")
#pragma pop_macro("new")
_STL_RESTORE_CLANG_WARNINGS
#pragma warning(pop)
#pragma pack(pop)
#endif // ^^^ _HAS_CXX20 ^^^
#endif // _STL_COMPILER_PREPROCESSOR
#endif // _SYNCSTREAM_