-
Notifications
You must be signed in to change notification settings - Fork 123
/
EntriesExchange.cs
223 lines (187 loc) · 8.26 KB
/
EntriesExchange.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
using System.IO.Pipelines;
using static System.Buffers.Binary.BinaryPrimitives;
namespace DotNext.Net.Cluster.Consensus.Raft.TransportServices;
using Buffers;
using static IO.DataTransferObject;
using static IO.Pipelines.PipeExtensions;
/// <summary>
/// Base type of the client-side exchange used to transmit log entries.
/// It owns the pipe through which entry content is streamed and keeps the
/// values that are written into the announcement message.
/// </summary>
internal abstract class EntriesExchange : ClientExchange<Result<bool>>, IAsyncDisposable
{
    /*
        Message flow:
        1.REQ(None) Announce number of entries, prevLogIndex, prevLogTerm etc.
        1.REP(Ack) Wait for command: NextEntry to start sending content, None to abort transmission
        2.REQ(StreamStart) with information about content-type and length of the record
        2.REP(Ack) Wait for command: NextEntry to start sending content, Continue to send next chunk, None to finalize transmission
        3.REQ(Fragment) with the chunk of record data
        3.REP(Ack) Wait for command: NextEntry to start sending content, Continue to send next chunk, None to finalize transmission
        4.REQ(StreamEnd) with the final chunk of record data
        4.REP(Ack) Wait for command: NextEntry to start sending content, None to finalize transmission
    */

    // Pipe shared with derived classes; completed on exception, cancellation and disposal.
    private protected readonly Pipe pipe;

    // Values emitted by WriteAnnouncement, captured once at construction time.
    private readonly long term;
    private readonly long prevLogIndex;
    private readonly long prevLogTerm;
    private readonly long commitIndex;
    private readonly EmptyClusterConfiguration? configuration;

    /// <summary>
    /// Captures the announcement values and creates the pipe.
    /// </summary>
    /// <param name="term">The term written into the announcement.</param>
    /// <param name="prevLogIndex">The previous log index written into the announcement.</param>
    /// <param name="prevLogTerm">The previous log term written into the announcement.</param>
    /// <param name="commitIndex">The commit index written into the announcement.</param>
    /// <param name="configState">The configuration written into the announcement; may be <see langword="null"/>.</param>
    /// <param name="options">Pipe options; <see cref="PipeOptions.Default"/> is used when <see langword="null"/>.</param>
    internal EntriesExchange(long term, long prevLogIndex, long prevLogTerm, long commitIndex, EmptyClusterConfiguration? configState, PipeOptions? options = null)
    {
        this.term = term;
        this.prevLogIndex = prevLogIndex;
        this.prevLogTerm = prevLogTerm;
        this.commitIndex = commitIndex;
        configuration = configState;
        pipe = new Pipe(options ?? PipeOptions.Default);
    }

    /// <summary>
    /// Writes the index of the requested log entry as a little-endian 32-bit integer.
    /// </summary>
    /// <param name="output">The destination buffer.</param>
    /// <param name="logEntryIndex">The index to write.</param>
    /// <returns>The size of the written value, i.e. <c>sizeof(int)</c>.</returns>
    internal static int CreateNextEntryResponse(Span<byte> output, int logEntryIndex)
    {
        const int responseSize = sizeof(int);
        WriteInt32LittleEndian(output, logEntryIndex);
        return responseSize;
    }

    /// <summary>
    /// Reads the log entry metadata located at the beginning of <paramref name="input"/>.
    /// </summary>
    /// <param name="input">The buffer that starts with the metadata.</param>
    /// <param name="metadata">The metadata constructed from the buffer.</param>
    /// <returns>The value of <c>LogEntryMetadata.Size</c>.</returns>
    internal static int ParseLogEntryPrologue(ReadOnlySpan<byte> input, out LogEntryMetadata metadata)
    {
        var prologue = new SpanReader<byte>(input);
        metadata = new(ref prologue);
        return LogEntryMetadata.Size;
    }

