@@ -49,7 +49,10 @@ public abstract class QpChannel
4949 private ICryptoTransform enc ;
5050 private ICryptoTransform dec ;
5151 private Encoding encoding = Encoding . UTF8 ;
52-
52+
53+ private Task sendPackageTask = Task . CompletedTask ;
54+ //发送包锁对象
55+ private object SEND_PACKAGE_LOCK_OBJ = new object ( ) ;
5356 //断开连接锁对象
5457 private object DISCONNECT_LOCK_OBJ = new object ( ) ;
5558
@@ -103,13 +106,9 @@ protected set
103106 /// </summary>
104107 public long BytesSentPerSec { get ; private set ; }
105108 /// <summary>
106- /// 包发送队列
107- /// </summary>
108- private ConcurrentQueue < QpPackageSendQueueItem > packageSendQueue = new ConcurrentQueue < QpPackageSendQueueItem > ( ) ;
109- /// <summary>
110109 /// 包发送队列数量
111110 /// </summary>
112- public int PackageSendQueueCount => packageSendQueue . Count ;
111+ public int PackageSendQueueCount = 0 ;
113112
114113 /// <summary>
115114 /// 最后一次连接的时间
@@ -129,16 +128,18 @@ protected set
129128 /// </summary>
130129 public virtual void Disconnect ( )
131130 {
131+ var shouldRaiseDisconnectedEvent = false ;
132132 lock ( DISCONNECT_LOCK_OBJ )
133133 {
134- packageSendQueue . Clear ( ) ;
135134 if ( IsConnected )
136135 {
137- IsConnected = false ;
138- Disconnected ? . Invoke ( this , QpEventArgs . Empty ) ;
136+ IsConnected = false ;
137+ shouldRaiseDisconnectedEvent = true ;
139138 }
140139 }
141140 InitQpPackageHandler_Stream ( null ) ;
141+ if ( shouldRaiseDisconnectedEvent )
142+ Disconnected ? . Invoke ( this , EventArgs . Empty ) ;
142143 }
143144
144145 /// <summary>
@@ -366,9 +367,19 @@ private void writePackageTotalLengthToBuffer(byte[] buffer, int offset, int pack
366367
367368 private Task writePackageAsync ( Func < byte [ ] , ArraySegment < byte > > getPackagePayloadFunc , Action afterSendHandler )
368369 {
369- var item = new QpPackageSendQueueItem ( getPackagePayloadFunc , afterSendHandler ) ;
370- packageSendQueue . Enqueue ( item ) ;
371- return item . SendTask ;
370+ Interlocked . Increment ( ref PackageSendQueueCount ) ;
371+ lock ( SEND_PACKAGE_LOCK_OBJ )
372+ {
373+ sendPackageTask = sendPackageTask . ContinueWith ( t =>
374+ {
375+ var stream = QpPackageHandler_Stream ;
376+ if ( stream == null )
377+ throw new IOException ( "Connection is disconnected." ) ;
378+ writePackage ( getPackagePayloadFunc , afterSendHandler ) . Wait ( ) ;
379+ } ) ;
380+ sendPackageTask . ContinueWith ( t => Interlocked . Decrement ( ref PackageSendQueueCount ) ) ;
381+ return sendPackageTask ;
382+ }
372383 }
373384
374385 private async Task writePackage ( Func < byte [ ] , ArraySegment < byte > > getPackagePayloadFunc , Action afterSendHandler )
@@ -782,37 +793,6 @@ protected async Task<ArraySegment<byte>> ReadPackageAsync(CancellationToken toke
782793 finalPackageBuffer . Count ) ;
783794 }
784795
785- protected void BeginSendPackage ( CancellationToken cancellationToken )
786- {
787- Task . Delay ( options . CheckSendQueueInterval , cancellationToken ) . ContinueWith ( async t =>
788- {
789- if ( t . IsCanceled )
790- return ;
791- //开始发送数据包
792- QpPackageSendQueueItem item = null ;
793- while ( packageSendQueue . Count > 0 )
794- {
795- if ( cancellationToken . IsCancellationRequested )
796- return ;
797- var stream = QpPackageHandler_Stream ;
798- if ( stream == null )
799- return ;
800- if ( ! packageSendQueue . TryDequeue ( out item ) )
801- continue ;
802- try
803- {
804- await writePackage ( item . GetPackagePayloadFunc , item . AfterSendHandler ) ;
805- item . SetResult ( null ) ;
806- }
807- catch ( Exception ex )
808- {
809- item . SetResult ( ex ) ;
810- }
811- }
812- BeginSendPackage ( cancellationToken ) ;
813- } ) ;
814- }
815-
816796 protected void BeginHeartBeat ( CancellationToken cancellationToken )
817797 {
818798 if ( options . HeartBeatInterval > 0 )
0 commit comments