From ed60773fe56c65c0bc08662ebb4e726ec9f88157 Mon Sep 17 00:00:00 2001 From: "xiaofan.chen" Date: Wed, 7 Aug 2024 21:08:36 +0800 Subject: [PATCH] fix: ensure hasOngoingSendLoop.exitSafe() --- .../protocol/DefaultBatchFlushEndpoint.java | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java index 34f20d84bc..8955f151da 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java @@ -609,7 +609,11 @@ private void scheduleSendJobOnConnected(final ContextualChannel chan) { LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread"); // Schedule directly - scheduleSendJobInEventLoopIfNeeded(chan); + if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) { + scheduleSendJobInEventLoopIfNeeded(chan); + } + // Otherwise: + // someone will do the job for us } private void scheduleSendJobIfNeeded(final ContextualChannel chan) { @@ -618,11 +622,6 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { } final EventLoop eventLoop = chan.eventLoop(); - if (eventLoop.inEventLoop()) { - scheduleSendJobInEventLoopIfNeeded(chan); - return; - } - if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterSafeGetVolatile()) { // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get): // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls @@ -644,8 +643,11 @@ private void scheduleSendJobIfNeeded(final ContextualChannel chan) { private void scheduleSendJobInEventLoopIfNeeded(final ContextualChannel chan) { // Guarantee only 1 send loop. - if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnterUnsafe()) { + BatchFlushEndPointContext.HasOngoingSendLoop hasOngoingSendLoop = chan.context.batchFlushEndPointContext.hasOngoingSendLoop; + if (hasOngoingSendLoop.tryEnterUnsafe()) { loopSend(chan); + } else { + hasOngoingSendLoop.exitSafe(); } } @@ -657,11 +659,11 @@ private void loopSend(final ContextualChannel chan) { } LettuceAssert.assertState(channel == chan, "unexpected: channel not match but closeStatus == null"); - loopSend0(batchFlushEndPointContext, chan, writeSpinCount, true); + loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false); } private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan, - int remainingSpinnCount, final boolean firstCall) { + int remainingSpinnCount, final boolean exitedSafe) { do { final int count = pollBatch(batchFlushEndPointContext, chan); if (count < 0) { @@ -686,14 +688,16 @@ private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext return; } - if (firstCall) { + if (exitedSafe) { + // The send loop will be triggered later when a new task is added, + batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe(); + } else { // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call. batchFlushEndPointContext.hasOngoingSendLoop.exitSafe(); // // Guarantee thread-safety: no dangling tasks in the queue. - loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false); - } else { - // The send loop will be triggered later when a new task is added, - batchFlushEndPointContext.hasOngoingSendLoop.exitUnsafe(); + loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, true); + // chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100, + // TimeUnit.NANOSECONDS); } }