Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLOUDSTACK-9564: Fix memory leaks in VmwareContextPool #1729

Merged
merged 1 commit into from
Dec 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public static VmwareContext create(String vCenterAddress, String vCenterUserName
context.registerStockObject("noderuninfo", String.format("%d-%d", s_clusterMgr.getManagementNodeId(), s_clusterMgr.getCurrentRunId()));

context.setPoolInfo(s_pool, VmwareContextPool.composePoolKey(vCenterAddress, vCenterUserName));
s_pool.registerOutstandingContext(context);

return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5481,7 +5481,7 @@ private static void recycleServiceContext() {
if (s_logger.isTraceEnabled()) {
s_logger.trace("Recycling threadlocal context to pool");
}
context.getPool().returnContext(context);
context.getPool().registerContext(context);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public static VmwareContext create(String vCenterAddress, String vCenterUserName
assert (context != null);

context.setPoolInfo(s_pool, VmwareContextPool.composePoolKey(vCenterAddress, vCenterUserName));
s_pool.registerOutstandingContext(context);

return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public void recycleServiceContext() {
VmwareContext context = currentContext.get();
currentContext.set(null);
assert (context.getPool() != null);
context.getPool().returnContext(context);
context.getPool().registerContext(context);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public void close() {
s_logger.warn("Unexpected exception: ", e);
} finally {
if (_pool != null) {
_pool.unregisterOutstandingContext(this);
_pool.unregisterContext(this);
}
unregisterOutstandingContext();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,105 +16,108 @@
// under the License.
package com.cloud.hypervisor.vmware.util;

import com.google.common.base.Strings;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.log4j.Logger;
import org.joda.time.Duration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.log4j.Logger;

import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

public class VmwareContextPool {
private static final Logger s_logger = Logger.getLogger(VmwareContextPool.class);

private static final long DEFAULT_CHECK_INTERVAL = 10000;
private static final Duration DEFAULT_CHECK_INTERVAL = Duration.millis(10000L);
private static final int DEFAULT_IDLE_QUEUE_LENGTH = 128;

private List<VmwareContext> _outstandingRegistry = new ArrayList<VmwareContext>();

private Map<String, List<VmwareContext>> _pool;
private final ConcurrentMap<String, Queue<VmwareContext>> _pool;
private int _maxIdleQueueLength = DEFAULT_IDLE_QUEUE_LENGTH;
private long _idleCheckIntervalMs = DEFAULT_CHECK_INTERVAL;
private Duration _idleCheckInterval = DEFAULT_CHECK_INTERVAL;

private Timer _timer = new Timer();

public VmwareContextPool() {
this(DEFAULT_IDLE_QUEUE_LENGTH, DEFAULT_CHECK_INTERVAL);
}

public VmwareContextPool(int maxIdleQueueLength) {
this(maxIdleQueueLength, DEFAULT_CHECK_INTERVAL);
}

public VmwareContextPool(int maxIdleQueueLength, long idleCheckIntervalMs) {
_pool = new HashMap<String, List<VmwareContext>>();
public VmwareContextPool(int maxIdleQueueLength, Duration idleCheckInterval) {
_pool = new ConcurrentHashMap<String, Queue<VmwareContext>>();

_maxIdleQueueLength = maxIdleQueueLength;
_idleCheckIntervalMs = idleCheckIntervalMs;

_timer.scheduleAtFixedRate(getTimerTask(), _idleCheckIntervalMs, _idleCheckIntervalMs);
}
_idleCheckInterval = idleCheckInterval;

public void registerOutstandingContext(VmwareContext context) {
assert (context != null);
synchronized (this) {
_outstandingRegistry.add(context);
}
_timer.scheduleAtFixedRate(getTimerTask(), _idleCheckInterval.getMillis(), _idleCheckInterval.getMillis());
}

public void unregisterOutstandingContext(VmwareContext context) {
assert (context != null);
synchronized (this) {
_outstandingRegistry.remove(context);
public VmwareContext getContext(final String vCenterAddress, final String vCenterUserName) {
final String poolKey = composePoolKey(vCenterAddress, vCenterUserName).intern();
if (Strings.isNullOrEmpty(poolKey)) {
return null;
}
}

public VmwareContext getContext(String vCenterAddress, String vCenterUserName) {
String poolKey = composePoolKey(vCenterAddress, vCenterUserName);
synchronized (this) {
List<VmwareContext> l = _pool.get(poolKey);
if (l == null)
return null;

if (l.size() > 0) {
VmwareContext context = l.remove(0);
context.setPoolInfo(this, poolKey);

if (s_logger.isTraceEnabled())
s_logger.trace("Return a VmwareContext from the idle pool: " + poolKey + ". current pool size: " + l.size() + ", outstanding count: " +
VmwareContext.getOutstandingContextCount());
synchronized (poolKey) {
final Queue<VmwareContext> ctxList = _pool.get(poolKey);
if (ctxList != null && !ctxList.isEmpty()) {
final VmwareContext context = ctxList.remove();
if (context != null) {
context.setPoolInfo(this, poolKey);
}
if (s_logger.isTraceEnabled()) {
s_logger.trace("Return a VmwareContext from the idle pool: " + poolKey + ". current pool size: " + ctxList.size() + ", outstanding count: " +
VmwareContext.getOutstandingContextCount());
}
return context;
}

// TODO, we need to control the maximum number of outstanding VmwareContext object in the future
return null;
}
}

public void returnContext(VmwareContext context) {
public void registerContext(final VmwareContext context) {
assert (context.getPool() == this);
assert (context.getPoolKey() != null);
synchronized (this) {
List<VmwareContext> l = _pool.get(context.getPoolKey());
if (l == null) {
l = new ArrayList<VmwareContext>();
_pool.put(context.getPoolKey(), l);

final String poolKey = context.getPoolKey().intern();
synchronized (poolKey) {
Queue<VmwareContext> ctxQueue = _pool.get(poolKey);

if (ctxQueue == null) {
ctxQueue = new ConcurrentLinkedQueue<>();
_pool.put(poolKey, ctxQueue);
}

if (ctxQueue.size() >= _maxIdleQueueLength) {
final VmwareContext oldestContext = ctxQueue.remove();
if (oldestContext != null) {
try {
oldestContext.close();
} catch (Throwable t) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@abhinandanprateek ^^ here we close oldest context, before adding a new one. The queue provided the FIFO semantics to purge old contexts and keep new ones around, which is why I switched from previously used arraylist.

s_logger.error("Unexpected exception caught while trying to purge oldest VmwareContext", t);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although we are using a bounded queue now and that will prevent memory leaks, do we know why it should be throwing old elements -or- why the contexts are not cleaned up programmatically instead of forcefully restricting the size ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the registerContext method we check if the queue is full; in case it is full, we remove the oldest element and close it properly. I'll ping you on the lines where it happens.

}
}
context.clearStockObjects();
ctxQueue.add(context);

if (l.size() < _maxIdleQueueLength) {
context.clearStockObjects();
l.add(context);

if (s_logger.isTraceEnabled())
s_logger.trace("Recycle VmwareContext into idle pool: " + context.getPoolKey() + ", current idle pool size: " + l.size() + ", outstanding count: " +
VmwareContext.getOutstandingContextCount());
} else {
if (s_logger.isTraceEnabled())
s_logger.trace("VmwareContextPool queue exceeds limits, queue size: " + l.size());
context.close();
if (s_logger.isTraceEnabled()) {
s_logger.trace("Recycle VmwareContext into idle pool: " + context.getPoolKey() + ", current idle pool size: " + ctxQueue.size() + ", outstanding count: "
+ VmwareContext.getOutstandingContextCount());
}
}
}

public void unregisterContext(final VmwareContext context) {
assert (context != null);
final String poolKey = context.getPoolKey().intern();
final Queue<VmwareContext> ctxList = _pool.get(poolKey);
synchronized (poolKey) {
if (!Strings.isNullOrEmpty(poolKey) && ctxList != null && ctxList.contains(context)) {
ctxList.remove(context);
}
}
}
Expand All @@ -124,8 +127,6 @@ private TimerTask getTimerTask() {
@Override
protected void runInContext() {
try {
// doIdleCheck();

doKeepAlive();
} catch (Throwable e) {
s_logger.error("Unexpected exception", e);
Expand All @@ -134,35 +135,30 @@ protected void runInContext() {
};
}

private void getKeepAliveCheckContexts(List<VmwareContext> l, int batchSize) {
synchronized (this) {
int size = Math.min(_outstandingRegistry.size(), batchSize);
while (size > 0) {
VmwareContext context = _outstandingRegistry.remove(0);
l.add(context);

_outstandingRegistry.add(context);
size--;
}
}
}

private void doKeepAlive() {
List<VmwareContext> l = new ArrayList<VmwareContext>();
int batchSize = (int)(_idleCheckIntervalMs / 1000); // calculate batch size at 1 request/sec rate
getKeepAliveCheckContexts(l, batchSize);

for (VmwareContext context : l) {
try {
context.idleCheck();
} catch (Throwable e) {
s_logger.warn("Exception caught during VmwareContext idle check, close and discard the context", e);
context.close();
final List<VmwareContext> closableCtxList = new ArrayList<>();
for (final Queue<VmwareContext> ctxQueue : _pool.values()) {
for (Iterator<VmwareContext> iterator = ctxQueue.iterator(); iterator.hasNext();) {
final VmwareContext context = iterator.next();
if (context == null) {
iterator.remove();
continue;
}
try {
context.idleCheck();
} catch (Throwable e) {
s_logger.warn("Exception caught during VmwareContext idle check, close and discard the context", e);
closableCtxList.add(context);
iterator.remove();
}
}
}
for (final VmwareContext context : closableCtxList) {
context.close();
}
}

public static String composePoolKey(String vCenterAddress, String vCenterUserName) {
public static String composePoolKey(final String vCenterAddress, final String vCenterUserName) {
assert (vCenterUserName != null);
assert (vCenterAddress != null);
return vCenterUserName + "@" + vCenterAddress;
Expand Down
Loading