Skip to content

Commit

Permalink
Continued work on LMDB support
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Feb 25, 2025
1 parent 13ac7f9 commit 8479674
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 107 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/lightdb/store/split/SplitStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](overrid
storage: Store[Doc, Model],
searching: Store[Doc, Model],
storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) {
override protected def initialize(): Task[Unit] = storage.init.next(searching.init)
override protected def initialize(): Task[Unit] = {
storage.init.and(searching.init).unit
}

override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] =
storage.prepareTransaction(transaction).and(searching.prepareTransaction(transaction)).unit
Expand Down Expand Up @@ -110,4 +112,4 @@ case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](overrid

object SplitStore {
val NoSearchUpdates: TransactionKey[Boolean] = TransactionKey[Boolean]("splitStoreNoSearchUpdates")
}
}
44 changes: 13 additions & 31 deletions lmdb/src/main/scala/lightdb/lmdb/LMDBStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ class LMDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
val storeMode: StoreMode[Doc, Model]) extends Store[Doc, Model](name, model) {
private lazy val dbi: Dbi[ByteBuffer] = instance.get(name)

override protected def initialize(): Task[Unit] = Task(dbi)
override protected def initialize(): Task[Unit] = Task {
dbi
}

override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task {
transaction.put(
Expand All @@ -47,9 +49,6 @@ class LMDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
private def withWrite[Return](f: Txn[ByteBuffer] => Task[Return]): Task[Return] =
instance.transactionManager.withWrite(f)

private def withRead[Return](f: Txn[ByteBuffer] => Task[Return]): Task[Return] =
instance.transactionManager.withRead(f)

override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = withWrite { txn =>
Task {
dbi.put(txn, key(doc._id), value(doc), PutFlags.MDB_NOOVERWRITE)
Expand All @@ -64,16 +63,8 @@ class LMDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
}
}

override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] = withRead { txn =>
Task {
val cursor = dbi.openCursor(txn) // ✅ Open a cursor for efficient key lookup
try {
cursor.get(key(id), GetOp.MDB_SET_KEY)
} finally {
cursor.close()
}
}
}
override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] =
instance.transactionManager.exists(dbi, key(id))

private def b2d(bb: ByteBuffer): Doc = {
val bytes = new Array[Byte](bb.remaining())
Expand All @@ -84,14 +75,10 @@ class LMDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
}

override def get[V](field: Field.UniqueIndex[Doc, V], value: V)
(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = withRead { txn =>
Task {
if (field == idField) {
Option(dbi.get(txn, key(value.asInstanceOf[Id[Doc]]))).filterNot(_.remaining() == 0).map(b2d)
} else {
throw new UnsupportedOperationException(s"LMDBStore can only get on _id, but ${field.name} was attempted")
}
}
(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = if (field == idField) {
instance.transactionManager.get(dbi, key(value.asInstanceOf[Id[Doc]])).map(_.map(b2d))
} else {
throw new UnsupportedOperationException(s"LMDBStore can only get on _id, but ${field.name} was attempted")
}

override def delete[V](field: Field.UniqueIndex[Doc, V], value: V)
Expand All @@ -105,16 +92,11 @@ class LMDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String,
}
}

override def count(implicit transaction: Transaction[Doc]): Task[Int] = withRead { txn =>
Task {
dbi.stat(txn).entries.toInt
}
}
override def count(implicit transaction: Transaction[Doc]): Task[Int] =
instance.transactionManager.count(dbi)

override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] =
rapid.Stream.fromIterator(Task(instance.transactionManager.withReadIterator { txn =>
new LMDBValueIterator(dbi, txn).map(b2d)
}))
rapid.Stream.fromIterator(instance.transactionManager.withReadIterator(txn => new LMDBValueIterator(dbi, txn).map(b2d)))

override def doSearch[V](query: Query[Doc, Model, V])
(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] =
Expand Down Expand Up @@ -188,4 +170,4 @@ object LMDBStore extends StoreManager {
storeMode = storeMode
)
}
}
}
160 changes: 86 additions & 74 deletions lmdb/src/main/scala/lightdb/lmdb/LMDBTransactionManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,103 +2,115 @@ package lightdb.lmdb

import org.lmdbjava._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.atomic.AtomicInteger
import rapid.Task
import lightdb.util.ActionIterator

