diff --git a/Plasma-based-cache/src/main/parquet1.10.1/java/org/apache/parquet/hadoop/ParquetFileReader.java b/Plasma-based-cache/src/main/parquet1.10.1/java/org/apache/parquet/hadoop/ParquetFileReader.java index 5b0834a64..2198f9d76 100644 --- a/Plasma-based-cache/src/main/parquet1.10.1/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/Plasma-based-cache/src/main/parquet1.10.1/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -597,18 +597,18 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options) } private final InputFile file; - protected final SeekableInputStream f; + public final SeekableInputStream f; private final ParquetReadOptions options; - protected final Map paths = new HashMap<>(); + public final Map paths = new HashMap<>(); private final FileMetaData fileMetaData; // may be null - protected final List blocks; + public final List blocks; // not final. in some cases, this may be lazily loaded for backward-compat. private ParquetMetadata footer; - protected int currentBlock = 0; - protected ColumnChunkPageReadStore currentRowGroup = null; - protected DictionaryPageReader nextDictionaryReader = null; + public int currentBlock = 0; + public ColumnChunkPageReadStore currentRowGroup = null; + public DictionaryPageReader nextDictionaryReader = null; /** * @param configuration the Hadoop conf @@ -823,7 +823,7 @@ public boolean skipNextRowGroup() { return advanceToNextBlock(); } - protected boolean advanceToNextBlock() { + public boolean advanceToNextBlock() { if (currentBlock == blocks.size()) { return false; } @@ -1036,7 +1036,7 @@ public BytesInput readAsBytesInput(int size) throws IOException { /** * deals with a now fixed bug where compressedLength was missing a few bytes. */ - protected class WorkaroundChunk extends Chunk { + public class WorkaroundChunk extends Chunk { private final SeekableInputStream f; @@ -1098,7 +1098,7 @@ public BytesInput readAsBytesInput(int size) throws IOException { */ static class ChunkDescriptor { - protected final ColumnDescriptor col; + public final ColumnDescriptor col; private final ColumnChunkMetaData metadata; private final long fileOffset; private final int size; diff --git a/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/MemoryManager.scala b/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/MemoryManager.scala index 68479f4ef..a1efe78f9 100644 --- a/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/MemoryManager.scala +++ b/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/MemoryManager.scala @@ -135,7 +135,7 @@ private[sql] object MemoryManager extends Logging { case "tmp" => new TmpDramMemoryManager(sparkEnv) case "kmem" => new DaxKmemMemoryManager(sparkEnv) case "plasma" => - if (plasmaServerDetect()) { + if (plasmaServerDetect(sparkEnv)) { new PlasmaMemoryManager(sparkEnv) } else { new OffHeapMemoryManager(sparkEnv) @@ -164,7 +164,7 @@ private[sql] object MemoryManager extends Logging { case "noevict" => new HybridMemoryManager(sparkEnv) case "vmem" => new TmpDramMemoryManager(sparkEnv) case "external" => - if (plasmaServerDetect()) { + if (plasmaServerDetect(sparkEnv)) { new PlasmaMemoryManager(sparkEnv) } else { new OffHeapMemoryManager(sparkEnv) diff --git a/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala b/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala index 8aa673d32..baccfd2dd 100644 --- a/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala +++ b/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/execution/datasources/oap/filecache/OapCache.scala @@ -217,15 +217,23 @@ private[filecache] class CacheGuardian(maxMemory: Long) extends Thread with Logg } private[filecache] object OapCache extends Logging { - def plasmaServerDetect(): Boolean = { - val command = "ps -ef" #| "grep plasma" - val plasmaServerStatus = command.!! - if (plasmaServerStatus.indexOf("plasma-store-server") == -1) { - logWarning("External cache strategy requires plasma-store-server launched, " + - "failed to detect plasma-store-server, will fallback to simpleCache.") - return false + def plasmaServerDetect(sparkEnv: SparkEnv): Boolean = { + val socket = sparkEnv.conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH) + try { + System.loadLibrary("plasma_java") + } catch { + case e: Exception => logError(s"load plasma jni lib failed " + e.getMessage) } - true + var plasmaDetected: Boolean = true; + try { + val conn: plasma.PlasmaClient = new plasma.PlasmaClient(socket, "", 0) + } catch { + case e: PlasmaClientException => + logWarning("External cache strategy requires plasma-store-server launched, " + + "failed to detect plasma-store-server, will fallback to simpleCache." + e.getMessage) + plasmaDetected = false; + } + plasmaDetected } def cacheFallBackDetect(sparkEnv: SparkEnv, fallBackEnabled: Boolean = true, @@ -299,7 +307,7 @@ private[filecache] object OapCache extends Logging { oapCacheOpt match { case "external" => - if (plasmaServerDetect()) new ExternalCache(fiberType) + if (plasmaServerDetect(sparkEnv)) new ExternalCache(fiberType) else new SimpleOapCache() case "guava" => if (cacheFallBackDetect(sparkEnv, fallBackEnabled.toBoolean, fallBackRes.toBoolean)) { @@ -956,7 +964,8 @@ class MixCache(dataCacheMemory: Long, class ExternalCache(fiberType: FiberType) extends OapCache with Logging { private val conf = SparkEnv.get.conf - private val externalStoreCacheSocket: String = "/tmp/plasmaStore" + private val externalStoreCacheSocket: String = + conf.get(OapConf.OAP_EXTERNAL_CACHE_SOCKET_PATH) private var cacheInit: Boolean = false private var externalDBClient: ExternalDBClient = null diff --git a/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/internal/oap/OapConf.scala b/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/internal/oap/OapConf.scala index da6f613aa..3501eb7db 100644 --- a/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/internal/oap/OapConf.scala +++ b/Plasma-based-cache/src/spark-3.1.1/main/scala/org/apache/spark/sql/internal/oap/OapConf.scala @@ -772,4 +772,10 @@ object OapConf { .stringConf .createWithDefault("RedisClient") + val OAP_EXTERNAL_CACHE_SOCKET_PATH = + SqlConfAdapter.buildConf("spark.sql.oap.external.cache.socket.path") + .internal() + .doc("The socket path of plasma cache") + .stringConf + .createWithDefault("/tmp/plasmaStore") } diff --git a/docs/User-Guide.md b/docs/User-Guide.md index c18ed4719..8b19464f7 100644 --- a/docs/User-Guide.md +++ b/docs/User-Guide.md @@ -295,6 +295,8 @@ spark.oap.cache.strategy external spark.sql.oap.dcpmm.free.wait.threshold 50000000000 # according to your executor core number spark.executor.sql.oap.cache.external.client.pool.size 10 +# The socket path of plasma server, default is /tmp/plasmaStore +spark.sql.oap.external.cache.socket.path /tmp/plasmaStore ``` Start Plasma service manually