Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiyvamz committed Oct 31, 2023
1 parent 8138a86 commit adf1d30
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import software.amazon.jdbc.HostSpec;
Expand Down Expand Up @@ -152,6 +153,7 @@ public void run() {
MonitorConnectionContext newMonitorContext;
MonitorConnectionContext firstAddedNewMonitorContext = null;
final long currentTimeNano = this.getCurrentTimeNano();

while ((newMonitorContext = this.newContexts.poll()) != null) {
if (firstAddedNewMonitorContext == newMonitorContext) {
// This context has already been processed.
Expand Down Expand Up @@ -179,8 +181,7 @@ public void run() {
final long statusCheckStartTimeNano = this.getCurrentTimeNano();
this.contextLastUsedTimestampNano = statusCheckStartTimeNano;

final ConnectionStatus status =
checkConnectionStatus(this.nodeCheckTimeoutMillis);
final ConnectionStatus status = checkConnectionStatus(this.nodeCheckTimeoutMillis);

long delayMillis = -1;
MonitorConnectionContext monitorContext;
Expand Down Expand Up @@ -250,25 +251,34 @@ public void run() {
throw intEx;
} catch (final Exception ex) {
// log and ignore
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
Messages.get(
"MonitorImpl.exceptionDuringMonitoringContinue",
new Object[]{this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
}
}
} catch (final InterruptedException intEx) {
// exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.interruptedExceptionDuringMonitoring",
new Object[] {this.hostSpec.getHost(), intEx.getMessage()}));
new Object[] {this.hostSpec.getHost()}));
} catch (final Exception ex) {
// this should not be reached; log and exit thread
LOGGER.warning(
() -> Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[] {this.hostSpec.getHost(), ex.getMessage()}));
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.log(
Level.WARNING,
Messages.get(
"MonitorImpl.exceptionDuringMonitoringStop",
new Object[]{this.hostSpec.getHost()}),
ex); // We want to print full trace stack of the exception.
}
} finally {
this.stopped = true;
if (this.monitoringConn != null) {
try {
this.monitoringConn.close();
Expand All @@ -279,7 +289,6 @@ public void run() {
if (telemetryContext != null) {
this.telemetryContext.closeContext();
}
this.stopped = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package software.amazon.jdbc.plugin.efm;

import java.lang.ref.WeakReference;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
Expand All @@ -42,15 +42,15 @@ public class MonitorServiceImpl implements MonitorService {
public static final AwsWrapperProperty MONITOR_DISPOSAL_TIME_MS =
new AwsWrapperProperty(
"monitorDisposalTime",
"60000",
"600000", // 10min
"Interval in milliseconds for a monitor to be considered inactive and to be disposed.");

private final PluginService pluginService;
private MonitorThreadContainer threadContainer;

final MonitorInitializer monitorInitializer;
private Set<String> cachedMonitorNodeKeys = null;
private Monitor cachedMonitor = null;
private WeakReference<Monitor> cachedMonitor = null;
final TelemetryFactory telemetryFactory;
final TelemetryCounter abortedConnectionsCounter;

Expand Down Expand Up @@ -100,16 +100,14 @@ public MonitorConnectionContext startMonitoring(
new Object[] {hostSpec}));
}

final Monitor monitor;
if (this.cachedMonitor == null
Monitor monitor = this.cachedMonitor == null ? null : this.cachedMonitor.get();
if (monitor == null
|| this.cachedMonitorNodeKeys == null
|| !this.cachedMonitorNodeKeys.equals(nodeKeys)) {

monitor = getMonitor(nodeKeys, hostSpec, properties);
this.cachedMonitor = monitor;
this.cachedMonitor = new WeakReference<>(monitor);
this.cachedMonitorNodeKeys = Collections.unmodifiableSet(nodeKeys);
} else {
monitor = this.cachedMonitor;
}

final MonitorConnectionContext context =
Expand Down Expand Up @@ -139,7 +137,6 @@ public void stopMonitoringForAllConnections(@NonNull final Set<String> nodeKeys)
monitor = this.threadContainer.getMonitor(nodeKey);
if (monitor != null) {
monitor.clearContexts();
this.threadContainer.resetResource(monitor);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -40,9 +38,9 @@ public class MonitorThreadContainer {
private static final AtomicInteger CLASS_USAGE_COUNT = new AtomicInteger();
private final Map<String, Monitor> monitorMap = new ConcurrentHashMap<>();
private final Map<Monitor, Future<?>> tasksMap = new ConcurrentHashMap<>();
private final Queue<Monitor> availableMonitors = new ConcurrentLinkedDeque<>();
private final ExecutorService threadPool;
private static final ReentrantLock LOCK_OBJECT = new ReentrantLock();
private static final ReentrantLock MONITOR_LOCK_OBJECT = new ReentrantLock();

/**
* Create an instance of the {@link MonitorThreadContainer}.
Expand Down Expand Up @@ -101,10 +99,6 @@ public Map<Monitor, Future<?>> getTasksMap() {
return tasksMap;
}

public ExecutorService getThreadPool() {
return threadPool;
}

Monitor getMonitor(final String node) {
return monitorMap.get(node);
}
Expand All @@ -114,42 +108,34 @@ Monitor getOrCreateMonitor(final Set<String> nodeKeys, final Supplier<Monitor> m
throw new IllegalArgumentException(Messages.get("MonitorThreadContainer.emptyNodeKeys"));
}

Monitor monitor = null;
String anyNodeKey = null;
for (final String nodeKey : nodeKeys) {
monitor = monitorMap.get(nodeKey);
anyNodeKey = nodeKey;
if (monitor != null) {
break;
MONITOR_LOCK_OBJECT.lock();
try {

Monitor monitor = null;
String anyNodeKey = null;
for (final String nodeKey : nodeKeys) {
monitor = monitorMap.get(nodeKey);
anyNodeKey = nodeKey;
if (monitor != null) {
break;
}
}
}

if (monitor == null) {
monitor = monitorMap.computeIfAbsent(
anyNodeKey,
k -> {
if (!availableMonitors.isEmpty()) {
final Monitor availableMonitor = availableMonitors.remove();
if (!availableMonitor.isStopped()) {
return availableMonitor;
}
tasksMap.computeIfPresent(
availableMonitor,
(key, v) -> {
v.cancel(true);
return null;
});
}

final Monitor newMonitor = monitorSupplier.get();
addTask(newMonitor);

return newMonitor;
});
}
if (monitor == null) {
monitor = monitorMap.computeIfAbsent(
anyNodeKey,
k -> {
final Monitor newMonitor = monitorSupplier.get();
addTask(newMonitor);
return newMonitor;
});
}
populateMonitorMap(nodeKeys, monitor);
return monitor;

populateMonitorMap(nodeKeys, monitor);
return monitor;
} finally {
MONITOR_LOCK_OBJECT.unlock();
}
}

private void populateMonitorMap(final Set<String> nodeKeys, final Monitor monitor) {
Expand All @@ -162,21 +148,6 @@ void addTask(final Monitor monitor) {
tasksMap.computeIfAbsent(monitor, k -> threadPool.submit(monitor));
}

/**
* Clear all references used by the given monitor. Put the monitor in to a queue waiting to be
* reused.
*
* @param monitor The monitor to reset.
*/
public void resetResource(final Monitor monitor) {
if (monitor == null) {
return;
}

monitorMap.entrySet().removeIf(e -> e.getValue() == monitor);
availableMonitors.add(monitor);
}

/**
* Remove references to the given {@link MonitorImpl} object and stop the background monitoring
* thread.
Expand All @@ -189,23 +160,34 @@ public void releaseResource(final Monitor monitor) {
}

final List<Monitor> monitorList = Collections.singletonList(monitor);
monitorMap.values().removeAll(monitorList);
tasksMap.computeIfPresent(
monitor,
(k, v) -> {
v.cancel(true);
return null;
});

MONITOR_LOCK_OBJECT.lock();
try {
monitorMap.values().removeAll(monitorList);
tasksMap.computeIfPresent(
monitor,
(k, v) -> {
v.cancel(true);
return null;
});
} finally {
MONITOR_LOCK_OBJECT.unlock();
}
}

private void releaseResources() {
monitorMap.clear();
tasksMap.values().stream()
.filter(val -> !val.isDone() && !val.isCancelled())
.forEach(val -> val.cancel(true));
MONITOR_LOCK_OBJECT.lock();
try {
monitorMap.clear();
tasksMap.values().stream()
.filter(val -> !val.isDone() && !val.isCancelled())
.forEach(val -> val.cancel(true));

if (threadPool != null) {
threadPool.shutdownNow();
if (threadPool != null) {
threadPool.shutdownNow();
}
} finally {
MONITOR_LOCK_OBJECT.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ MonitorThreadContainer.emptyNodeKeys=Provided node keys are empty.

# Monitor Impl
MonitorImpl.contextNullWarning=Parameter 'context' should not be null.
MonitorImpl.interruptedExceptionDuringMonitoring=Monitoring thread for node {0} was interrupted: {1}
MonitorImpl.exceptionDuringMonitoringContinue=Continuing monitoring after unhandled exception was thrown in monitoring thread for node {0}: {1}
MonitorImpl.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in monitoring thread for node {0}: {1}
MonitorImpl.interruptedExceptionDuringMonitoring=Monitoring thread for node {0} was interrupted.
MonitorImpl.exceptionDuringMonitoringContinue=Continuing monitoring after unhandled exception was thrown in monitoring thread for node {0}.
MonitorImpl.exceptionDuringMonitoringStop=Stopping monitoring after unhandled exception was thrown in monitoring thread for node {0}.
MonitorImpl.monitorIsStopped=Monitoring was already stopped for node {0}.

# Monitor Service Impl
Expand Down

0 comments on commit adf1d30

Please sign in to comment.