Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-11754 right approach to fix memory leak #6462

Closed
wants to merge 7 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -65,9 +64,16 @@ public void onFinishSend(UUID nodeId, long threadId) {
ThreadFinishSync threadSync = threadMap.get(threadId);

if (threadSync == null)
threadSync = F.addIfAbsent(threadMap, threadId, new ThreadFinishSync(threadId));
threadMap.put(threadId, threadSync = new ThreadFinishSync(threadId));

threadSync.onSend(nodeId);
synchronized (threadSync) {
//thread has to create new ThreadFinishSync
//if other thread executing onAckReceived method removed previous threadSync object
if (threadMap.get(threadId) == null)
threadMap.put(threadId, threadSync = new ThreadFinishSync(threadId));

threadSync.onSend(nodeId);
}
}

/**
Expand Down Expand Up @@ -103,8 +109,14 @@ public void onDisconnected(IgniteFuture<?> reconnectFut) {
public void onAckReceived(UUID nodeId, long threadId) {
ThreadFinishSync threadSync = threadMap.get(threadId);

if (threadSync != null)
if (threadSync != null) {
threadSync.onReceive(nodeId);

synchronized (threadSync) {
if (threadSync.isEmpty())
threadMap.remove(threadId);
}
}
}

/**
Expand All @@ -113,8 +125,14 @@ public void onAckReceived(UUID nodeId, long threadId) {
* @param nodeId Left node ID.
*/
public void onNodeLeft(UUID nodeId) {
for (ThreadFinishSync threadSync : threadMap.values())
for (ThreadFinishSync threadSync : threadMap.values()) {
threadSync.onNodeLeft(nodeId);

synchronized (threadSync) {
if (threadSync.isEmpty())
threadMap.remove(threadSync);
}
}
}

/**
Expand Down Expand Up @@ -192,7 +210,7 @@ public void onDisconnected(IgniteFuture<?> reconnectFut) {
* @param nodeId Node ID response received from.
*/
public void onReceive(UUID nodeId) {
TxFinishSync sync = nodeMap.get(nodeId);
TxFinishSync sync = nodeMap.remove(nodeId);

if (sync != null)
sync.onReceive();
Expand All @@ -207,6 +225,13 @@ public void onNodeLeft(UUID nodeId) {
if (sync != null)
sync.onNodeLeft();
}

/**
*
*/
private boolean isEmpty() {
return nodeMap.isEmpty();
}
}

/**
Expand Down