ref(buffering): Add BufferService with SQLite backend #1920

Merged
merged 39 commits on Mar 30, 2023
Changes from 3 commits
Commits (39)
566cad9
ref(buffering): Add BufferService with SQLite backend
olksdr Mar 9, 2023
baa323e
Add changelog entry and fix lints
olksdr Mar 13, 2023
825ffc2
Add path to db in tmp
olksdr Mar 13, 2023
d3a5dc6
sorted relay-server deps
olksdr Mar 13, 2023
ddf87c0
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 13, 2023
76c102a
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 20, 2023
7c4f618
Post master merge fixes
olksdr Mar 20, 2023
fe3292d
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 22, 2023
601f947
Addressed some PR review comments
olksdr Mar 22, 2023
5a1c17b
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 22, 2023
60902f2
Cleanup
olksdr Mar 22, 2023
e46b14f
Extend the config and use BufferPermit
olksdr Mar 22, 2023
94ceb7a
Better error messages
olksdr Mar 22, 2023
de8b39e
ref: use one service for in-memory and on disk buffering
olksdr Mar 22, 2023
74ec158
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 22, 2023
dbb3b3b
Fix changelog
olksdr Mar 22, 2023
cb4e5c0
ref: remove unwrap, add another config option
olksdr Mar 23, 2023
4caed2e
Set limit on batch inserts.
olksdr Mar 23, 2023
d427ffc
Reserve permits before getting data from DB
olksdr Mar 23, 2023
c7d4de7
Request permits per key
olksdr Mar 23, 2023
1fd521b
Review comments
olksdr Mar 23, 2023
4a176fd
Address more review comments
olksdr Mar 24, 2023
d688a49
match on the reuest size
olksdr Mar 24, 2023
54d4c0a
ref: build in the migrations into the binary
olksdr Mar 24, 2023
fcd6740
reorganize the config
olksdr Mar 24, 2023
4356295
just use the provided path
olksdr Mar 24, 2023
8f73ccc
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 24, 2023
ecd339c
Add docs and enabled shared cache
olksdr Mar 25, 2023
97df208
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 27, 2023
b92449e
remove un-used imports
olksdr Mar 27, 2023
6bc057b
review comments: fetch all envelopes from disk till there is nothing …
olksdr Mar 27, 2023
28601e4
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 28, 2023
b038237
Review comments
olksdr Mar 28, 2023
05050ce
Chunk up the envelopes with future streams
olksdr Mar 28, 2023
d6d8d06
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 28, 2023
2a7e01f
fix docs
olksdr Mar 29, 2023
47f32ac
Merge branch 'master' into feat/buff-with-lite
olksdr Mar 29, 2023
69fce70
Switch to fetch using Stream to query db
olksdr Mar 29, 2023
c6a8714
small checks and logging
olksdr Mar 30, 2023
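For orientation before the diff: a rough sketch of how the new `Buffer` interface is driven, pieced together from the message types visible below (`Enqueue`, `DequeueMany`, `RemoveMany`). The helper function, the `Enqueue::new` constructor, and the assumption that `QueueKey` is `Clone` are illustrative guesses, not the exact call sites in Relay's project cache.

// Illustrative only: drive the BufferService through its messages.
// The real caller is the ProjectCache actor, which also receives the
// `BufferIndex` message with any keys that still have data left on disk
// after a dequeue. `Enqueue::new` and `QueueKey: Clone` are assumed here.
use tokio::sync::mpsc;

async fn spool_round_trip(
    buffer: relay_system::Addr<Buffer>,
    project_key: relay_common::ProjectKey,
    key: QueueKey,
    envelope: ManagedEnvelope,
) {
    // Park the envelope under its (own_key, sampling_key) queue key.
    buffer.send(Enqueue::new(key.clone(), envelope));

    // Later, ask the service to stream everything stored under these keys back.
    let (tx, mut rx) = mpsc::unbounded_channel();
    buffer.send(DequeueMany::new(project_key, vec![key], tx));
    while let Some(managed) = rx.recv().await {
        // Hand the recovered envelope back into normal processing.
        drop(managed);
    }
}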
10 changes: 0 additions & 10 deletions relay-config/src/config.rs
@@ -759,11 +759,6 @@ pub struct PersistentBuffer {
///
/// If not set, the default is 10737418240 bytes or 10 GB.
max_size: Option<u64>,
/// The buffer memory limit.
///
/// When provided, either `envelope_buffer_size` or `memory_limit` is used depending what is smaller.
/// If not set the default will be the half of `envelope_buffer_size`.
memory_limit: Option<usize>,
}

impl PersistentBuffer {
@@ -788,11 +783,6 @@ impl PersistentBuffer {
pub fn max_buffer_size(&self) -> u64 {
self.max_size.unwrap_or(10 * 1024 * 1024 * 1024)
}

/// The buffer memory limit.
pub fn memory_limit(&self) -> Option<usize> {
self.memory_limit
}
}

/// Controls internal caching behavior.
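The net effect of this config change is that `memory_limit` is no longer a user-facing option; the in-memory threshold is derived inside the buffer service instead. A minimal sketch of the resulting limits, using only accessors that appear in this diff:

// Sketch of the effective limits after this change (accessor names from the diff).
use relay_config::{Config, PersistentBuffer};

fn effective_limits(config: &Config, buffer: &PersistentBuffer) -> (u64, usize) {
    // On-disk cap: `max_size` if set, otherwise 10737418240 bytes (10 GiB).
    let max_on_disk = buffer.max_buffer_size();
    // In-memory spool threshold: always half of `envelope_buffer_size`,
    // replacing the removed `memory_limit` option (see BufferSpoolConfig below).
    let in_memory = config.envelope_buffer_size() / 2;
    (max_on_disk, in_memory)
}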
213 changes: 150 additions & 63 deletions relay-server/src/actors/project_buffer.rs
@@ -1,19 +1,20 @@
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use futures::TryStreamExt;
use relay_common::ProjectKey;
use relay_config::{Config, PersistentBuffer};
use relay_log::LogError;
use relay_system::{FromMessage, Interface, Service};
use sqlx::migrate::MigrateError;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
use relay_system::{Addr, FromMessage, Interface, Service};
use sqlx::migrate::{MigrateError, Migrator};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions, SqliteRow};
use sqlx::{Pool, QueryBuilder, Row, Sqlite};
use tokio::sync::mpsc;

use crate::actors::project_cache::{BufferIndex, ProjectCache};
use crate::envelope::{Envelope, EnvelopeError};
use crate::utils::{BufferGuard, ManagedEnvelope};
use crate::utils::{BufferGuard, ManagedEnvelope, SemaphorePermit};

/// SQLite allocates space to hold all host parameters between 1 and the largest host parameter number used.
///
@@ -44,9 +45,6 @@ pub enum BufferError {

#[error("failed to run migrations")]
MigrationFailed(#[from] MigrateError),

#[error("failed to read the migrations directory")]
MissingMigrations,
}

/// This key represents the index element in the queue.
Expand Down Expand Up @@ -85,13 +83,22 @@ impl Enqueue {
/// Removes messages from the internal buffer and streams them to the sender.
#[derive(Debug)]
pub struct DequeueMany {
project_key: ProjectKey,
keys: Vec<QueueKey>,
sender: mpsc::UnboundedSender<ManagedEnvelope>,
}

impl DequeueMany {
pub fn new(keys: Vec<QueueKey>, sender: mpsc::UnboundedSender<ManagedEnvelope>) -> Self {
Self { keys, sender }
pub fn new(
project_key: ProjectKey,
keys: Vec<QueueKey>,
sender: mpsc::UnboundedSender<ManagedEnvelope>,
) -> Self {
Self {
project_key,
keys,
sender,
}
}
}

@@ -159,55 +166,76 @@ impl FromMessage<RemoveMany> for Buffer {
}
}

#[derive(Debug)]
struct BufferSpoolConfig {
config: PersistentBuffer,
memory_limit: usize,
db: Pool<Sqlite>,
}

/// [`Buffer`] interface implementation backed by SQLite.
#[derive(Debug)]
pub struct BufferService {
buffer: BTreeMap<QueueKey, Vec<ManagedEnvelope>>,
buffer_guard: Arc<BufferGuard>,
config: Option<PersistentBuffer>,
// this is half of the `envelope_buffer_size` defined in the [`relay_config::Config`].
memory_limit: Option<usize>,
project_cache: Addr<ProjectCache>,
spool_config: Option<BufferSpoolConfig>,
count_mem_envelopes: i64,
db: Option<Pool<Sqlite>>,
}

impl BufferService {
async fn setup(path: &PathBuf) -> Result<(), BufferError> {
let options = SqliteConnectOptions::new()
.filename(path)
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true);

let db = SqlitePoolOptions::new().connect_with(options).await?;

let migrator = Migrator::new(Path::new("./migrations")).await?;
migrator.run(&db).await?;

Ok(())
}

/// Creates a new [`BufferService`] from the provided path to the SQLite database file.
pub async fn create(
buffer_guard: Arc<BufferGuard>,
project_cache: Addr<ProjectCache>,
config: Arc<Config>,
) -> Result<Self, BufferError> {
let mut service = Self {
buffer: BTreeMap::new(),
buffer_guard,
config: None,
memory_limit: None,
project_cache,
count_mem_envelopes: 0,
db: None,
spool_config: None,
};

// Only if the persistent buffer is enabled, create the pool and set the config.
if let Some(buffer_config) = config.cache_persistent_buffer() {
let path = buffer_config.buffer_path().to_owned();
let path = PathBuf::from("sqlite://").join(buffer_config.buffer_path());

Self::setup(&path).await?;

let options = SqliteConnectOptions::new()
.filename(PathBuf::from("sqlite://").join(&path))
.journal_mode(SqliteJournalMode::Wal)
.create_if_missing(true);
.filename(path)
.journal_mode(SqliteJournalMode::Wal);

let db = SqlitePoolOptions::new()
.max_connections(buffer_config.max_connections())
.min_connections(buffer_config.min_connections())
.connect_with(options)
.await?;

// Set the buffer memory limit, which must not be bigger than `envelope_buffer_size`.
let limit = buffer_config
.memory_limit()
.unwrap_or(config.envelope_buffer_size() / 2);
service.memory_limit = Some(limit.min(config.envelope_buffer_size()));
let spool_config = BufferSpoolConfig {
// Set the buffer memory limit to half of `envelope_buffer_size`.
memory_limit: config.envelope_buffer_size() / 2,
config: buffer_config.clone(),
db,
};

service.db = Some(db);
service.config = Some(buffer_config.to_owned());
service.spool_config = Some(spool_config);
}

Ok(service)
Expand All @@ -216,29 +244,30 @@ impl BufferService {
/// Tries to save in-memory buffer to disk.
///
/// It will spool to disk only if persistent storage is enabled in the configuration.
fn try_enqueue(&mut self) -> Result<(), BufferError> {
fn try_spool(&mut self) -> Result<(), BufferError> {
let Self {
db,
config,
memory_limit,
spool_config,
ref mut buffer,
..
} = self;

// Buffer to disk only if the DB and config are provided.
if let (Some(db), Some(config), Some(memory_limit)) = (db, config, memory_limit) {
if let Some(BufferSpoolConfig {
config,
db,
memory_limit,
}) = spool_config
{
// And only if the count of in-memory envelopes exceeds the configured memory limit.
if self.count_mem_envelopes > *memory_limit as i64 {
// Reject all the enqueue requests if we exceed the max size of the buffer.
let current_size = std::fs::metadata(config.buffer_path())
.ok()
.map(|meta| meta.len());
if current_size.map_or(false, |size| size >= config.max_buffer_size()) {
return Err(BufferError::Full(current_size.unwrap_or_default()));
.map_or(0, |meta| meta.len());
if current_size > config.max_buffer_size() {
return Err(BufferError::Full(current_size));
}

// Do not drain and just keep the buffer around, so we do not have to allocate it
// again.
let buf = std::mem::take(buffer);
let db = db.clone();
self.count_mem_envelopes = 0;
@@ -292,14 +321,29 @@ impl BufferService {
Ok(())
}

/// Extracts the envelope from the `SqliteRow`.
///
/// Reads the bytes and tries to parse them into an `Envelope`.
fn extract_envelope(
row: SqliteRow,
permit: Option<SemaphorePermit>,
) -> Result<ManagedEnvelope, BufferError> {
let envelope_bytes_slice: &[u8] = row.try_get("envelope")?;
let envelope_bytes = bytes::Bytes::from(envelope_bytes_slice);
let envelope = Envelope::parse_bytes(envelope_bytes)?;
let managed_envelope =
ManagedEnvelope::new(envelope, permit.ok_or(BufferError::Overloaded)?);
Ok(managed_envelope)
}

/// Handles the enqueueing messages into the internal buffer.
async fn handle_enqueue(&mut self, message: Enqueue) -> Result<(), BufferError> {
let Enqueue {
key,
value: managed_envelope,
} = message;

self.try_enqueue()?;
self.try_spool()?;

// save to the internal buffer
self.buffer.entry(key).or_default().push(managed_envelope);
@@ -312,7 +356,14 @@ impl BufferService {
///
/// This method removes the envelopes from the buffer and stream them to the sender.
async fn handle_dequeue(&mut self, message: DequeueMany) -> Result<(), BufferError> {
let DequeueMany { keys, sender } = message;
let DequeueMany {
project_key,
mut keys,
sender,
} = message;

let mut back_keys = BTreeSet::new();

for key in &keys {
for value in self.buffer.remove(key).unwrap_or_default() {
self.count_mem_envelopes -= 1;
@@ -321,32 +372,68 @@ impl BufferService {
}

// Persistent buffer is configured, let's try to get data from the disk.
if let Some(db) = &self.db {
for key in keys {
// TODO: remove hardcoded number for the limit.
if let Some(BufferSpoolConfig {
db, memory_limit, ..
}) = &self.spool_config
{
// The size of the batch per key we want to fetch from the persistent buffer.
// It should still fit into memory, but must not be too big for one go.
let request_size: usize = (memory_limit / (keys.len() + 1) + 1).min(500);
while let Some(key) = keys.pop() {
// If the requested permits are available, use them and fetch the envelopes.
if let Ok(mut permits) = self.buffer_guard.try_reserve(100) {
// request
if let Ok(mut permits) = self.buffer_guard.try_reserve(request_size) {
let mut envelopes = sqlx::query(
"DELETE FROM envelopes WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT ?) RETURNING envelope",
)
.bind(key.own_key.to_string())
.bind(key.sampling_key.to_string())
.bind(100)
.fetch(db);

while let Some(row) = envelopes.try_next().await? {
let envelope_bytes_slice: &[u8] = row.try_get("envelope")?;
let envelope_bytes = bytes::Bytes::from(envelope_bytes_slice);
let envelope = Envelope::parse_bytes(envelope_bytes)?;
let managed_envelope = ManagedEnvelope::new(
envelope,
permits.pop().ok_or(BufferError::Overloaded)?,
);

sender.send(managed_envelope).ok();
"DELETE FROM envelopes WHERE id IN (SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT ?) RETURNING envelope",
)
.bind(key.own_key.to_string())
.bind(key.sampling_key.to_string())
.bind(request_size as u32)
.fetch(db);

loop {
match envelopes.try_next().await {
Ok(Some(row)) => match Self::extract_envelope(row, permits.pop()) {
Ok(managed_envelope) => {
sender.send(managed_envelope).ok();
}
Err(err) => relay_log::error!(
"failed to extractt envelope from the buffer: {}",
LogError(&err)
),
},
Ok(None) => break,
Err(err) => {
relay_log::error!(
"failed to read the buffer stream from the disk: {}",
LogError(&err)
);
break;
}
}
}

// Check whether there is any data left in the db for this key.
let result = sqlx::query(
"SELECT id FROM envelopes WHERE own_key = ? AND sampling_key = ? LIMIT 1",
)
.bind(key.own_key.to_string())
.bind(key.sampling_key.to_string())
.fetch_one(db)
.await;

// Make sure to save the key which will be sent back if there are some more
// records left in the db.
if result.map_or(true, |row| !row.is_empty()) {
back_keys.insert(key);
}
}
}
if !keys.is_empty() || !back_keys.is_empty() {
back_keys.extend(keys);
self.project_cache
.send(BufferIndex::new(project_key, back_keys))
}
}

Ok(())
Expand All @@ -364,7 +451,7 @@ impl BufferService {
count += self.buffer.remove(key).map_or(0, |k| k.len() as u64);
}

if let Some(db) = &self.db {
if let Some(BufferSpoolConfig { db, .. }) = &self.spool_config {
for key in keys {
let result =
sqlx::query("DELETE FROM envelopes where own_key = ? AND sampling_key = ?")
@@ -416,7 +503,7 @@ impl Drop for BufferService {
let count: usize = self.buffer.values().map(|v| v.len()).sum();
// We have envelopes in memory, try to buffer them to the disk.
if count > 0 {
if let Err(err) = self.try_enqueue() {
if let Err(err) = self.try_spool() {
relay_log::error!("failed to spool {} on shutdown: {}", count, LogError(&err));
}
}
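Not shown in this excerpt is the migration that the service runs on startup. Based on the queries in the diff (the columns `id`, `own_key`, `sampling_key`, and `envelope` on an `envelopes` table), it presumably looks roughly like the following; the real file under `./migrations` may differ.

// Assumed shape of the migration (inferred from the queries above, not the actual file).
const ASSUMED_MIGRATION: &str = r#"
CREATE TABLE IF NOT EXISTS envelopes (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    own_key      TEXT NOT NULL,
    sampling_key TEXT NOT NULL,
    envelope     BLOB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_envelopes_keys ON envelopes (own_key, sampling_key);
"#;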