9
9
using System . Linq ;
10
10
using System . Threading ;
11
11
using System . Threading . Tasks ;
12
+ using System . Runtime . InteropServices ;
13
+ using System . Buffers ;
12
14
13
15
namespace HttpStress
14
16
{
15
17
public sealed class LogHttpEventListener : EventListener
16
18
{
17
19
private int _lastLogNumber = 0 ;
18
- private StreamWriter _log ;
20
+ private FileStream _log ;
19
21
private Channel < string > _messagesChannel = Channel . CreateUnbounded < string > ( ) ;
20
22
private Task _processMessages ;
21
- private CancellationTokenSource _stopProcessing ;
23
+ private CancellationTokenSource _stopProcessing = new CancellationTokenSource ( ) ;
24
+ private const string LogDirectory = "./clientlog" ;
25
+
26
+ private FileStream CreateNextLogFileStream ( )
27
+ {
28
+ string fn = Path . Combine ( LogDirectory , $ "client_{ ++ _lastLogNumber : 000} .log") ;
29
+ if ( File . Exists ( fn ) )
30
+ {
31
+ File . Delete ( fn ) ;
32
+ }
33
+ return new FileStream ( fn , FileMode . CreateNew , FileAccess . Write ) ;
34
+ }
22
35
23
36
public LogHttpEventListener ( )
24
37
{
25
- foreach ( var filename in Directory . GetFiles ( "." , "client*.log" ) )
38
+ if ( ! Directory . Exists ( LogDirectory ) )
39
+ {
40
+ Directory . CreateDirectory ( LogDirectory ) ;
41
+ }
42
+
43
+ foreach ( string filename in Directory . GetFiles ( LogDirectory , "client*.log" ) )
26
44
{
27
45
try
28
46
{
29
47
File . Delete ( filename ) ;
30
48
} catch { }
31
49
}
32
- _log = new StreamWriter ( "client.log" , false ) { AutoFlush = true } ;
33
-
50
+ _log = CreateNextLogFileStream ( ) ;
34
51
_messagesChannel = Channel . CreateUnbounded < string > ( ) ;
35
52
_processMessages = ProcessMessagesAsync ( ) ;
36
- _stopProcessing = new CancellationTokenSource ( ) ;
37
53
}
38
54
39
55
protected override void OnEventSourceCreated ( EventSource eventSource )
@@ -46,40 +62,51 @@ protected override void OnEventSourceCreated(EventSource eventSource)
46
62
47
63
private async Task ProcessMessagesAsync ( )
48
64
{
49
- await Task . Yield ( ) ;
50
-
51
65
try
52
66
{
67
+ byte [ ] buffer = new byte [ 8192 ] ;
68
+ var encoding = Encoding . ASCII ;
69
+
53
70
int i = 0 ;
54
71
await foreach ( string message in _messagesChannel . Reader . ReadAllAsync ( _stopProcessing . Token ) )
55
72
{
56
73
if ( ( ++ i % 10_000 ) == 0 )
57
74
{
58
- RotateFiles ( ) ;
75
+ await RotateFiles ( ) ;
59
76
}
60
-
61
- _log . WriteLine ( message ) ;
77
+ int maxLen = encoding . GetMaxByteCount ( message . Length ) ;
78
+ if ( maxLen > buffer . Length )
79
+ {
80
+ buffer = new byte [ maxLen ] ;
81
+ }
82
+ int byteCount = encoding . GetBytes ( message , buffer ) ;
83
+
84
+ await _log . WriteAsync ( buffer . AsMemory ( 0 , byteCount ) , _stopProcessing . Token ) ;
62
85
}
63
86
}
64
87
catch ( OperationCanceledException )
65
88
{
66
89
return ;
67
90
}
68
91
69
- void RotateFiles ( )
92
+ async ValueTask RotateFiles ( )
70
93
{
94
+ await _log . FlushAsync ( _stopProcessing . Token ) ;
71
95
// Rotate the log if it reaches 50 MB size.
72
- if ( _log . BaseStream . Length > ( 50 << 20 ) )
96
+ if ( _log . Length > ( 100 << 20 ) )
73
97
{
74
- _log . Close ( ) ;
75
- _log = new StreamWriter ( $ "client_ { ++ _lastLogNumber : 000 } .log" , false ) { AutoFlush = true } ;
98
+ await _log . DisposeAsync ( ) ;
99
+ _log = CreateNextLogFileStream ( ) ;
76
100
}
77
101
}
78
102
}
79
103
104
+ private StringBuilder ? _cachedStringBuilder ;
105
+
80
106
protected override async void OnEventWritten ( EventWrittenEventArgs eventData )
81
107
{
82
- var sb = new StringBuilder ( ) . Append ( $ "{ eventData . TimeStamp : HH:mm:ss.fffffff} [{ eventData . EventName } ] ") ;
108
+ StringBuilder sb = Interlocked . Exchange ( ref _cachedStringBuilder , null ) ?? new StringBuilder ( ) ;
109
+ sb . Append ( $ "{ eventData . TimeStamp : HH:mm:ss.fffffff} [{ eventData . EventName } ] ") ;
83
110
for ( int i = 0 ; i < eventData . Payload ? . Count ; i ++ )
84
111
{
85
112
if ( i > 0 )
@@ -88,13 +115,17 @@ protected override async void OnEventWritten(EventWrittenEventArgs eventData)
88
115
}
89
116
sb . Append ( eventData . PayloadNames ? [ i ] ) . Append ( ": " ) . Append ( eventData . Payload [ i ] ) ;
90
117
}
91
- await _messagesChannel . Writer . WriteAsync ( sb . ToString ( ) ) ;
118
+ sb . Append ( Environment . NewLine ) ;
119
+ string s = sb . ToString ( ) ;
120
+ sb . Clear ( ) ;
121
+ Interlocked . Exchange ( ref _cachedStringBuilder , sb ) ;
122
+ await _messagesChannel . Writer . WriteAsync ( s , _stopProcessing . Token ) ;
92
123
}
93
124
94
125
public override void Dispose ( )
95
126
{
96
127
base . Dispose ( ) ;
97
-
128
+ _log . Flush ( ) ;
98
129
if ( ! _processMessages . Wait ( TimeSpan . FromSeconds ( 30 ) ) )
99
130
{
100
131
_stopProcessing . Cancel ( ) ;
0 commit comments