case class LMDBTransactionManager(env: Env[ByteBuffer]) {
private val writeTxn = new AtomicReference[Option[ManagedTxn]](None)
private val readTxn = new AtomicReference[Option[Txn[ByteBuffer]]](None)
private val writeActive = new AtomicBoolean(false) // ✅ Ensures only one write txn
case class LMDBTransactionManager(env: Env[ByteBuffer]) { manager =>
private var lastCommit: Long = 0L
private val activeWrites = new AtomicInteger(0)
@volatile private var commitRequested = false
@volatile private var writeTransaction: Txn[ByteBuffer] = _

/** Inner class to track commit state manually */
private case class ManagedTxn(txn: Txn[ByteBuffer], var committed: Boolean = false)
/** Get or create a new write transaction. Fails fast if commit is in progress. */
private def getWrite(): Txn[ByteBuffer] = synchronized {
if (commitRequested) {
throw new IllegalStateException("A commit is already in progress. Cannot acquire a new write transaction.")
}

/** Safely execute a function within a **read transaction** */
def withRead[Return](f: Txn[ByteBuffer] => Task[Return]): Task[Return] = {
Task.defer {
val txn = readTxn.updateAndGet {
case Some(activeTxn) => Some(activeTxn) // Reuse existing read transaction
case None =>
val newTxn = env.txnRead()
Some(newTxn)
}.get

f(txn).guarantee(Task {
if (readTxn.get().contains(txn)) {
txn.close()
readTxn.set(None)
}
})
activeWrites.incrementAndGet()

if (writeTransaction == null) {
writeTransaction = env.txnWrite()
} else {
}
}

/** Provide an iterator wrapped in a read transaction */
def withReadIterator[T](iteratorProvider: Txn[ByteBuffer] => Iterator[T]): Iterator[T] = {
val txn = env.txnRead()
readTxn.set(Some(txn))
writeTransaction
}

val underlying = iteratorProvider(txn)
/** Release a write transaction safely */
private def releaseWrite(txn: Txn[ByteBuffer]): Unit = synchronized {
val remainingWrites = activeWrites.decrementAndGet()

ActionIterator(
underlying,
onNext = _ => (), // No special action on next
onClose = () => {
if (readTxn.get().contains(txn)) {
txn.close()
readTxn.set(None)
}
}
)
if (remainingWrites == 0 || commitRequested) {
notifyAll()
} else {
}
}

/** Safely execute a function within an **existing write transaction** or create one if needed */
/** Execute a function within a write transaction */
def withWrite[Return](f: Txn[ByteBuffer] => Task[Return]): Task[Return] = {
Task.defer {
val managedTxn = writeTxn.updateAndGet {
case Some(activeTxn) => Some(activeTxn) // ✅ Reuse active write transaction
case None =>
val newTxn = ManagedTxn(env.txnWrite())
writeActive.set(true)
Some(newTxn)
}.get

f(managedTxn.txn)
val txn = getWrite()
f(txn).guarantee(Task { releaseWrite(txn) })
}
}

/** Explicitly commit and close the active write transaction */
def commit(): Task[Unit] = Task.defer {
writeTxn.getAndSet(None) match {
case Some(managedTxn) if !managedTxn.committed =>
try {
managedTxn.txn.commit()
managedTxn.committed = true
Task.unit
} finally {
managedTxn.txn.close()
writeActive.set(false) // ✅ Release write lock
refreshReadTxn() // ✅ Ensure fresh read transactions
/** Commit the active write transaction safely */
def commit(): Task[Unit] = Task {
synchronized {
if (writeTransaction == null) {
return Task.unit
}

commitRequested = true

val timeout = System.currentTimeMillis() + 5000 // 5s timeout
var waitCount = 0
while (activeWrites.get() > 0 && System.currentTimeMillis() < timeout) {
waitCount += 1
Thread.`yield`() // Allow other threads to proceed before waiting
wait(100)
}

try {
if (writeTransaction != null) {
writeTransaction.commit()
writeTransaction.close()
writeTransaction = null
}
case _ => Task.unit // Ignore if there is no active transaction
lastCommit = System.currentTimeMillis()
} finally {
commitRequested = false
notifyAll()
}
}
}

/** Close and refresh the read transaction after a write commit */
private def refreshReadTxn(): Unit = {
val txnOpt = readTxn.getAndSet(None)
if (txnOpt.isDefined) {
/** Provide a read iterator wrapped in a write transaction */
def withReadIterator[T](iteratorProvider: Txn[ByteBuffer] => Iterator[T]): Task[Iterator[T]] = Task {
val txn = getWrite()

val underlying = iteratorProvider(txn)

ActionIterator(
underlying,
onNext = _ => (),
onClose = () => releaseWrite(txn)
)
}

/** Check if a key exists in the database */
def exists(dbi: Dbi[ByteBuffer], key: ByteBuffer): Task[Boolean] = withWrite { txn =>
Task {
val cursor = dbi.openCursor(txn)
try {
txnOpt.get.close()
} catch {
case _: Exception => println("Warning: Read transaction already closed!")
cursor.get(key, GetOp.MDB_SET_KEY)
} finally {
cursor.close()
}
}
}

/** Explicitly close all transactions (e.g., on shutdown) */
def close(): Unit = {
writeTxn.getAndSet(None).foreach(_.txn.close())
readTxn.getAndSet(None).foreach(_.close())
/** Retrieve a key-value from the database */
def get(dbi: Dbi[ByteBuffer], key: ByteBuffer): Task[Option[ByteBuffer]] = withWrite { txn =>
Task {
Option(dbi.get(txn, key)).filterNot(_.remaining() == 0)
}
}

/** Count total records in the database */
def count(dbi: Dbi[ByteBuffer]): Task[Int] = withWrite { txn =>
Task(dbi.stat(txn).entries.toInt)
}
}
}

0 comments on commit 8479674

Please sign in to comment.