Skip to content

Commit

Permalink
567 optional iouring (#777)
Browse files Browse the repository at this point in the history
* add support for optional iouring

* update changelog

* add feature for async-io

* make iodriver single per backend

* reduce number of develop changes

* style fixes

* style fixes

* style fixes

* style fixes

* Update bob-backend/src/pearl/core.rs

* fix discussions

* Update bob-backend/src/pearl/core.rs

* Update bob-backend/src/pearl/core.rs

* Update bob-backend/src/pearl/disk_controller.rs

* [567] Update CHANGELOG.md

* [567] Fix build

* [567] Fix build
  • Loading branch information
idruzhitskiy authored May 16, 2023
1 parent bcb623d commit 0151283
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 65 deletions.
20 changes: 12 additions & 8 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ name: build

on:
pull_request:
types: [ opened, synchronize, reopened, ready_for_review, labeled, unlabeled ]
types: [opened, synchronize, reopened, ready_for_review, labeled, unlabeled]
push:
branches: [ master ]

jobs:
branches: [master]

jobs:
build:
runs-on: ubuntu-latest
strategy:
Expand All @@ -17,7 +16,7 @@ jobs:
TARGET: "x86_64-unknown-linux-musl"
BUILD_PROFILE: "release"
steps:
- name: Checks-out repository
- name: Checks-out repository
uses: actions/checkout@v2
- name: Set env variables and install packages
run: |
Expand Down Expand Up @@ -45,13 +44,18 @@ jobs:
with:
command: build
args: --profile=${{ env.BUILD_PROFILE }} --target=${{ env.TARGET }}
- name: Build release with async io
uses: actions-rs/cargo@v1
with:
command: build
args: --features async-io --profile=${{ env.BUILD_PROFILE }} --target=${{ env.TARGET }}
- name: Check bobd version
run: target/${{ env.TARGET }}/${{ env.BUILD_PROFILE_DIR }}/bobd --version

build-alpine-image:
runs-on: ubuntu-latest
steps:
- name: Checks-out repository
- name: Checks-out repository
uses: actions/checkout@v2
- name: Set env
run: |
Expand All @@ -66,7 +70,7 @@ jobs:
build-ubuntu-image:
runs-on: ubuntu-latest
steps:
- name: Checks-out repository
- name: Checks-out repository
uses: actions/checkout@v2
- name: Set env
run: |
Expand All @@ -76,4 +80,4 @@ jobs:
with:
context: .
file: dockerfiles/ubuntu/Dockerfile
build-args: BOB_COMMIT_HASH=${BOB_COMMIT_HASH}
build-args: BOB_COMMIT_HASH=${BOB_COMMIT_HASH}
14 changes: 9 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ name: tests

on:
pull_request:
types: [ opened, synchronize, reopened, ready_for_review ]
types: [opened, synchronize, reopened, ready_for_review]
push:
branches: [ master ]

jobs:
branches: [master]

jobs:
tests:
runs-on: ubuntu-latest
steps:
- name: Checks-out repository
- name: Checks-out repository
uses: actions/checkout@v2
- name: Install latest stable
uses: actions-rs/toolchain@v1
Expand All @@ -25,3 +24,8 @@ jobs:
with:
command: test
args: --features testing
- name: Tests async-io
uses: actions-rs/cargo@v1
with:
command: test
args: --features testing,async-io
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Bob versions changelog

#### Changed
- Avoid Pearl Storage clone (#791)
- Make iouring optional (#567)

#### Fixed

Expand Down
1 change: 1 addition & 0 deletions bob-apps/bin/blob_recovery_tool/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ macro_rules! sized_key {
impl<'a> KeyTrait<'a> for $t {
type Ref = $r<'a>;
const LEN: u16 = $n;
const MEM_SIZE: usize = std::mem::size_of::<Vec<u8>>() + $n;
}

impl Default for $t {
Expand Down
3 changes: 3 additions & 0 deletions bob-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ features = []
[dev-dependencies]
criterion = "0.4"

[features]
async-io = ["pearl/async-io-rio"]

[[bench]]
name = "key_cmp_benchmark"
harness = false
6 changes: 3 additions & 3 deletions bob-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub(crate) mod prelude {
pub use chrono::{DateTime, Datelike, Duration as ChronoDuration, NaiveDateTime, Utc};
pub use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
pub use pearl::{
filter::hierarchical::*, filter::traits::*, filter::Config as BloomConfig, rio, Builder,
Error as PearlError, ErrorKind as PearlErrorKind, Key as KeyTrait, RefKey as RefKeyTrait,
Storage,
filter::hierarchical::*, filter::traits::*, filter::Config as BloomConfig, Builder,
Error as PearlError, ErrorKind as PearlErrorKind, IoDriver, Key as KeyTrait,
RefKey as RefKeyTrait, Storage,
};
pub use std::{
collections::{hash_map::Entry, HashMap},
Expand Down
29 changes: 27 additions & 2 deletions bob-backend/src/pearl/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,17 @@ impl Pearl {
})?;

let run_sem = Arc::new(Semaphore::new(config.init_par_degree()));
let iodriver = get_io_driver(&config.pearl());
let data = settings
.clone()
.read_group_from_disk(config, run_sem.clone(), logger.clone())
.read_group_from_disk(config, run_sem.clone(), logger.clone(), iodriver.clone())
.await;
let disk_controllers: Arc<[_]> = Arc::from(data.as_slice());
trace!("count vdisk groups: {}", disk_controllers.len());

let alien_disk_controller = settings
.clone()
.read_alien_directory(config, run_sem, logger)
.read_alien_directory(config, run_sem, logger, iodriver)
.await;

let pearl = Self {
Expand Down Expand Up @@ -340,3 +341,27 @@ impl BackendStorage for Pearl {
group.remount(postprocessor).await
}
}

/// Builds the backend IO driver when the `async-io` feature is compiled out.
///
/// Always returns the synchronous driver; if the config nonetheless has the
/// AIO flag enabled, that request is logged and ignored.
#[cfg(not(feature = "async-io"))]
fn get_io_driver(pearl_config: &PearlConfig) -> IoDriver {
    let aio_requested = pearl_config.is_aio_enabled();
    if aio_requested {
        // Config asks for AIO, but the binary was built without it — tell the operator.
        warn!("async io feature is not enabled, ignoring aio flag from config");
    }
    IoDriver::new_sync()
}

/// Builds the backend IO driver when the `async-io` feature is enabled.
///
/// If the config requests AIO, attempts to create the async (io_uring) driver
/// and falls back to the synchronous driver on failure; otherwise the
/// synchronous driver is used directly. The chosen path is logged either way.
#[cfg(feature = "async-io")]
fn get_io_driver(pearl_config: &PearlConfig) -> IoDriver {
    // Return the expression directly instead of binding it to a temporary
    // (`let iodriver = …; iodriver` tripped clippy's `let_and_return`).
    if pearl_config.is_aio_enabled() {
        info!("bob will start with AIO - async fs io api");
        IoDriver::new_async().unwrap_or_else(|e| {
            // Async driver initialization can fail (e.g. kernel without
            // io_uring support) — degrade gracefully to sync IO.
            warn!("bob will start with standard sync fs io api, can't start with AIO, cause: {:?}", e);
            IoDriver::new_sync()
        })
    } else {
        info!("bob will start with standard sync fs io api, cause: async io disabled in config");
        IoDriver::new_sync()
    }
}
16 changes: 9 additions & 7 deletions bob-backend/src/pearl/disk_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) mod logger;
use crate::{core::Operation, pearl::hooks::Hooks, prelude::*};
use logger::DisksEventsLogger;

use super::holder::PearlCreationContext;
use super::Holder;
use super::{core::BackendResult, settings::Settings, utils::StartTimestampConfig, Group};

Expand All @@ -33,7 +34,6 @@ enum GroupsState {
pub struct DiskController {
disk: DiskPath,
vdisks: Vec<VDiskId>,
dump_sem: Arc<Semaphore>,
run_sem: Arc<Semaphore>,
monitor_sem: Arc<Semaphore>,
node_name: String,
Expand All @@ -44,6 +44,7 @@ pub struct DiskController {
disk_state_metric: String,
logger: DisksEventsLogger,
blobs_count_cached: Arc<AtomicU64>,
pearl_creation_context: PearlCreationContext,
}

impl DiskController {
Expand All @@ -55,13 +56,14 @@ impl DiskController {
settings: Arc<Settings>,
is_alien: bool,
logger: DisksEventsLogger,
iodriver: IoDriver,
) -> Arc<Self> {
let disk_state_metric = format!("{}.{}", DISKS_FOLDER, disk.name());
let dump_sem = Arc::new(Semaphore::new(config.disk_access_par_degree()));
let pearl_creation_context = PearlCreationContext::new(dump_sem, iodriver);
let new_dc = Self {
disk,
vdisks,
dump_sem,
run_sem,
monitor_sem: Arc::new(Semaphore::new(1)),
node_name: config.name().to_owned(),
Expand All @@ -72,6 +74,7 @@ impl DiskController {
disk_state_metric,
logger,
blobs_count_cached: Arc::new(AtomicU64::new(0)),
pearl_creation_context,
};
new_dc
.init()
Expand Down Expand Up @@ -264,7 +267,7 @@ impl DiskController {
self.disk.name().to_owned(),
path,
self.node_name.clone(),
self.dump_sem.clone(),
self.pearl_creation_context.clone(),
)
})
.collect()
Expand All @@ -275,8 +278,8 @@ impl DiskController {
let groups = settings
.collect_alien_groups(
self.disk.name().to_owned(),
self.dump_sem.clone(),
&self.node_name,
self.pearl_creation_context.clone(),
)
.await?;
trace!(
Expand Down Expand Up @@ -325,7 +328,7 @@ impl DiskController {
operation.remote_node_name().expect("Node name not found"),
operation.vdisk_id(),
&self.node_name,
self.dump_sem.clone(),
self.pearl_creation_context.clone(),
)
.await?;
write_lock_groups.push(group.clone());
Expand Down Expand Up @@ -521,7 +524,6 @@ impl DiskController {
}
}


pub(crate) async fn delete(
&self,
op: Operation,
Expand Down Expand Up @@ -588,7 +590,7 @@ impl DiskController {
}
}


pub(crate) async fn shutdown(&self) {
let futures = FuturesUnordered::new();
for group in self.groups.read().await.iter() {
Expand Down
24 changes: 12 additions & 12 deletions bob-backend/src/pearl/group.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{pearl::utils::get_current_timestamp, prelude::*};

use super::{data::Key, utils::StartTimestampConfig, Holder, Hooks};
use super::{data::Key, holder::PearlCreationContext, utils::StartTimestampConfig, Holder, Hooks};
use crate::{
core::Operation,
pearl::{core::BackendResult, settings::Settings, utils::Utils},
Expand All @@ -23,7 +23,7 @@ pub struct Group {
node_name: String,
disk_name: String,
owner_node_name: String,
dump_sem: Arc<Semaphore>,
pearl_creation_context: PearlCreationContext,
}

impl Group {
Expand All @@ -34,7 +34,7 @@ impl Group {
disk_name: String,
directory_path: PathBuf,
owner_node_name: String,
dump_sem: Arc<Semaphore>,
pearl_creation_context: PearlCreationContext,
) -> Self {
Self {
holders: Arc::new(UgradableRwLock::new(HoldersContainer::new(
Expand All @@ -48,7 +48,7 @@ impl Group {
directory_path,
disk_name,
owner_node_name,
dump_sem,
pearl_creation_context,
}
}

Expand Down Expand Up @@ -79,7 +79,7 @@ impl Group {
debug!("{}: save holders to group", self);
let mut holders = self.holders.write().await;
holders.clear();
holders.extend(new_holders).await;
holders.extend(new_holders).await;
debug!("{}: start holders", self);
Self::run_pearls(&mut holders, pp).await
}
Expand Down Expand Up @@ -180,7 +180,7 @@ impl Group {
) -> Result<(ChildId, Holder), Error> {
// importantly, only one thread can hold an upgradable lock at a time
let holders = self.holders.upgradable_read().await;

let created_holder_index = Self::find_actual_holder(&holders, data_timestamp).await;
Ok(if let Ok(index_and_holder) = created_holder_index {
index_and_holder
Expand All @@ -192,7 +192,7 @@ impl Group {
"pearl init failed",
self.settings.config().settings().create_pearl_wait_delay(),
).await?;
debug!("backend pearl group save pearl storage prepared");
debug!("backend pearl group save pearl storage prepared");
let mut holders = RwLockUpgradableReadGuard::upgrade(holders).await;
let new_index = holders.push(pearl.clone()).await;
debug!("group create write pearl holder inserted, index {:?}", new_index);
Expand Down Expand Up @@ -334,7 +334,7 @@ impl Group {
Ok(exist)
}


pub async fn delete(
&self,
key: BobKey,
Expand Down Expand Up @@ -365,12 +365,12 @@ impl Group {
async fn delete_in_actual_holder(&self, holder: (ChildId, Holder), key: BobKey, meta: &BobMeta) -> Result<u64, Error> {
// In actual holder we delete with force_delete = true
let delete_count = Self::delete_common(holder.1.clone(), key, meta, true).await?;
// We need to add marker record to alien regardless of record presence
// We need to add marker record to alien regardless of record presence
self.holders
.read()
.await
.add_to_parents(holder.0, &Key::from(key));

Ok(delete_count)
}

Expand Down Expand Up @@ -417,8 +417,8 @@ impl Group {
result
}
}



pub fn holders(&self) -> Arc<UgradableRwLock<HoldersContainer>> {
self.holders.clone()
}
Expand Down Expand Up @@ -501,7 +501,7 @@ impl Group {
self.vdisk_id,
path,
config,
self.dump_sem.clone(),
self.pearl_creation_context.clone(),
)
}

Expand Down
Loading

0 comments on commit 0151283

Please sign in to comment.