Skip to content

Commit 6849dc0

Browse files
committed
Fix InputOutputStream not implement WriteAsync method problem.
1 parent 87caf74 commit 6849dc0

File tree

4 files changed

+67
-5
lines changed

4 files changed

+67
-5
lines changed

Quick.Protocol/QpChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ private async Task writePackageBuffer(Stream stream, ArraySegment<byte> packageB
323323
packageBuffer = new ArraySegment<byte>(currentBuffer, 0, packageTotalLength);
324324
}
325325
}
326-
326+
327327
//执行AfterSendHandler
328328
afterSendHandler?.Invoke();
329329

@@ -351,7 +351,7 @@ private async Task writePackageBuffer(Stream stream, ArraySegment<byte> packageB
351351
LogUtils.LogContent ?
352352
BitConverter.ToString(packageBuffer.Array, packageBuffer.Offset, packageBuffer.Count)
353353
: LogUtils.NOT_SHOW_CONTENT_MESSAGE);
354-
stream.Flush();
354+
await stream.FlushAsync();
355355
}
356356

357357
private void writePackageTotalLengthToBuffer(byte[] buffer, int offset, int packageTotalLength)

Quick.Protocol/QpServerChannel.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class QpServerChannel : QpChannel
3131
/// <summary>
3232
/// 认证超时
3333
/// </summary>
34-
internal event EventHandler AuchenticateTimeout;
34+
public event EventHandler AuchenticateTimeout;
3535

3636
public QpServerChannel(Stream stream, string channelName, CancellationToken cancellationToken, QpServerOptions options) : base(options)
3737
{

Quick.Protocol/Streams/InputOutputStream.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using System.Collections.Generic;
33
using System.IO;
44
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57

68
namespace Quick.Protocol.Streams
79
{
@@ -30,11 +32,41 @@ public override void Flush()
3032
output.Flush();
3133
}
3234

35+
public override int Read(Span<byte> buffer)
36+
{
37+
return input.Read(buffer);
38+
}
39+
40+
public override int ReadByte()
41+
{
42+
return input.ReadByte();
43+
}
44+
45+
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
46+
{
47+
return input.BeginRead(buffer, offset, count, callback, state);
48+
}
49+
50+
public override int EndRead(IAsyncResult asyncResult)
51+
{
52+
return input.EndRead(asyncResult);
53+
}
54+
3355
public override int Read(byte[] buffer, int offset, int count)
3456
{
3557
return input.Read(buffer, offset, count);
3658
}
3759

60+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
61+
{
62+
return input.ReadAsync(buffer, offset, count, cancellationToken);
63+
}
64+
65+
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
66+
{
67+
return input.ReadAsync(buffer, cancellationToken);
68+
}
69+
3870
public override long Seek(long offset, SeekOrigin origin)
3971
{
4072
throw new NotImplementedException();
@@ -49,5 +81,29 @@ public override void Write(byte[] buffer, int offset, int count)
4981
{
5082
output.Write(buffer, offset, count);
5183
}
84+
85+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
86+
{
87+
return output.WriteAsync(buffer, offset, count, cancellationToken);
88+
}
89+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
90+
{
91+
return output.WriteAsync(buffer, cancellationToken);
92+
}
93+
94+
public override void Write(ReadOnlySpan<byte> buffer)
95+
{
96+
output.Write(buffer);
97+
}
98+
99+
public override void WriteByte(byte value)
100+
{
101+
output.WriteByte(value);
102+
}
103+
104+
public override Task FlushAsync(CancellationToken cancellationToken)
105+
{
106+
return output.FlushAsync(cancellationToken);
107+
}
52108
}
53109
}

Test/ConsoleTest/Program.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,18 @@ static void Main(string[] args)
3030
ChannelName = $"Process:{process.Id}"
3131
};
3232
var channel = new QpStreamServerChannel(options);
33+
channel.AuchenticateTimeout += (sender, e) =>
34+
{
35+
Console.WriteLine(DateTime.Now.ToString() + "[Server]: 认证超时!");
36+
isDisconnected = true;
37+
};
3338
channel.Disconnected += (sender, e) =>
3439
{
3540
Console.WriteLine(DateTime.Now.ToString() + "[Server]: 连接已断开!");
3641
isDisconnected = true;
3742
};
3843
channel.HeartbeatPackageReceived += (sender, e) => Console.WriteLine(DateTime.Now.ToString() + "[Server]: 收到心跳包");
39-
channel.RawNoticePackageReceived+=(sender,e)=> Console.WriteLine($"[Client_RawNoticePackageReceived]TypeName:{e.TypeName},Content:{e.Content}"); ;
40-
44+
channel.RawNoticePackageReceived+=(sender,e)=> Console.WriteLine($"[Client_RawNoticePackageReceived]TypeName:{e.TypeName},Content:{e.Content}"); ;
4145
}
4246
else
4347
{
@@ -58,11 +62,13 @@ static void Main(string[] args)
5862
{
5963
if (t.IsCanceled)
6064
{
65+
isDisconnected = true;
6166
Debug.WriteLine("连接已取消");
6267
return;
6368
}
6469
if (t.IsFaulted)
6570
{
71+
isDisconnected = true;
6672
Debug.WriteLine("连接出错,原因:" + t.Exception.InnerException.ToString());
6773
return;
6874
}

0 commit comments

Comments
 (0)