refactor: simplify/cleanup catalog API #26145

Draft · wants to merge 2 commits into base: main
37 changes: 2 additions & 35 deletions influxdb3_catalog/src/catalog.rs
@@ -7,7 +7,7 @@ use influxdb3_id::{
 };
 use iox_time::{Time, TimeProvider};
 use object_store::ObjectStore;
-use observability_deps::tracing::{debug, info, trace, warn};
+use observability_deps::tracing::{debug, trace, warn};
 use parking_lot::RwLock;
 use schema::{Schema, SchemaBuilder};
 use serde::{Deserialize, Serialize};
@@ -25,8 +25,7 @@ pub use schema::{InfluxColumnType, InfluxFieldType};
 pub use update::{DatabaseCatalogTransaction, Prompt};
 
 use crate::log::{
-    CreateDatabaseLog, DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode,
-    RegisterNodeLog,
+    DatabaseBatch, DatabaseCatalogOp, NodeBatch, NodeCatalogOp, NodeMode, RegisterNodeLog,
 };
 use crate::object_store::ObjectStoreCatalog;
 use crate::resource::CatalogResource;
@@ -228,38 +227,6 @@ impl Catalog {
         self.inner.read().databases.next_id()
     }
 
-    pub(crate) fn db_or_create(
-        &self,
-        db_name: &str,
-        now_time_ns: i64,
-    ) -> Result<(Arc<DatabaseSchema>, Option<CatalogBatch>)> {
-        match self.db_schema(db_name) {
-            Some(db) => Ok((db, None)),
-            None => {
-                let mut inner = self.inner.write();
-
-                if inner.database_count() >= Self::NUM_DBS_LIMIT {
-                    return Err(CatalogError::TooManyDbs);
-                }
-
-                info!(database_name = db_name, "creating new database");
-                let db_id = inner.databases.get_and_increment_next_id();
-                let db_name = db_name.into();
-                let db = Arc::new(DatabaseSchema::new(db_id, Arc::clone(&db_name)));
-                let batch = CatalogBatch::database(
-                    now_time_ns,
-                    db.id,
-                    db.name(),
-                    vec![DatabaseCatalogOp::CreateDatabase(CreateDatabaseLog {
-                        database_id: db.id,
-                        database_name: Arc::clone(&db.name),
-                    })],
-                );
-                Ok((db, Some(batch)))
-            }
-        }
-    }
-
     pub fn db_name_to_id(&self, db_name: &str) -> Option<DbId> {
         self.inner.read().databases.name_to_id(db_name)
     }
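
The deletion above removes the `db_or_create` helper, whose return type packed two meanings into one value: the schema plus an optional batch signalling whether anything was actually created. The creation logic moves into `Catalog::create_database` in `catalog/update.rs` (next file). A signature-level sketch of the before/after shape (paraphrased from this diff, not compilable on its own):

    // Before: one method, two outcomes folded into the return value.
    // Ok((db, None))        => database already existed
    // Ok((db, Some(batch))) => database was just created
    pub(crate) fn db_or_create(
        &self,
        db_name: &str,
        now_time_ns: i64,
    ) -> Result<(Arc<DatabaseSchema>, Option<CatalogBatch>)>;

    // After: lookup and creation are separate operations, and creation
    // either yields a batch or fails (e.g. CatalogError::AlreadyExists).
    pub fn db_schema(&self, db_name: &str) -> Option<Arc<DatabaseSchema>>;
    pub async fn create_database(&self, name: &str) -> Result<OrderedCatalogBatch>;
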
61 changes: 38 additions & 23 deletions influxdb3_catalog/src/catalog/update.rs
@@ -97,7 +97,7 @@ impl Catalog {
         node_id: &str,
         core_count: u64,
         mode: Vec<NodeMode>,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(node_id, core_count, mode = ?mode, "register node");
         self.catalog_update_with_retry(|| {
             let time_ns = self.time_provider.now().timestamp_nanos();
@@ -155,20 +155,38 @@
         .await
     }
 
-    pub async fn create_database(&self, name: &str) -> Result<Option<OrderedCatalogBatch>> {
+    pub async fn create_database(&self, name: &str) -> Result<OrderedCatalogBatch> {
         info!(name, "create database");
         self.catalog_update_with_retry(|| {
-            let (_, Some(batch)) =
-                self.db_or_create(name, self.time_provider.now().timestamp_nanos())?
-            else {
+            let inner = self.inner.read();
+
+            if inner.databases.contains_name(name) {
                 return Err(CatalogError::AlreadyExists);
-            };
-            Ok(batch)
+            }
+            if inner.database_count() >= Self::NUM_DBS_LIMIT {
+                return Err(CatalogError::TooManyDbs);
+            }
+            drop(inner);
+
+            let mut inner = self.inner.write();
+
+            let db_id = inner.databases.get_and_increment_next_id();
+            let db_name = name.into();
+            let db = Arc::new(DatabaseSchema::new(db_id, Arc::clone(&db_name)));
+            Ok(CatalogBatch::database(
+                self.time_provider.now().timestamp_nanos(),
+                db.id,
+                db.name(),
+                vec![DatabaseCatalogOp::CreateDatabase(CreateDatabaseLog {
+                    database_id: db.id,
+                    database_name: Arc::clone(&db.name),
+                })],
+            ))
         })
         .await
     }
 
-    pub async fn soft_delete_database(&self, name: &str) -> Result<Option<OrderedCatalogBatch>> {
+    pub async fn soft_delete_database(&self, name: &str) -> Result<OrderedCatalogBatch> {
         info!(name, "soft delete database");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(name) else {
@@ -201,7 +219,7 @@ impl Catalog {
         table_name: &str,
         tags: &[impl AsRef<str> + Send + Sync],
         fields: &[(impl AsRef<str> + Send + Sync, FieldDataType)],
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, table_name, "create table");
         self.catalog_update_with_retry(|| {
             let mut txn = self.begin(db_name)?;
@@ -215,7 +233,7 @@
         &self,
         db_name: &str,
         table_name: &str,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, table_name, "soft delete database");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -249,7 +267,7 @@
         columns: &[impl AsRef<str> + Send + Sync],
         max_cardinality: MaxCardinality,
         max_age_seconds: MaxAge,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, table_name, cache_name = ?cache_name, "create distinct cache");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -329,7 +347,7 @@
         db_name: &str,
         table_name: &str,
         cache_name: &str,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, table_name, cache_name, "delete distinct cache");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -368,7 +386,7 @@
         value_columns: Option<&[impl AsRef<str> + Send + Sync]>,
         count: LastCacheSize,
         ttl: LastCacheTtl,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, table_name, cache_name = ?cache_name, "create last cache");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -482,7 +500,7 @@
         db_name: &str,
         table_name: &str,
         cache_name: &str,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, table_name, cache_name, "delete last cache");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -521,7 +539,7 @@
         trigger_settings: TriggerSettings,
         trigger_arguments: &Option<HashMap<String, String>>,
         disabled: bool,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, trigger_name, "create processing engine trigger");
         self.catalog_update_with_retry(|| {
             let Some(mut db) = self.db_schema(db_name) else {
@@ -559,7 +577,7 @@
         db_name: &str,
         trigger_name: &str,
         force: bool,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, trigger_name, "delete processing engine trigger");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -591,7 +609,7 @@
         &self,
         db_name: &str,
         trigger_name: &str,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, trigger_name, "enable processing engine trigger");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -622,7 +640,7 @@
         &self,
         db_name: &str,
         trigger_name: &str,
-    ) -> Result<Option<OrderedCatalogBatch>> {
+    ) -> Result<OrderedCatalogBatch> {
         info!(db_name, trigger_name, "disable processing engine trigger");
         self.catalog_update_with_retry(|| {
             let Some(db) = self.db_schema(db_name) else {
@@ -649,10 +667,7 @@
         .await
     }
 
-    async fn catalog_update_with_retry<F>(
-        &self,
-        batch_creator_fn: F,
-    ) -> Result<Option<OrderedCatalogBatch>>
+    async fn catalog_update_with_retry<F>(&self, batch_creator_fn: F) -> Result<OrderedCatalogBatch>
     where
         F: Fn() -> Result<CatalogBatch>,
     {
@@ -677,7 +692,7 @@
                     self.apply_ordered_catalog_batch(&ordered_batch, &permit);
                     self.background_checkpoint(&ordered_batch);
                     self.broadcast_update(ordered_batch.clone().into_batch());
-                    return Ok(Some(ordered_batch));
+                    return Ok(ordered_batch);
                 }
             }
         }
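
Since every public mutation method here loses its `Option` wrapper, call sites change shape too: there is no longer an `Ok(None)` case to handle, and "already exists" arrives as an error instead. A minimal caller-side sketch of the migration (hypothetical helper, assuming `Catalog` and `CatalogError` are importable as shown; not code from this PR):

    use influxdb3_catalog::catalog::{Catalog, CatalogError};

    // Hypothetical call site illustrating the migration.
    async fn ensure_database(catalog: &Catalog, name: &str) -> Result<(), CatalogError> {
        match catalog.create_database(name).await {
            // Success now always carries the ordered batch; the Ok(None) case is gone.
            Ok(_batch) => Ok(()),
            // The "nothing to do" outcome is an explicit error variant now.
            Err(CatalogError::AlreadyExists) => Ok(()),
            Err(other) => Err(other),
        }
    }
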
39 changes: 13 additions & 26 deletions influxdb3_server/src/http.rs
@@ -841,7 +841,7 @@ where
             max_cardinality,
             max_age,
         } = args;
-        match self
+        let batch = self
             .write_buffer
             .catalog()
             .create_distinct_cache(
@@ -852,18 +852,12 @@
                 max_cardinality,
                 max_age,
             )
-            .await
-        {
-            Ok(Some(batch)) => Response::builder()
-                .status(StatusCode::CREATED)
-                .body(Body::from(serde_json::to_vec(&batch)?))
-                .map_err(Into::into),
-            Ok(None) => Response::builder()
-                .status(StatusCode::NO_CONTENT)
-                .body(Body::empty())
-                .map_err(Into::into),
-            Err(error) => Err(error.into()),
-        }
+            .await?;
+
+        Response::builder()
+            .status(StatusCode::CREATED)
+            .body(Body::from(serde_json::to_vec(&batch)?))
+            .map_err(Into::into)
     }
 
     /// Delete a distinct value cache entry with the given [`DistinctCacheDeleteRequest`] parameters
@@ -898,7 +892,7 @@
             count,
             ttl,
         } = self.read_body_json(req).await?;
-        match self
+        let batch = self
            .write_buffer
            .catalog()
            .create_last_cache(
@@ -910,18 +904,11 @@
                 count,
                 ttl,
             )
-            .await
-        {
-            Ok(Some(batch)) => Response::builder()
-                .status(StatusCode::CREATED)
-                .body(Body::from(serde_json::to_vec(&batch)?))
-                .map_err(Into::into),
-            Ok(None) => Response::builder()
-                .status(StatusCode::NO_CONTENT)
-                .body(Body::empty())
-                .map_err(Into::into),
-            Err(error) => Err(error.into()),
-        }
+            .await?;
+        Response::builder()
+            .status(StatusCode::CREATED)
+            .body(Body::from(serde_json::to_vec(&batch)?))
+            .map_err(Into::into)
     }
 
     /// Delete a last cache entry with the given [`LastCacheDeleteRequest`] parameters
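
The server-side effect of this cleanup: the `Ok(None)` → 204 No Content branch disappears from both cache-creation handlers, so a successful creation always answers 201 Created with the serialized batch, while a duplicate creation now flows through the error path via `?`. A behavioral sketch (the second-request status depends on the server's error-to-response conversion, which is outside this diff):

    // Sketch of the observable change for a repeated cache-creation request.
    //
    // First request:  create last cache "foo"  -> 201 Created, body = catalog batch JSON
    // Second request: create last cache "foo"
    //   before this PR: catalog returned Ok(None)          -> 204 No Content
    //   after this PR:  catalog returns Err(AlreadyExists) -> handler's `?` propagates it,
    //                   and the server's error mapping chooses the response status.
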