Skip to content

Commit

Permalink
change: rename CurrentSnapshotData to Snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Aug 23, 2021
1 parent f168696 commit fea63b2
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
4 changes: 2 additions & 2 deletions async-raft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::quorum;
use crate::replication::RaftEvent;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::storage::CurrentSnapshotData;
use crate::storage::Snapshot;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
async fn handle_needs_snapshot(
&mut self,
_: NodeId,
tx: oneshot::Sender<CurrentSnapshotData<S::SnapshotData>>,
tx: oneshot::Sender<Snapshot<S::SnapshotData>>,
) -> RaftResult<()> {
// Ensure snapshotting is configured, else do nothing.
let threshold = match &self.core.config.snapshot_policy {
Expand Down
12 changes: 6 additions & 6 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::error::RaftResult;
use crate::raft::AppendEntriesRequest;
use crate::raft::Entry;
use crate::raft::InstallSnapshotRequest;
use crate::storage::CurrentSnapshotData;
use crate::storage::Snapshot;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
Expand Down Expand Up @@ -566,7 +566,7 @@ where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
/// The ID of the target node from which the event was sent.
target: NodeId,
/// The response channel for delivering the snapshot data.
tx: oneshot::Sender<CurrentSnapshotData<S>>,
tx: oneshot::Sender<Snapshot<S>>,
},
/// Some critical error has taken place, and Raft needs to shutdown.
Shutdown,
Expand Down Expand Up @@ -801,8 +801,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
struct SnapshottingState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
/// An exclusive handle to the replication core.
replication_core: &'a mut ReplicationCore<D, R, N, S>,
snapshot: Option<CurrentSnapshotData<S::SnapshotData>>,
snapshot_fetch_rx: Option<oneshot::Receiver<CurrentSnapshotData<S::SnapshotData>>>,
snapshot: Option<Snapshot<S::SnapshotData>>,
snapshot_fetch_rx: Option<oneshot::Receiver<Snapshot<S::SnapshotData>>>,
}

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> SnapshottingState<'a, D, R, N, S> {
Expand Down Expand Up @@ -865,7 +865,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// If an error comes up during processing, this routine should simple be called again after
/// issuing a new request to the storage layer.
#[tracing::instrument(level = "trace", skip(self, rx))]
async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver<CurrentSnapshotData<S::SnapshotData>>) {
async fn wait_for_snapshot(&mut self, mut rx: oneshot::Receiver<Snapshot<S::SnapshotData>>) {
loop {
let span = tracing::debug_span!("FFF:wait_for_snapshot");
let _ent = span.enter();
Expand Down Expand Up @@ -898,7 +898,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::SnapshotData>) -> RaftResult<()> {
async fn stream_snapshot(&mut self, mut snapshot: Snapshot<S::SnapshotData>) -> RaftResult<()> {
let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?;

let mut offset = 0;
Expand Down
6 changes: 3 additions & 3 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct SnapshotMeta {
}

/// The data associated with the current snapshot.
pub struct CurrentSnapshotData<S>
pub struct Snapshot<S>
where S: AsyncRead + AsyncSeek + Send + Unpin + 'static
{
/// metadata of a snapshot
Expand Down Expand Up @@ -214,7 +214,7 @@ where
/// log covered by the snapshot.
///
/// Errors returned from this method will be logged and retried.
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::SnapshotData>>;
async fn do_log_compaction(&self) -> Result<Snapshot<Self::SnapshotData>>;

/// Create a new blank snapshot, returning a writable handle to the snapshot object.
///
Expand Down Expand Up @@ -261,7 +261,7 @@ where
/// of the snapshot, which should be decoded for creating this method's response data.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn get_current_snapshot(&self) -> Result<Option<CurrentSnapshotData<Self::SnapshotData>>>;
async fn get_current_snapshot(&self) -> Result<Option<Snapshot<Self::SnapshotData>>>;
}

/// APIs for debugging a store.
Expand Down
10 changes: 5 additions & 5 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use async_raft::async_trait::async_trait;
use async_raft::raft::Entry;
use async_raft::raft::EntryPayload;
use async_raft::raft::MembershipConfig;
use async_raft::storage::CurrentSnapshotData;
use async_raft::storage::HardState;
use async_raft::storage::InitialState;
use async_raft::storage::Snapshot;
use async_raft::AppData;
use async_raft::AppDataResponse;
use async_raft::LogId;
Expand Down Expand Up @@ -344,7 +344,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn do_log_compaction(&self) -> Result<CurrentSnapshotData<Self::SnapshotData>> {
async fn do_log_compaction(&self) -> Result<Snapshot<Self::SnapshotData>> {
let (data, last_applied_log);
let membership_config;
{
Expand Down Expand Up @@ -390,7 +390,7 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
} // Release log & snapshot write locks.

tracing::info!({ snapshot_size = snapshot_size }, "log compaction complete");
Ok(CurrentSnapshotData {
Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
Expand Down Expand Up @@ -448,13 +448,13 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_current_snapshot(&self) -> Result<Option<CurrentSnapshotData<Self::SnapshotData>>> {
async fn get_current_snapshot(&self) -> Result<Option<Snapshot<Self::SnapshotData>>> {
match &*self.current_snapshot.read().await {
Some(snapshot) => {
// TODO(xp): try not to clone the entire data.
// If snapshot.data is Arc<T> that impl AsyncRead etc then the sharing can be done.
let data = snapshot.data.clone();
Ok(Some(CurrentSnapshotData {
Ok(Some(Snapshot {
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(data)),
}))
Expand Down

0 comments on commit fea63b2

Please sign in to comment.