From 72d1b36febd18db432391d5dc5524974aaf7d406 Mon Sep 17 00:00:00 2001 From: duripeng <453243496@qq.com> Date: Mon, 18 Dec 2023 21:37:30 +0800 Subject: [PATCH 1/2] [Enhance](broker) add inputstream expire scheduled checker to avoid memory leak for broker scan --- .../doris/broker/hdfs/BrokerConfig.java | 6 ++ .../broker/hdfs/ClientContextManager.java | 59 +++++++++++++++++-- .../doris/broker/hdfs/FileSystemManager.java | 4 +- 3 files changed, 61 insertions(+), 8 deletions(-) diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java index b5d4ca996f7836..5fd17706a1ca27 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java @@ -31,6 +31,12 @@ public class BrokerConfig extends ConfigBase { @ConfField public static int client_expire_seconds = 3600; + @ConfField + public static boolean enable_input_stream_expire_check = false; + + @ConfField + public static int input_stream_expire_seconds = 300; + @ConfField public static int broker_ipc_port = 8000; } diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index 736f9ae448f7f8..c80925f85ca6e0 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -17,9 +17,13 @@ package org.apache.doris.broker.hdfs; +import java.util.Iterator; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -32,16 +36,21 @@ public class ClientContextManager { private static Logger logger = Logger .getLogger(ClientContextManager.class.getName()); - private ScheduledExecutorService executorService; + private ScheduledExecutorService clientCheckExecutorService; + private ScheduledExecutorService inputStreamCheckExecuterService; private ConcurrentHashMap clientContexts; private ConcurrentHashMap fdToClientMap; private int clientExpirationSeconds = BrokerConfig.client_expire_seconds; - public ClientContextManager(ScheduledExecutorService executorService) { + public ClientContextManager() { clientContexts = new ConcurrentHashMap<>(); fdToClientMap = new ConcurrentHashMap<>(); - this.executorService = executorService; - this.executorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS); + this.clientCheckExecutorService = Executors.newScheduledThreadPool(2); + this.clientCheckExecutorService.schedule(new CheckClientExpirationTask(), 0, TimeUnit.SECONDS); + if (BrokerConfig.enable_input_stream_expire_check) { + this.inputStreamCheckExecuterService = Executors.newScheduledThreadPool(2); + this.inputStreamCheckExecuterService.schedule(new CheckInputStreamExpirationTask(), 0, TimeUnit.SECONDS); + } } public void onPing(String clientId) { @@ -126,6 +135,24 @@ public synchronized void removeOutputStream(TBrokerFD fd) { } } + public synchronized void remoteExpireInputStreams() { + int inputStreamExpireSeconds = BrokerConfig.input_stream_expire_seconds; + TBrokerFD fd; + for (ClientResourceContext clientContext : clientContexts.values()) { + Iterator> iter = clientContext.inputStreams.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + if (entry.getValue().checkExpire(inputStreamExpireSeconds)) { + ClientContextManager.this.removeInputStream(fd); + } + iter.remove(); + logger.info(fd + " in client [" + clientContext.clientId + + "] is expired, remove it from contexts. last update time is " + + entry.getValue().getLastPingTimestamp()); + } + } + } + class CheckClientExpirationTask implements Runnable { @Override public void run() { @@ -145,7 +172,18 @@ public void run() { } } } finally { - ClientContextManager.this.executorService.schedule(this, 60, TimeUnit.SECONDS); + ClientContextManager.this.clientCheckExecutorService.schedule(this, 60, TimeUnit.SECONDS); + } + } + } + + class CheckInputStreamExpirationTask implements Runnable { + @Override + public void run() { + try { + ClientContextManager.this.remoteExpireInputStreams(); + } finally { + ClientContextManager.this.inputStreamCheckExecuterService.schedule(this, 60, TimeUnit.SECONDS); } } } @@ -175,21 +213,32 @@ private static class BrokerInputStream { private final FSDataInputStream inputStream; private final BrokerFileSystem brokerFileSystem; + private AtomicLong lastPingTimestamp; public BrokerInputStream(FSDataInputStream inputStream, BrokerFileSystem brokerFileSystem) { this.inputStream = inputStream; this.brokerFileSystem = brokerFileSystem; this.brokerFileSystem.updateLastUpdateAccessTime(); + this.lastPingTimestamp = new AtomicLong(System.currentTimeMillis()); } public FSDataInputStream getInputStream() { this.brokerFileSystem.updateLastUpdateAccessTime(); + this.lastPingTimestamp.set(System.currentTimeMillis()); return inputStream; } public void updateLastUpdateAccessTime() { this.brokerFileSystem.updateLastUpdateAccessTime(); } + + public boolean checkExpire(long expireSecond) { + return System.currentTimeMillis() - lastPingTimestamp.get() > expireSecond * 1000; + } + + public long getLastPingTimestamp() { + return lastPingTimestamp.get(); + } } static class ClientResourceContext { diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 58a09c7e090225..beeff588bcba76 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -163,8 +163,6 @@ public class FileSystemManager { private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method"; private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout"; - private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2); - private int readBufferSize = 128 << 10; // 128k private int writeBufferSize = 128 << 10; // 128k @@ -173,7 +171,7 @@ public class FileSystemManager { public FileSystemManager() { cachedFileSystem = new ConcurrentHashMap<>(); - clientContextManager = new ClientContextManager(handleManagementPool); + clientContextManager = new ClientContextManager(); readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10; writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10; } From 7164749bb89288d4278b267bae930adac148d46c Mon Sep 17 00:00:00 2001 From: duripeng <453243496@qq.com> Date: Tue, 19 Dec 2023 15:44:10 +0800 Subject: [PATCH 2/2] fd init --- .../java/org/apache/doris/broker/hdfs/ClientContextManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java index c80925f85ca6e0..2df4c12a7fda8d 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java @@ -142,6 +142,7 @@ public synchronized void remoteExpireInputStreams() { Iterator> iter = clientContext.inputStreams.entrySet().iterator(); while (iter.hasNext()) { Entry entry = iter.next(); + fd = entry.getKey(); if (entry.getValue().checkExpire(inputStreamExpireSeconds)) { ClientContextManager.this.removeInputStream(fd); }