    /// <summary>
    /// Decodes the announcement message. Fields are read in the same order
    /// in which <see cref="WriteAnnouncement"/> writes them.
    /// </summary>
    /// <param name="input">The buffer containing the announcement.</param>
    /// <param name="sender">The identifier of the sender.</param>
    /// <param name="term">The announced term.</param>
    /// <param name="prevLogIndex">The announced previous log index.</param>
    /// <param name="prevLogTerm">The announced previous log term.</param>
    /// <param name="commitIndex">The announced commit index.</param>
    /// <param name="entriesCount">The announced number of entries.</param>
    /// <param name="configuration">The announced configuration, as returned by <c>EmptyClusterConfiguration.ReadFrom</c>.</param>
    internal static void ParseAnnouncement(ReadOnlySpan<byte> input, out ClusterMemberId sender, out long term, out long prevLogIndex, out long prevLogTerm, out long commitIndex, out int entriesCount, out EmptyClusterConfiguration? configuration)
    {
        var source = new SpanReader<byte>(input);

        // the order of reads defines the layout of the message, do not reorder
        sender = new ClusterMemberId(ref source);
        term = source.ReadInt64(true);
        prevLogIndex = source.ReadInt64(true);
        prevLogTerm = source.ReadInt64(true);
        commitIndex = source.ReadInt64(true);
        entriesCount = source.ReadInt32(true);
        configuration = EmptyClusterConfiguration.ReadFrom(ref source);
    }

    /// <summary>
    /// Encodes the announcement message: sender, term, previous log index,
    /// previous log term, commit index, number of entries and configuration.
    /// </summary>
    /// <param name="output">The destination buffer.</param>
    /// <param name="entriesCount">The number of entries to announce.</param>
    /// <returns>The number of bytes written to <paramref name="output"/>.</returns>
    private protected int WriteAnnouncement(Span<byte> output, int entriesCount)
    {
        var target = new SpanWriter<byte>(output);

        // the order of writes defines the layout of the message, do not reorder
        sender.Format(ref target);
        target.WriteInt64(term, true);
        target.WriteInt64(prevLogIndex, true);
        target.WriteInt64(prevLogTerm, true);
        target.WriteInt64(commitIndex, true);
        target.WriteInt32(entriesCount, true);
        EmptyClusterConfiguration.WriteTo(in configuration, ref target);

        return target.WrittenCount;
    }

    /// <summary>
    /// Completes the writer side of the pipe with the supplied exception.
    /// </summary>
    /// <param name="e">The exception passed to the pipe writer.</param>
    private protected sealed override void OnException(Exception e)
    {
        pipe.Writer.Complete(e);
    }

    /// <summary>
    /// Handles cancellation in the same way as a failure, using
    /// <see cref="OperationCanceledException"/> created from <paramref name="token"/>.
    /// </summary>
    /// <param name="token">The token associated with the cancellation.</param>
    private protected sealed override void OnCanceled(CancellationToken token)
    {
        var cancellation = new OperationCanceledException(token);
        OnException(cancellation);
    }

