Skip to content

Commit

Permalink
Merge branch 'main' into feat/chain-parallelism-mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Aug 22, 2022
2 parents 802b147 + e72a6e6 commit 74352d3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 124 deletions.
2 changes: 1 addition & 1 deletion src/storage/src/hummock/compactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Compactor {
(tracker_id, sstable_id_manager_clone),
|(tracker_id, sstable_id_manager)| {
tokio::spawn(async move {
sstable_id_manager.remove_watermark_sst_id(tracker_id).await;
sstable_id_manager.remove_watermark_sst_id(tracker_id);
});
},
);
Expand Down
7 changes: 2 additions & 5 deletions src/storage/src/hummock/local_version_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,7 @@ impl LocalVersionManager {
let sstable_id_manager_clone = self.sstable_id_manager.clone();
tokio::spawn(async move {
for cleaned_epoch in cleaned_epochs {
sstable_id_manager_clone
.remove_watermark_sst_id(TrackerId::Epoch(cleaned_epoch))
.await;
sstable_id_manager_clone.remove_watermark_sst_id(TrackerId::Epoch(cleaned_epoch));
}
});
{
Expand Down Expand Up @@ -963,8 +961,7 @@ impl LocalVersionManager {
for cleaned_epoch in cleaned_epochs {
local_version_manager
.sstable_id_manager
.remove_watermark_sst_id(TrackerId::Epoch(cleaned_epoch))
.await;
.remove_watermark_sst_id(TrackerId::Epoch(cleaned_epoch));
}

// Notify completion of the Clear event.
Expand Down
161 changes: 43 additions & 118 deletions src/storage/src/hummock/sstable/sstable_id_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
Expand All @@ -24,7 +24,7 @@ use parking_lot::Mutex;
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableId, SstIdRange};
use risingwave_pb::meta::heartbeat_request::extra_info::Info;
use risingwave_rpc_client::{ExtraInfoSource, HummockMetaClient};
use tokio::sync::{Notify, RwLock};
use tokio::sync::Notify;

use crate::hummock::{HummockError, HummockResult};

Expand Down Expand Up @@ -143,26 +143,23 @@ impl SstableIdManager {
None => self.sst_id_tracker.get_next_auto_tracker_id(),
Some(epoch) => TrackerId::Epoch(epoch),
};
self.sst_id_tracker
.add_tracker(tracker_id, async {
self.map_next_sst_id(|available_sst_ids| available_sst_ids.peek_next_sst_id())
.await
})
let next_sst_id = self
.map_next_sst_id(|available_sst_ids| available_sst_ids.peek_next_sst_id())
.await?;
self.sst_id_tracker.add_tracker(tracker_id, next_sst_id);
Ok(tracker_id)
}

pub async fn remove_watermark_sst_id(&self, tracker_id: TrackerId) {
self.sst_id_tracker.remove_tracker(tracker_id).await;
pub fn remove_watermark_sst_id(&self, tracker_id: TrackerId) {
self.sst_id_tracker.remove_tracker(tracker_id);
}

/// Returns GC watermark. It equals
/// - min(effective watermarks), if number of effective watermarks > 0.
/// - `HummockSstableId::MAX`, if no effective watermark.
pub async fn global_watermark_sst_id(&self) -> HummockSstableId {
pub fn global_watermark_sst_id(&self) -> HummockSstableId {
self.sst_id_tracker
.tracking_sst_ids()
.await
.into_iter()
.min()
.unwrap_or(HummockSstableId::MAX)
Expand All @@ -172,9 +169,7 @@ impl SstableIdManager {
#[async_trait::async_trait]
impl ExtraInfoSource for SstableIdManager {
async fn get_extra_info(&self) -> Option<Info> {
Some(Info::HummockGcWatermark(
self.global_watermark_sst_id().await,
))
Some(Info::HummockGcWatermark(self.global_watermark_sst_id()))
}
}

Expand All @@ -189,86 +184,57 @@ pub enum TrackerId {
/// `SstIdTracker` tracks a min(SST id) for various caller, identified by a `TrackerId`.
pub struct SstIdTracker {
auto_id: AtomicU64,
inner: RwLock<SstIdTrackerInner>,
inner: parking_lot::RwLock<SstIdTrackerInner>,
}

impl SstIdTracker {
fn new() -> Self {
Self {
auto_id: Default::default(),
inner: RwLock::new(SstIdTrackerInner::new()),
inner: parking_lot::RwLock::new(SstIdTrackerInner::new()),
}
}

/// Adds a tracker to track SST id given by `get_sst_id`.
///
/// Consecutive `get_sst_id` must return non-decreasing SST ids.
async fn add_tracker<F>(&self, tracker_id: TrackerId, get_sst_id: F) -> HummockResult<()>
where
F: Future<Output = HummockResult<HummockSstableId>>,
{
let mut guard = self.inner.write().await;
let sst_id = get_sst_id.await?;
guard.add_tracker(tracker_id, sst_id)
/// Adds a tracker to track `sst_id`. If a tracker with `tracker_id` already exists, it will
/// track the smallest `sst_id` ever given.
fn add_tracker(&self, tracker_id: TrackerId, sst_id: HummockSstableId) {
self.inner.write().add_tracker(tracker_id, sst_id);
}

/// Removes given `tracker_id`.
async fn remove_tracker(&self, tracker_id: TrackerId) {
self.inner.write().await.remove_tracker(tracker_id);
fn remove_tracker(&self, tracker_id: TrackerId) {
self.inner.write().remove_tracker(tracker_id);
}

fn get_next_auto_tracker_id(&self) -> TrackerId {
TrackerId::Auto(self.auto_id.fetch_add(1, Ordering::Relaxed) + 1)
}

async fn tracking_sst_ids(&self) -> Vec<HummockSstableId> {
self.inner.read().await.tracking_sst_ids()
fn tracking_sst_ids(&self) -> Vec<HummockSstableId> {
self.inner.read().tracking_sst_ids()
}
}

struct SstIdTrackerInner {
tracking_sst_ids: HashMap<TrackerId, HummockSstableId>,
/// Used for sanity check
max_seen_sst_id: HummockSstableId,
}

impl SstIdTrackerInner {
fn new() -> Self {
Self {
tracking_sst_ids: Default::default(),
max_seen_sst_id: HummockSstableId::MIN,
}
}

/// Consecutive `add_tracker` must be given non-decreasing `sst_id`,
/// so that min(`tracking_sst_ids`) never moves backwards.
fn add_tracker(
&mut self,
tracker_id: TrackerId,
sst_id: HummockSstableId,
) -> HummockResult<()> {
if sst_id < self.max_seen_sst_id {
return Err(HummockError::sst_id_tracker_error(
"Tracker's SST id moves backwards",
));
}
self.max_seen_sst_id = sst_id;
fn add_tracker(&mut self, tracker_id: TrackerId, sst_id: HummockSstableId) {
match self.tracking_sst_ids.entry(tracker_id) {
Entry::Occupied(o) => {
if sst_id < *o.get() {
return Err(HummockError::sst_id_tracker_error(format!(
"Tracker's SST id moves backwards in {:#?}",
o.key()
)));
// Just track the minimum one.
}
Entry::Occupied(mut o) => {
*o.get_mut() = cmp::min(*o.get_mut(), sst_id);
}
Entry::Vacant(v) => {
v.insert(sst_id);
}
}

Ok(())
}

fn remove_tracker(&mut self, tracker_id: TrackerId) {
Expand All @@ -291,7 +257,7 @@ mod test {
#[tokio::test]
async fn test_sst_id_tracker_basic() {
let sst_id_tacker = SstIdTracker::new();
assert!(sst_id_tacker.tracking_sst_ids().await.is_empty());
assert!(sst_id_tacker.tracking_sst_ids().is_empty());
let auto_id =
try_match_expand!(sst_id_tacker.get_next_auto_tracker_id(), TrackerId::Auto).unwrap();
assert_eq!(auto_id, AutoTrackerId::MIN + 1);
Expand All @@ -300,84 +266,43 @@ mod test {
let auto_id_2 = sst_id_tacker.get_next_auto_tracker_id();
let auto_id_3 = sst_id_tacker.get_next_auto_tracker_id();

sst_id_tacker
.add_tracker(auto_id_1, async { Ok(10) })
.await
.unwrap();
sst_id_tacker.add_tracker(auto_id_1, 10);
assert_eq!(
sst_id_tacker
.tracking_sst_ids()
.await
.into_iter()
.min()
.unwrap(),
sst_id_tacker.tracking_sst_ids().into_iter().min().unwrap(),
10
);

// Fails due to SST id moving backwards.
sst_id_tacker
.add_tracker(auto_id_1, async { Ok(9) })
.await
.unwrap_err();
sst_id_tacker
.add_tracker(auto_id_2, async { Ok(9) })
.await
.unwrap_err();
// OK to move SST id backwards.
sst_id_tacker.add_tracker(auto_id_1, 9);
sst_id_tacker.add_tracker(auto_id_2, 9);

// OK to add same id to the same tracker
sst_id_tacker
.add_tracker(auto_id_1, async { Ok(10) })
.await
.unwrap();
sst_id_tacker.add_tracker(auto_id_1, 10);
// OK to add same id to another tracker
sst_id_tacker
.add_tracker(auto_id_2, async { Ok(10) })
.await
.unwrap();
sst_id_tacker.add_tracker(auto_id_2, 10);

sst_id_tacker
.add_tracker(auto_id_3, async { Ok(20) })
.await
.unwrap();
sst_id_tacker
.add_tracker(auto_id_2, async { Ok(30) })
.await
.unwrap();
// Tracker 1 and 2 both hold id 10.
sst_id_tacker.add_tracker(auto_id_3, 20);
sst_id_tacker.add_tracker(auto_id_2, 30);
// Tracker 1 and 2 both hold id 9.
assert_eq!(
sst_id_tacker
.tracking_sst_ids()
.await
.into_iter()
.min()
.unwrap(),
10
sst_id_tacker.tracking_sst_ids().into_iter().min().unwrap(),
9
);

sst_id_tacker.remove_tracker(auto_id_1).await;
// Tracker 2 still holds 10.
sst_id_tacker.remove_tracker(auto_id_1);
// Tracker 2 still holds 9.
assert_eq!(
sst_id_tacker
.tracking_sst_ids()
.await
.into_iter()
.min()
.unwrap(),
10
sst_id_tacker.tracking_sst_ids().into_iter().min().unwrap(),
9
);

sst_id_tacker.remove_tracker(auto_id_2).await;
sst_id_tacker.remove_tracker(auto_id_2);
assert_eq!(
sst_id_tacker
.tracking_sst_ids()
.await
.into_iter()
.min()
.unwrap(),
sst_id_tacker.tracking_sst_ids().into_iter().min().unwrap(),
20
);

sst_id_tacker.remove_tracker(auto_id_3).await;
assert!(sst_id_tacker.tracking_sst_ids().await.is_empty());
sst_id_tacker.remove_tracker(auto_id_3);
assert!(sst_id_tacker.tracking_sst_ids().is_empty());
}
}

0 comments on commit 74352d3

Please sign in to comment.