Skip to content

Commit

Permalink
feat(server): support in-heap memory JVM monitor (#2650)
Browse files Browse the repository at this point in the history
  • Loading branch information
MingzhenHan authored Oct 15, 2024
1 parent 1a82d83 commit 0cb6115
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.hugegraph.config.OptionChecker.disallowEmpty;
import static org.apache.hugegraph.config.OptionChecker.nonNegativeInt;
import static org.apache.hugegraph.config.OptionChecker.positiveInt;
import static org.apache.hugegraph.config.OptionChecker.rangeDouble;
import static org.apache.hugegraph.config.OptionChecker.rangeInt;

public class ServerOptions extends OptionHolder {
Expand Down Expand Up @@ -321,4 +322,22 @@ public static synchronized ServerOptions instance() {
nonNegativeInt(),
1000L
);

/**
 * Heap usage ratio (used heap / max heap) above which the memory monitor
 * reacts. Configured through "memory_monitor.threshold", default 0.85.
 * A value of 1 disables the monitoring task.
 * NOTE(review): rangeDouble(0.0, 1.0) presumably accepts both bounds
 * inclusively - confirm against OptionChecker.
 */
public static final ConfigOption<Double> JVM_MEMORY_MONITOR_THRESHOLD =
new ConfigOption<>(
"memory_monitor.threshold",
"Threshold for JVM memory usage monitoring, 1 means disabling the memory " +
"monitoring task.",
rangeDouble(0.0, 1.0),
0.85
);

/**
 * Interval, in milliseconds, between two JVM heap usage checks. Configured
 * through "memory_monitor.period", default 2000.
 * The value must be strictly positive: it is used as the period of a
 * fixed-rate scheduled task, and ScheduledExecutorService rejects a
 * period <= 0 with an IllegalArgumentException, so 0 is refused here at
 * config validation time instead of failing later at server startup.
 */
public static final ConfigOption<Integer> JVM_MEMORY_MONITOR_DETECT_PERIOD =
        new ConfigOption<>(
                "memory_monitor.period",
                "The period in ms of JVM memory usage monitoring, in each period we will " +
                "detect the jvm memory usage and take corresponding actions.",
                positiveInt(),
                2000
        );
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public Object metadata(String meta, Object... args) {

public static final void checkInterrupted() {
if (Thread.interrupted()) {
throw new BackendException("Interrupted, maybe it is timed out",
throw new BackendException("Interrupted, maybe it is timed out or uses too much memory",
new InterruptedException());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ server.role=master

# slow query log
log.slow_query_threshold=1000

# JVM (in-heap) memory usage monitor; set memory_monitor.threshold to 1 to disable it
memory_monitor.threshold=0.85
memory_monitor.period=2000
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class HugeGraphServer {

private final RestServer restServer;
private final GremlinServer gremlinServer;
private final MemoryMonitor memoryMonitor;

public static void register() {
RegisterUtil.registerBackends();
Expand Down Expand Up @@ -78,9 +79,15 @@ public HugeGraphServer(String gremlinServerConf, String restServerConf)
} finally {
System.setSecurityManager(securityManager);
}

// Start (In-Heap) Memory Monitor
this.memoryMonitor = new MemoryMonitor(restServerConf);
this.memoryMonitor.start();
}

public void stop() {
this.memoryMonitor.stop();

try {
this.gremlinServer.stop().get();
LOG.info("HugeGremlinServer stopped");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.dist;

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.ServerOptions;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.sun.management.ThreadMXBean;

/**
 * Periodically checks the JVM heap usage ratio (used / max). When the ratio
 * exceeds the configured threshold, a GC is requested; if the ratio is still
 * above the threshold afterwards, the RUNNABLE REST worker thread that has
 * allocated the most memory is interrupted so its request can abort.
 *
 * A threshold of 1.0 (or more) disables the monitor: {@link #start()} and
 * {@link #stop()} then do nothing.
 */
public class MemoryMonitor {

    private static final Logger LOG = Log.logger(MemoryMonitor.class);

    /** Only threads whose name starts with this prefix are interrupt candidates */
    private static final String WORKER_THREAD_PREFIX = "grizzly-http-server-";
    /** A threshold at or above this value means the monitor is disabled */
    private static final double DISABLED_THRESHOLD = 1.0;

    private final double threshold;
    private final int detectPeriod;
    private final ScheduledExecutorService scheduler;

    /**
     * Reads the monitor settings from the REST server config and prepares
     * (but does not start) the scheduler.
     *
     * @param restServerConf path of the REST server configuration file
     */
    public MemoryMonitor(String restServerConf) {
        HugeConfig restServerConfig = new HugeConfig(restServerConf);
        this.threshold = restServerConfig.get(ServerOptions.JVM_MEMORY_MONITOR_THRESHOLD);
        this.detectPeriod =
                restServerConfig.get(ServerOptions.JVM_MEMORY_MONITOR_DETECT_PERIOD);
        this.scheduler = ExecutorUtil.newScheduledThreadPool("memory-monitor-thread-%d");
    }

    /**
     * One detection round: check usage, request a GC if it is too high, check
     * again, and interrupt the top-allocating worker if it is still too high.
     */
    private void runMemoryDetect() {
        /*
         * An exception escaping a task scheduled with scheduleAtFixedRate
         * suppresses all subsequent executions, which would silently kill
         * the monitor. Catch and log so the next period still runs.
         */
        try {
            double memoryUsagePercentage = getMemoryUsageRatio();
            if (memoryUsagePercentage <= this.threshold) {
                return;
            }
            LOG.warn("JVM memory usage is '{}', exceeding the threshold of '{}'.",
                     memoryUsagePercentage, this.threshold);
            System.gc();
            LOG.warn("Trigger System.gc()");

            double doubleCheckUsage = getMemoryUsageRatio();
            if (doubleCheckUsage > this.threshold) {
                LOG.warn("JVM memory usage is '{}', exceeding the threshold of '{}'.",
                         doubleCheckUsage, this.threshold);
                interruptHighestMemoryThread();
            }
        } catch (RuntimeException e) {
            LOG.error("Failed to detect JVM memory usage, will retry in next period", e);
        }
    }

    /**
     * @return used heap divided by max heap, or 0.0 when the max heap size is
     *         undefined (MemoryUsage.getMax() returns -1 in that case)
     */
    private double getMemoryUsageRatio() {
        MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heapMemoryUsage.getMax();
        if (max <= 0L) {
            // Without a defined max there is no meaningful ratio to compare
            return 0.0;
        }
        return (double) heapMemoryUsage.getUsed() / max;
    }

    /**
     * Finds the RUNNABLE REST worker thread with the largest allocated-bytes
     * counter reported by the JVM.
     *
     * @return the thread, or null if no candidate thread was found
     */
    private Thread getHighestMemoryThread() {
        long highestMemory = 0;
        Thread highestThread = null;

        ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean();

        /*
         * Thread.activeCount() is only an estimate: enumerate() may fill fewer
         * slots than allocated, leaving trailing null entries. Iterate only
         * over the number of threads actually copied.
         */
        Thread[] threads = new Thread[Thread.activeCount()];
        int count = Thread.enumerate(threads);
        for (int i = 0; i < count; i++) {
            Thread thread = threads[i];
            if (thread == null || thread.getState() != Thread.State.RUNNABLE ||
                !thread.getName().startsWith(WORKER_THREAD_PREFIX)) {
                continue;
            }

            // Yields -1 when unavailable, which never beats highestMemory
            long threadMemory = threadMXBean.getThreadAllocatedBytes(thread.getId());
            if (threadMemory > highestMemory) {
                highestMemory = threadMemory;
                highestThread = thread;
            }
        }
        return highestThread;
    }

    /** Interrupts the top-allocating worker thread, if there is one. */
    private void interruptHighestMemoryThread() {
        Thread targetThread = getHighestMemoryThread();
        if (targetThread != null) {
            targetThread.interrupt();
            LOG.warn("Send interrupt to '{}' thread", targetThread.getName());
        }
    }

    /**
     * Starts the periodic detection task, unless the monitor is disabled by
     * a threshold of 1.0 or more.
     */
    public void start() {
        if (this.threshold >= DISABLED_THRESHOLD) {
            LOG.info("Memory monitoring is disabled since 'memory_monitor.threshold' " +
                     "is set to '{}'.", this.threshold);
            return;
        }
        this.scheduler.scheduleAtFixedRate(this::runMemoryDetect, 0, this.detectPeriod,
                                           TimeUnit.MILLISECONDS);
        LOG.info("Memory monitoring started.");
    }

    /**
     * Stops the periodic detection task; a no-op when the monitor is disabled.
     */
    public void stop() {
        if (this.threshold >= DISABLED_THRESHOLD) {
            return;
        }
        this.scheduler.shutdownNow();
        LOG.info("Memory monitoring stopped.");
    }
}

0 comments on commit 0cb6115

Please sign in to comment.