    /// <summary>
    /// Completes the writer and then the reader of the pipe with
    /// <see cref="ObjectDisposedException"/> carrying the runtime type name.
    /// </summary>
    /// <returns>The task representing asynchronous completion of both pipe ends.</returns>
    async ValueTask IAsyncDisposable.DisposeAsync()
    {
        var disposed = new ObjectDisposedException(GetType().Name);
        await pipe.Writer.CompleteAsync(disposed).ConfigureAwait(false);
        await pipe.Reader.CompleteAsync(disposed).ConfigureAwait(false);
    }
}
/// <summary>
/// Adds serialization of a single log entry of type <typeparamref name="TEntry"/>
/// into a <see cref="PipeWriter"/>: metadata first, content second.
/// </summary>
/// <typeparam name="TEntry">The type of the log entry.</typeparam>
internal abstract class EntriesExchange<TEntry> : EntriesExchange
    where TEntry : IRaftLogEntry
{
    // Signature shared by all serialization steps of a log entry.
    private delegate ValueTask<FlushResult> LogEntryFragmentWriter(PipeWriter writer, TEntry entry, CancellationToken token);

    // Serialization steps, executed by WriteEntryAsync in the declared order.
    private static readonly LogEntryFragmentWriter[] FragmentWriters = new LogEntryFragmentWriter[]
    {
        WriteLogEntryMetadata,
        WriteLogEntryContent,
    };

    /// <summary>
    /// Passes all arguments through to the base class constructor.
    /// </summary>
    private protected EntriesExchange(long term, long prevLogIndex, long prevLogTerm, long commitIndex, EmptyClusterConfiguration? configState, PipeOptions? options = null)
        : base(term, prevLogIndex, prevLogTerm, commitIndex, configState, options)
    {
    }

    /// <summary>
    /// Writes the metadata created from <paramref name="entry"/> to the pipe.
    /// </summary>
    /// <returns>The flush result produced by the write operation.</returns>
    private static ValueTask<FlushResult> WriteLogEntryMetadata(PipeWriter writer, TEntry entry, CancellationToken token)
    {
#pragma warning disable CA2252 // TODO: Remove in .NET 7
        var metadata = LogEntryMetadata.Create(entry);
        return writer.WriteFormattableAsync(metadata, token);
#pragma warning restore CA2252
    }

    /// <summary>
    /// Writes the content of <paramref name="entry"/> to the pipe.
    /// </summary>
    /// <returns>
    /// A flush result that is never marked as completed; it is marked as canceled
    /// only when the write threw <see cref="OperationCanceledException"/>.
    /// </returns>
    private static async ValueTask<FlushResult> WriteLogEntryContent(PipeWriter writer, TEntry entry, CancellationToken token)
    {
        try
        {
            await entry.WriteToAsync(writer, token).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // cancellation is reported through the result rather than rethrown
            return new FlushResult(isCanceled: true, isCompleted: false);
        }

        return new FlushResult(isCanceled: false, isCompleted: false);
    }

    /// <summary>
    /// Runs every serialization step for <paramref name="entry"/> and then completes the writer.
    /// </summary>
    /// <remarks>
    /// If a step reports a completed flush result, the method returns immediately
    /// without completing the writer. If a step reports cancellation, the remaining
    /// steps are skipped and the writer is completed.
    /// </remarks>
    /// <param name="writer">The destination of the serialized entry.</param>
    /// <param name="entry">The log entry to serialize.</param>
    /// <param name="token">The token forwarded to every step.</param>
    /// <returns>The task representing the asynchronous write.</returns>
    internal static async Task WriteEntryAsync(PipeWriter writer, TEntry entry, CancellationToken token)
    {
        for (var step = 0; step < FragmentWriters.Length; step++)
        {
            var outcome = await FragmentWriters[step](writer, entry, token).ConfigureAwait(false);

            if (outcome.IsCompleted)
            {
                return;
            }

            if (outcome.IsCanceled)
            {
                break;
            }
        }

        await writer.CompleteAsync().ConfigureAwait(false);
    }
}
/// <summary>
/// Exchange that sends the announcement followed by the log entries taken
/// from a list of type <typeparamref name="TList"/>, one packet at a time.
/// </summary>
/// <typeparam name="TEntry">The type of the log entry.</typeparam>
/// <typeparam name="TList">The type of the list holding the entries.</typeparam>
internal sealed class EntriesExchange<TEntry, TList> : EntriesExchange<TEntry>
    where TEntry : IRaftLogEntry
    where TList : IReadOnlyList<TEntry>
{
    private TList entries;

    // The task writing the current entry into the pipe; null until the first
    // NextEntry command arrives and again after the transmission is finalized.
    private Task? writeSession;

    // Set by NextEntry and cleared by Continue; selects StreamStart over Fragment.
    private bool streamStart;

    /// <summary>
    /// Stores the list of entries and forwards the remaining arguments to the base class.
    /// </summary>
    internal EntriesExchange(long term, in TList entries, long prevLogIndex, long prevLogTerm, long commitIndex, EmptyClusterConfiguration? configState, PipeOptions? options = null)
        : base(term, prevLogIndex, prevLogTerm, commitIndex, configState, options)
    {
        this.entries = entries;
    }

    /// <summary>
    /// Produces the next outbound packet.
    /// </summary>
    /// <remarks>
    /// Without an active write session the announcement is written with
    /// <c>FlowControl.None</c>. Otherwise data is copied from the pipe into
    /// <paramref name="payload"/>: a completely filled payload is marked
    /// <c>StreamStart</c> or <c>Fragment</c> depending on the stream-start flag,
    /// a partially filled one is marked <c>StreamEnd</c>.
    /// </remarks>
    /// <param name="payload">The buffer receiving the packet payload.</param>
    /// <param name="token">The token passed to the pipe read.</param>
    /// <returns>The packet headers, the number of bytes placed into the payload, and <see langword="true"/>.</returns>
    public override async ValueTask<(PacketHeaders, int, bool)> CreateOutboundMessageAsync(Memory<byte> payload, CancellationToken token)
    {
        if (writeSession is null)
        {
            // send announcement
            int announcementSize = WriteAnnouncement(payload.Span, entries.Count);
            return (new PacketHeaders(MessageType.AppendEntries, FlowControl.None), announcementSize, true);
        }

        // write portion of log entry
        int bytesCopied = await pipe.Reader.CopyToAsync(payload, token).ConfigureAwait(false);

        FlowControl control;
        if (bytesCopied != payload.Length)
        {
            control = FlowControl.StreamEnd;
        }
        else if (streamStart)
        {
            control = FlowControl.StreamStart;
        }
        else
        {
            control = FlowControl.Fragment;
        }

        return (new PacketHeaders(MessageType.AppendEntries, control), bytesCopied, true);
    }

    /// <summary>
    /// Tries to complete the exchange with the result read from <paramref name="input"/>
    /// and forgets the current write session.
    /// </summary>
    /// <param name="input">The payload carrying the result.</param>
    private void FinalizeTransmission(ReadOnlySpan<byte> input)
    {
        var outcome = IExchange.ReadResult(input);
        TrySetResult(outcome);
        writeSession = null;
    }

    /// <summary>
    /// Starts writing the entry at the given position of the list into the pipe.
    /// </summary>
    /// <param name="index">The position of the entry in the list.</param>
    /// <param name="token">The token forwarded to the write operation.</param>
    /// <returns>The task representing the write of the entry.</returns>
    private Task WriteEntryAsync(int index, CancellationToken token)
    {
        return WriteEntryAsync(pipe.Writer, entries[index], token);
    }

    /// <summary>
    /// Switches to the entry whose index is encoded in <paramref name="input"/>
    /// as a little-endian 32-bit integer.
    /// </summary>
    /// <remarks>
    /// When a write session already exists, both pipe ends are completed, the
    /// session is awaited and the pipe is reset before the new session starts.
    /// </remarks>
    /// <param name="input">The payload carrying the requested index.</param>
    /// <param name="token">The token forwarded to the new write session.</param>
    /// <returns>The task representing the switch to the requested entry.</returns>
    private async Task NextEntryAsync(ReadOnlyMemory<byte> input, CancellationToken token)
    {
        var requestedIndex = ReadInt32LittleEndian(input.Span);

        if (writeSession is not null)
        {
            // the sequence below must stay in this order: writer, session, reader, reset
            await pipe.Writer.CompleteAsync().ConfigureAwait(false);
            await writeSession.ConfigureAwait(false);
            await pipe.Reader.CompleteAsync().ConfigureAwait(false);
            pipe.Reset();
        }

        writeSession = WriteEntryAsync(requestedIndex, token);
    }

    /// <summary>
    /// Reacts to the command received from the remote side.
    /// </summary>
    /// <param name="headers">The headers of the inbound packet; only the message type is inspected.</param>
    /// <param name="payload">The payload of the inbound packet.</param>
    /// <param name="token">The token forwarded to the entry switch.</param>
    /// <returns>
    /// <see langword="true"/> for <c>NextEntry</c> and <c>Continue</c>;
    /// <see langword="false"/> for <c>None</c> and any other message type.
    /// </returns>
    public override async ValueTask<bool> ProcessInboundMessageAsync(PacketHeaders headers, ReadOnlyMemory<byte> payload, CancellationToken token)
    {
        switch (headers.Type)
        {
            case MessageType.NextEntry:
                streamStart = true;
                await NextEntryAsync(payload, token).ConfigureAwait(false);
                return true;

            case MessageType.Continue:
                streamStart = false;
                return true;

            case MessageType.None:
                FinalizeTransmission(payload.Span);
                return false;

            default:
                return false;
        }
    }
}