Skip to content

Commit

Permalink
Add bulk writing to indexer with custom memory directory
Browse files Browse the repository at this point in the history
  • Loading branch information
ChillFish8 committed Dec 10, 2024
1 parent 78af882 commit d782e01
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 22 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ ahash = "0.8.11"
parking_lot = "0.12.3"
bytemuck = "1.19"
cityhasher = "0.1.0"
stable_deref_trait = "1.2.0"

# Dev Dependencies
rstest = "0.23.0"
Expand Down
8 changes: 7 additions & 1 deletion lnx-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ fnv = { workspace = true }
ahash = { workspace = true }
parking_lot = { workspace = true }

tempfile = { workspace = true, optional = true }

[dev-dependencies]
tracing-subscriber = { workspace = true }
tokio = { workspace = true }
tempfile = { workspace = true }
tempfile = { workspace = true }

[features]
# Enables testing utilities like `VirtualFileSystem::create_for_test()`.
test-utils = ["tempfile"]
20 changes: 17 additions & 3 deletions lnx-fs/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::env::temp_dir;
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -70,6 +71,15 @@ impl VirtualFileSystem {
})
}

#[cfg(feature = "test-utils")]
pub async fn create_for_test() -> Result<(Self, tempfile::TempDir), FileSystemError>
{
let rt_options = RuntimeOptions::builder().num_threads(1).build();
let dir = tempfile::TempDir::new()?;
let fs = Self::mount(dir.path().to_path_buf(), rt_options).await?;
Ok((fs, dir))
}

/// Returns a clone of the [SharedBucket] if it exists.
pub fn bucket(&self, bucket: &str) -> Option<SharedBucket> {
self.buckets.read().get(bucket).cloned()
Expand All @@ -78,18 +88,22 @@ impl VirtualFileSystem {
/// Creates a new bucket with the given [BucketCreateOptions].
///
/// Returns an error if the bucket already exists (on disk.)
pub async fn create_bucket(&self, name: &str) -> Result<(), FileSystemError> {
pub async fn create_bucket(
&self,
name: &str,
) -> Result<SharedBucket, FileSystemError> {
let options = BucketCreateOptions::builder()
.name(name)
.bucket_path(self.mount_point.join(name))
.build();

let bucket = Bucket::create(options, self.runtime.clone()).await?;
let shared_bucket = SharedBucket::new(bucket);

let mut lock = self.buckets.write();
lock.insert(bucket.name().to_string(), SharedBucket::new(bucket));
lock.insert(shared_bucket.name().to_string(), shared_bucket.clone());

Ok(())
Ok(shared_bucket)
}

/// Attempts to delete the bucket with the given name.
Expand Down
10 changes: 9 additions & 1 deletion lnx-tantivy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ edition = "2021"
thiserror = { workspace = true }
tantivy = { workspace = true }
tracing = { workspace = true }
parking_lot = { workspace = true }
bytes = { workspace = true }
stable_deref_trait = { workspace = true }

lnx-doc = { path = "../lnx-doc" }
lnx-fs = { path = "../lnx-fs" }
lnx-fs = { path = "../lnx-fs" }

[dev-dependencies]
tokio = { workspace = true }
ulid = { workspace = true }
lnx-fs = { path = "../lnx-fs", features = ["test-utils"] }
138 changes: 138 additions & 0 deletions lnx-tantivy/src/directory/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use std::collections::BTreeMap;
use std::fmt::{Debug, Formatter};
use std::mem;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use parking_lot::Mutex;
use tantivy::directory::error::{DeleteError, OpenReadError, OpenWriteError};
use tantivy::directory::{
AntiCallToken,
FileHandle,
OwnedBytes,
TerminatingWrite,
WatchCallback,
WatchCallbackList,
WatchHandle,
WritePtr,
};

type State = Arc<Mutex<BTreeMap<PathBuf, Bytes>>>;

#[derive(Clone, Default)]
/// A [tantivy::Directory] implementation that holds all data in memory.
///
/// Unlike [tantivy::directory::RamDirectory], this directory allows you to
/// retrieve the internal [Bytes] that make up the object allowing us to
/// avoid some additional copies when writing segments.
pub struct MemoryDirectory {
inner: State,
watch_callbacks: Arc<WatchCallbackList>,
}

impl MemoryDirectory {
/// Gets the bytes for a file with the given path.
pub fn get(&self, path: &Path) -> Result<Bytes, OpenReadError> {
self.inner
.lock()
.get(path)
.cloned()
.ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))
}
}

impl Debug for MemoryDirectory {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "MemoryDirectory")
}
}

impl tantivy::Directory for MemoryDirectory {
fn get_file_handle(
&self,
path: &Path,
) -> Result<Arc<dyn FileHandle>, OpenReadError> {
let data = self.get(path)?;
let handle = OwnedBytes::new(BytesWrapper(data));
Ok(Arc::new(handle))
}

fn delete(&self, path: &Path) -> Result<(), DeleteError> {
self.inner.lock().remove(path);
Ok(())
}

fn exists(&self, path: &Path) -> Result<bool, OpenReadError> {
Ok(self.inner.lock().contains_key(path))
}

fn open_write(&self, path: &Path) -> Result<WritePtr, OpenWriteError> {
let writer = MemoryWriter {
path: path.to_path_buf(),
state: self.inner.clone(),
inner_buffer: BytesMut::with_capacity(8 << 10),
};
Ok(WritePtr::with_capacity(2 << 10, Box::new(writer)))
}

fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
let data = self.get(path)?;
Ok(data.to_vec())
}

fn atomic_write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
self.inner
.lock()
.insert(path.to_path_buf(), Bytes::copy_from_slice(data));
Ok(())
}

fn sync_directory(&self) -> std::io::Result<()> {
self.watch_callbacks.broadcast();
Ok(())
}

fn watch(&self, watch_callback: WatchCallback) -> tantivy::Result<WatchHandle> {
Ok(self.watch_callbacks.subscribe(watch_callback))
}
}

struct MemoryWriter {
path: PathBuf,
inner_buffer: BytesMut,
state: State,
}

impl std::io::Write for MemoryWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.inner_buffer.extend_from_slice(buf);
Ok(buf.len())
}

fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}

impl TerminatingWrite for MemoryWriter {
fn terminate_ref(&mut self, _: AntiCallToken) -> std::io::Result<()> {
let buffer = mem::take(&mut self.inner_buffer);
self.state.lock().insert(self.path.clone(), buffer.freeze());
Ok(())
}
}

struct BytesWrapper(Bytes);

impl Deref for BytesWrapper {
type Target = [u8];

#[inline]
fn deref(&self) -> &Self::Target {
self.0.as_ref()
}
}

unsafe impl stable_deref_trait::StableDeref for BytesWrapper {}
3 changes: 3 additions & 0 deletions lnx-tantivy/src/directory/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod memory;

pub use self::memory::MemoryDirectory;
Loading

0 comments on commit d782e01

Please sign in to comment.