1
- using System ;
1
+ using StackifyLib . Models ;
2
+ using StackifyLib . Utils ;
3
+ using System ;
4
+ using System . Collections . Concurrent ;
2
5
using System . Collections . Generic ;
3
6
using System . Linq ;
4
- using System . Collections . Concurrent ;
5
- using System . Diagnostics ;
6
7
using System . Threading . Tasks ;
7
- using StackifyLib . Models ;
8
- using StackifyLib . Utils ;
9
8
10
9
#if NET451 || NET45 || NET40
11
10
using System . Runtime . Remoting . Messaging ;
@@ -298,19 +297,11 @@ private int FlushLoop()
298
297
299
298
bool keepGoing = false ;
300
299
301
- var tasks = new List < Task > ( ) ;
302
-
303
300
int flushTimes = 0 ;
304
301
//Keep flushing
305
302
do
306
303
{
307
- int count ;
308
- var task = FlushOnceAsync ( out count ) ;
309
-
310
- if ( task != null )
311
- {
312
- tasks . Add ( task ) ;
313
- }
304
+ int count = FlushOnce ( ) ;
314
305
315
306
if ( count >= 100 )
316
307
{
@@ -324,14 +315,6 @@ private int FlushLoop()
324
315
processedCount += count ;
325
316
} while ( keepGoing && flushTimes < 25 ) ;
326
317
327
-
328
- if ( _StopRequested && tasks . Any ( ) )
329
- {
330
- StackifyLib . Utils . StackifyAPILogger . Log ( "Waiting to ensure final log send. Waiting on " + tasks . Count + " tasks" ) ;
331
- Task . WaitAll ( tasks . ToArray ( ) , 5000 ) ;
332
- StackifyLib . Utils . StackifyAPILogger . Log ( "Final log flush complete" ) ;
333
- }
334
-
335
318
_QueueTooBig = _MessageBuffer . Count < Logger . MaxLogBufferSize ;
336
319
}
337
320
}
@@ -343,12 +326,12 @@ private int FlushLoop()
343
326
return processedCount ;
344
327
}
345
328
346
- private Task FlushOnceAsync ( out int messageSize )
329
+ private int FlushOnce ( )
347
330
{
348
331
349
332
// StackifyLib.Utils.StackifyAPILogger.Log("Calling FlushOnceAsync");
350
333
351
- messageSize = 0 ;
334
+ int messageSize = 0 ;
352
335
var chunk = new List < LogMsg > ( ) ;
353
336
354
337
//we only want to do this once at a time but the actual send is done async
@@ -393,85 +376,44 @@ private Task FlushOnceAsync(out int messageSize)
393
376
394
377
if ( chunk . Any ( ) )
395
378
{
379
+ var response = _LogClient . SendLogsByGroups ( chunk . ToArray ( ) ) ;
396
380
397
- return _LogClient . SendLogsByGroups ( chunk . ToArray ( ) ) . ContinueWith ( ( continuation ) =>
381
+ if ( response != null && response . Exception != null )
398
382
{
383
+ Utils . StackifyAPILogger . Log ( "Requeueing log messages due to error: " + response . Exception . ToString ( ) , true ) ;
399
384
400
- if ( continuation . Exception != null )
385
+ if ( response . IsClientError ( ) )
401
386
{
402
- Utils . StackifyAPILogger . Log ( "Requeueing log messages due to error: " + continuation . Exception . ToString ( ) , true ) ;
387
+ Utils . StackifyAPILogger . Log ( "Not requeueing log messages due to client error: " + response . StatusCode , true ) ;
403
388
}
404
-
405
- if ( continuation . Result != null && continuation . Result . Exception != null )
406
- {
407
- Utils . StackifyAPILogger . Log ( "Requeueing log messages due to error: " + continuation . Result . Exception . ToString ( ) , true ) ;
408
- }
409
-
410
- if ( continuation . Exception != null ||
411
- ( continuation . Result != null && continuation . Result . Exception != null ) )
389
+ else
412
390
{
413
391
try
414
392
{
415
- bool messagesSentTooManyTimes = false ;
416
-
417
- foreach ( var item in chunk )
418
- {
419
- item . UploadErrors ++ ;
420
-
421
- // try to upload up to 10 times
422
- if ( item . UploadErrors < 100 )
423
- {
424
- _MessageBuffer . Enqueue ( item ) ;
425
- }
426
- else
427
- {
428
- messagesSentTooManyTimes = true ;
429
- }
430
- }
393
+ bool messagesSentTooManyTimes = EnqueueForRetransmission ( chunk ) ;
431
394
432
395
if ( messagesSentTooManyTimes )
433
396
{
434
397
Utils . StackifyAPILogger . Log (
435
398
"Some messages not queued again due to too many failures uploading" ) ;
436
399
}
437
-
438
400
}
439
401
catch ( Exception ex2 )
440
402
{
441
403
Utils . StackifyAPILogger . Log ( "Error trying to requeue messages " + ex2 . ToString ( ) ) ;
442
404
}
443
405
}
444
- } ) ;
406
+ }
445
407
}
446
408
}
447
409
catch ( Exception ex )
448
410
{
449
411
Utils . StackifyAPILogger . Log ( ex . ToString ( ) ) ;
450
412
451
-
452
- //requeue the messages that errored trying to upload
453
- try
454
- {
455
- foreach ( var item in chunk )
456
- {
457
- item . UploadErrors ++ ;
458
-
459
- // try to upload up to 10 times
460
- if ( item . UploadErrors < 10 )
461
- {
462
- _MessageBuffer . Enqueue ( item ) ;
463
- }
464
- }
465
-
466
- }
467
- catch ( Exception ex2 )
468
- {
469
- Utils . StackifyAPILogger . Log ( ex2 . ToString ( ) ) ;
470
- }
413
+ EnqueueForRetransmission ( chunk ) ;
471
414
}
472
415
473
-
474
- return null ;
416
+ return messageSize ;
475
417
}
476
418
477
419
@@ -506,5 +448,34 @@ public void Pause(bool isPaused)
506
448
{
507
449
_PauseUpload = isPaused ;
508
450
}
451
+
452
+ private bool EnqueueForRetransmission ( List < LogMsg > chunk )
453
+ {
454
+ bool skippedMessage = false ;
455
+
456
+ try
457
+ {
458
+ foreach ( var item in chunk )
459
+ {
460
+ ++ item . UploadErrors ;
461
+
462
+ // retry up to 5 times
463
+ if ( item . UploadErrors < 5 )
464
+ {
465
+ _MessageBuffer . Enqueue ( item ) ;
466
+ }
467
+ else
468
+ {
469
+ skippedMessage = true ;
470
+ }
471
+ }
472
+ }
473
+ catch ( Exception e )
474
+ {
475
+ Utils . StackifyAPILogger . Log ( e . ToString ( ) ) ;
476
+ }
477
+
478
+ return skippedMessage ;
479
+ }
509
480
}
510
481
}
0 commit comments