Skip to content

Commit

Permalink
Preliminary support for LMDBStore
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Feb 23, 2025
1 parent f71b65a commit 4e09213
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 1 deletion.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Computationally focused database using pluggable stores
- Yahoo's HaloDB (https://github.com/yahoo/HaloDB)
- MapDB (https://mapdb.org)
- Facebook's RocksDB (https://rocksdb.org)
- LMDB (https://www.symas.com/mdb)
- Redis (https://redis.io)
- Apache Lucene (https://lucene.apache.org)
- SQLite (https://www.sqlite.org)
Expand Down
17 changes: 16 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ ThisBuild / outputStrategy := Some(StdoutOutput)

ThisBuild / javaOptions ++= Seq(
"--enable-native-access=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
"--add-modules", "jdk.incubator.vector"
)

Expand All @@ -62,6 +64,8 @@ val rocksDBVersion: String = "9.10.0"

val mapdbVersion: String = "3.1.0"

val lmdbVersion: String = "0.9.1"

val jedisVersion: String = "5.2.0"

val fabricVersion: String = "1.15.9"
Expand All @@ -87,7 +91,7 @@ val rapidVersion: String = "0.10.0"
val scalaTestVersion: String = "3.2.19"

lazy val root = project.in(file("."))
.aggregate(core.jvm, sql, sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, redis, all)
.aggregate(core.jvm, sql, sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, lmdb, redis, all)
.settings(
name := projectName,
publish := {},
Expand Down Expand Up @@ -235,6 +239,17 @@ lazy val mapdb = project.in(file("mapdb"))
fork := true
)

lazy val lmdb = project.in(file("lmdb"))
.dependsOn(core.jvm, core.jvm % "test->test")
.settings(
name := s"$projectName-lmdb",
libraryDependencies ++= Seq(
"org.lmdbjava" % "lmdbjava" % lmdbVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
),
fork := true
)

lazy val redis = project.in(file("redis"))
.dependsOn(core.jvm, core.jvm % "test->test")
.settings(
Expand Down
23 changes: 23 additions & 0 deletions lmdb/src/main/scala/lightdb/lmdb/ByteBufferPool.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package lightdb.lmdb

import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue

class ByteBufferPool(startingSize: Int) {
private val queue = new ConcurrentLinkedQueue[ByteBuffer]

def get(neededSize: Int): ByteBuffer = Option(queue.poll()) match {
case Some(bb) if bb.capacity() >= neededSize => bb.clear()
case _ => create(neededSize)
}

private def create(neededSize: Int): ByteBuffer = {
var actual = startingSize
while (actual < neededSize) {
actual *= 2
}
ByteBuffer.allocateDirect(actual)
}

def release(bb: ByteBuffer): Unit = queue.offer(bb)
}
37 changes: 37 additions & 0 deletions lmdb/src/main/scala/lightdb/lmdb/LMDBInstance.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package lightdb.lmdb

import org.lmdbjava.{Dbi, DbiFlags, Env}
import rapid.Task

import java.nio.ByteBuffer

case class LMDBInstance(env: Env[ByteBuffer]) {
instance =>
private var map = Map.empty[String, Dbi[ByteBuffer]]

def createTransaction(): LMDBTransaction = LMDBTransaction(env.txnWrite())

def get(name: String): Dbi[ByteBuffer] = synchronized {
map.get(name) match {
case Some(dbi) => dbi
case None =>
val dbi = env.openDbi(name, DbiFlags.MDB_CREATE)
map += name -> dbi
dbi
}
}

def release(name: String): Task[Unit] = Task {
instance.synchronized {
map.get(name) match {
case Some(dbi) =>
map -= name
dbi.close()
if (map.isEmpty) {
env.close()
}
case None => // Ignore
}
}
}
}
152 changes: 152 additions & 0 deletions lmdb/src/main/scala/lightdb/lmdb/LMDBStore.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package lightdb.lmdb

import fabric.io.{JsonFormatter, JsonParser}
import fabric.rw.{Asable, Convertible}
import lightdb.aggregate.AggregateQuery
import lightdb.{Id, LightDB, Query, SearchResults}
import lightdb.doc.{Document, DocumentModel}
import lightdb.field.Field
import lightdb.materialized.MaterializedAggregate
import lightdb.store.{Store, StoreManager, StoreMode}
import lightdb.transaction.Transaction
import org.lmdbjava._
import rapid.Task

import java.nio.ByteBuffer
import java.nio.file.{Files, Path}

class LMDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
model: Model,
instance: LMDBInstance,
val storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) {
private lazy val dbi: Dbi[ByteBuffer] = instance.get(name)

override protected def initialize(): Task[Unit] = Task(dbi)

override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task {
transaction.put(
key = StateKey,
value = instance.createTransaction()
)
}

private def key(id: Id[Doc]): ByteBuffer = {
val bb = LMDBStore.keyBufferPool.get(512)
bb.put(id.bytes)
bb.flip()
}

private def value(doc: Doc): ByteBuffer = {
val json = doc.json(model.rw)
val value = JsonFormatter.Compact(json)
val bb = LMDBStore.valueBufferPool.get(value.length)
bb.put(value.getBytes)
bb.flip()
}

override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task {
val key = this.key(doc._id)
val value = this.value(doc)
dbi.put(getState.txn, key, value, PutFlags.MDB_NOOVERWRITE)
doc
}

override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task {
val key = this.key(doc._id)
val value = this.value(doc)
dbi.put(getState.txn, key, value)
doc
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] = Task {
val cursor = dbi.openCursor(getState.txn) // ✅ Open a cursor for efficient key lookup
try {
cursor.get(key(id), GetOp.MDB_SET_KEY)
} finally {
cursor.close()
}
}

private def b2d(bb: ByteBuffer): Doc = {
val bytes = new Array[Byte](bb.remaining())
bb.get(bytes)
val jsonString = new String(bytes, "UTF-8")
val json = JsonParser(jsonString)
json.as[Doc](model.rw)
}

override def get[V](field: Field.UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = Task {
if (field == idField) {
Option(dbi.get(getState.txn, key(value.asInstanceOf[Id[Doc]]))).filterNot(_.remaining() == 0).map(b2d)
} else {
throw new UnsupportedOperationException(s"LMDBStore can only get on _id, but ${field.name} was attempted")
}
}

override def delete[V](field: Field.UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Task[Boolean] = Task {
if (field == idField) {
dbi.delete(getState.txn, key(value.asInstanceOf[Id[Doc]]))
} else {
throw new UnsupportedOperationException(s"LMDBStore can only get on _id, but ${field.name} was attempted")
}
}

override def count(implicit transaction: Transaction[Doc]): Task[Int] = Task {
dbi.stat(getState.txn).entries.toInt
}

override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] =
rapid.Stream.fromIterator(Task(new LMDBValueIterator(dbi, getState.txn).map(b2d)))

override def doSearch[V](query: Query[Doc, Model, V])
(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] =
throw new UnsupportedOperationException("LMDBStore does not support searching")

override def aggregate(query: AggregateQuery[Doc, Model])
(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] =
throw new UnsupportedOperationException("LMDBStore does not support aggregation")

override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int] =
throw new UnsupportedOperationException("LMDBStore does not support aggregation")

override def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = count.flatTap { _ =>
Task(dbi.drop(getState.txn))
}

override protected def doDispose(): Task[Unit] = instance.release(name)
}

object LMDBStore extends StoreManager {
private val keyBufferPool = new ByteBufferPool(512)
private val valueBufferPool = new ByteBufferPool(512)

def createInstance(directory: Path,
maxDbs: Int = 1_000, // 1,000 default
mapSize: Long = 100L * 1024 * 1024 * 1024, // 100 gig
maxReaders: Int = 128): LMDBInstance = {
if (!Files.exists(directory)) {
Files.createDirectories(directory)
}
val env = Env
.create()
.setMaxDbs(maxDbs)
.setMapSize(mapSize)
.setMaxReaders(maxReaders)
.open(directory.toFile)
LMDBInstance(env)
}

override def create[Doc <: Document[Doc], Model <: DocumentModel[Doc]](db: LightDB,
model: Model,
name: String,
storeMode: StoreMode[Doc, Model]): Store[Doc, Model] = {
new LMDBStore[Doc, Model](
name = name,
model = model,
instance = createInstance(db.directory.get.resolve(name)),
storeMode = storeMode
)
}
}
22 changes: 22 additions & 0 deletions lmdb/src/main/scala/lightdb/lmdb/LMDBTransaction.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package lightdb.lmdb

import lightdb.transaction.TransactionFeature
import org.lmdbjava.Txn
import rapid.Task

import java.nio.ByteBuffer

case class LMDBTransaction(txn: Txn[ByteBuffer]) extends TransactionFeature {
override def commit(): Task[Unit] = Task {
txn.commit()
}

override def rollback(): Task[Unit] = Task {
txn.abort()
}

override def close(): Task[Unit] = Task {
txn.commit()
txn.close()
}
}
40 changes: 40 additions & 0 deletions lmdb/src/main/scala/lightdb/lmdb/LMDBValueIterator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package lightdb.lmdb

import org.lmdbjava.{Dbi, Txn}

import java.nio.ByteBuffer

class LMDBValueIterator(dbi: Dbi[ByteBuffer], txn: Txn[ByteBuffer]) extends Iterator[ByteBuffer] {
private val cursor = dbi.openCursor(txn)
private var current: Option[ByteBuffer] = None
private var open = true

private def advanceCursor(): Boolean = {
while (cursor.next()) {
val bb = cursor.`val`()
if (bb.remaining() > 0) {
current = Some(bb)
return true
}
}
close()
false
}

private def close(): Unit = {
open = false
cursor.close()
}

override def hasNext: Boolean = {
if (open && current.isEmpty) advanceCursor()
current.nonEmpty
}

override def next(): ByteBuffer = {
if (!hasNext) throw new NoSuchElementException("No more values in iterator")
val result = current.get
current = None
result
}
}
13 changes: 13 additions & 0 deletions lmdb/src/main/scala/lightdb/lmdb/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package lightdb

import lightdb.doc.Document
import lightdb.transaction.{Transaction, TransactionKey}
import org.lmdbjava.Txn

import java.nio.ByteBuffer

package object lmdb {
val StateKey: TransactionKey[LMDBTransaction] = TransactionKey("lmdbTxn")

def getState[Doc <: Document[Doc]](implicit transaction: Transaction[Doc]): LMDBTransaction = transaction(StateKey)
}
9 changes: 9 additions & 0 deletions lmdb/src/test/scala/spec/LMDBSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package spec

import lightdb.lmdb.LMDBStore
import lightdb.store.StoreManager

@EmbeddedTest
class LMDBSpec extends AbstractKeyValueSpec {
override def storeManager: StoreManager = LMDBStore
}

0 comments on commit 4e09213

Please sign in to comment.