Skip to content

[4/n] [installinator] split fetching and progress reporting traits #8038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions installinator/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ use tufaceous_lib::ControlPlaneZoneImages;
use update_engine::StepResult;

use crate::{
ArtifactWriter, WriteDestination,
artifact::ArtifactIdOpts,
peers::{DiscoveryMechanism, FetchedArtifact, HttpPeers, Peers},
reporter::ProgressReporter,
write::{ArtifactWriter, WriteDestination},
peers::{
DiscoveryMechanism, FetchArtifactBackend, FetchedArtifact,
HttpFetchBackend,
},
reporter::{HttpProgressBackend, ProgressReporter, ReportProgressBackend},
};

/// Installinator app.
Expand Down Expand Up @@ -90,11 +93,11 @@ struct DebugDiscoverOpts {

impl DebugDiscoverOpts {
async fn exec(self, log: &slog::Logger) -> Result<()> {
let peers = Peers::new(
let peers = FetchArtifactBackend::new(
log,
Box::new(HttpPeers::new(
log,
self.opts.mechanism.discover_peers(log).await?,
Box::new(HttpFetchBackend::new(
&log,
self.opts.mechanism.discover_peers(&log).await?,
)),
Duration::from_secs(10),
);
Expand Down Expand Up @@ -185,22 +188,14 @@ impl InstallOpts {
let image_id = self.artifact_ids.resolve()?;

let discovery = self.discover_opts.mechanism.clone();
let discovery_log = log.clone();
let (progress_reporter, event_sender) =
ProgressReporter::new(log, image_id.update_id, move || {
let log = discovery_log.clone();
let discovery = discovery.clone();
async move {
Ok(Peers::new(
&log,
Box::new(HttpPeers::new(
&log,
discovery.discover_peers(&log).await?,
)),
Duration::from_secs(10),
))
}
});
let (progress_reporter, event_sender) = ProgressReporter::new(
log,
image_id.update_id,
ReportProgressBackend::new(
log,
HttpProgressBackend::new(log, discovery),
),
);
let progress_handle = progress_reporter.start();
let discovery = &self.discover_opts.mechanism;

Expand Down Expand Up @@ -499,9 +494,9 @@ async fn fetch_artifact(
cx,
&log,
|| async {
Ok(Peers::new(
Ok(FetchArtifactBackend::new(
&log,
Box::new(HttpPeers::new(
Box::new(HttpFetchBackend::new(
&log,
discovery.discover_peers(&log).await?,
)),
Expand Down
96 changes: 30 additions & 66 deletions installinator/src/mock_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use tufaceous_artifact::ArtifactHashId;
use uuid::Uuid;

use crate::{
errors::HttpError,
peers::{FetchReceiver, PeerAddress, PeersImpl},
errors::{DiscoverPeersError, HttpError},
peers::{FetchArtifactImpl, FetchReceiver, PeerAddress},
reporter::ReportProgressImpl,
};

struct MockPeersUniverse {
Expand Down Expand Up @@ -126,7 +127,7 @@ impl MockPeersUniverse {
})
}

fn attempts(&self) -> impl Iterator<Item = Result<MockPeers>> + '_ {
fn attempts(&self) -> impl Iterator<Item = Result<MockFetchBackend>> + '_ {
self.attempt_bitmaps.iter().enumerate().map(
move |(i, &attempt_bitmap)| {
match attempt_bitmap {
Expand All @@ -141,7 +142,7 @@ impl MockPeersUniverse {
.then(|| (*addr, peer.clone()))
})
.collect();
Ok(MockPeers {
Ok(MockFetchBackend {
artifact: self.artifact.clone(),
selected_peers,
})
Expand Down Expand Up @@ -171,13 +172,13 @@ enum AttemptBitmap {
}

#[derive(Debug)]
struct MockPeers {
struct MockFetchBackend {
artifact: Bytes,
// Peers within the universe that have been selected
selected_peers: BTreeMap<PeerAddress, MockPeer>,
}

impl MockPeers {
impl MockFetchBackend {
fn get(&self, peer: PeerAddress) -> Option<&MockPeer> {
self.selected_peers.get(&peer)
}
Expand Down Expand Up @@ -232,7 +233,7 @@ impl MockPeers {
}

#[async_trait]
impl PeersImpl for MockPeers {
impl FetchArtifactImpl for MockFetchBackend {
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
Box::new(self.selected_peers.keys().copied())
}
Expand All @@ -258,18 +259,6 @@ impl PeersImpl for MockPeers {
// TODO: add tests to ensure an invalid artifact size is correctly detected
Ok((artifact_size, receiver))
}

async fn report_progress_impl(
&self,
_peer: PeerAddress,
_update_id: Uuid,
_report: EventReport,
) -> Result<(), ClientError> {
panic!(
"this is currently unused -- at some point we'll want to \
unify this with MockReportPeers"
)
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -462,12 +451,12 @@ impl ResponseAction_ {
///
/// In the future, this will be combined with `MockPeers` so we can model.
#[derive(Debug)]
struct MockReportPeers {
struct MockProgressBackend {
update_id: Uuid,
report_sender: mpsc::Sender<EventReport>,
}

impl MockReportPeers {
impl MockProgressBackend {
const VALID_PEER: PeerAddress = PeerAddress::new(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
2000,
Expand All @@ -485,27 +474,11 @@ impl MockReportPeers {
}

#[async_trait]
impl PeersImpl for MockReportPeers {
fn peers(&self) -> Box<dyn Iterator<Item = PeerAddress> + Send + '_> {
Box::new(
[Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER]
.into_iter(),
)
}

fn peer_count(&self) -> usize {
3
}

async fn fetch_from_peer_impl(
impl ReportProgressImpl for MockProgressBackend {
async fn discover_peers(
&self,
_peer: PeerAddress,
_artifact_hash_id: ArtifactHashId,
) -> Result<(u64, FetchReceiver), HttpError> {
unimplemented!(
"this should never be called -- \
eventually we'll want to unify this with MockPeers",
)
) -> Result<Vec<PeerAddress>, DiscoverPeersError> {
Ok(vec![Self::VALID_PEER, Self::INVALID_PEER, Self::UNRESPONSIVE_PEER])
}

async fn report_progress_impl(
Expand Down Expand Up @@ -542,8 +515,8 @@ mod tests {
use super::*;
use crate::{
errors::DiscoverPeersError,
peers::{FetchedArtifact, Peers},
reporter::ProgressReporter,
peers::{FetchArtifactBackend, FetchedArtifact},
reporter::{ProgressReporter, ReportProgressBackend},
test_helpers::{dummy_artifact_hash_id, with_test_runtime},
};

Expand Down Expand Up @@ -582,25 +555,14 @@ mod tests {
ReceiverStream::new(report_receiver).collect::<Vec<_>>().await
});

let reporter_log = logctx.log.clone();

let (progress_reporter, event_sender) =
ProgressReporter::new(&logctx.log, update_id, move || {
let reporter_log = reporter_log.clone();
let report_sender = report_sender.clone();

async move {
Ok(Peers::new(
&reporter_log,
Box::new(MockReportPeers {
update_id,
report_sender,
}),
// The timeout is currently unused by broadcast_report.
Duration::from_secs(10),
))
}
});
let (progress_reporter, event_sender) = ProgressReporter::new(
&logctx.log,
update_id,
ReportProgressBackend::new(
&logctx.log,
MockProgressBackend { update_id, report_sender },
),
);
let progress_handle = progress_reporter.start();

let engine = UpdateEngine::new(&logctx.log, event_sender);
Expand Down Expand Up @@ -685,17 +647,19 @@ mod tests {
async fn fetch_artifact(
cx: &StepContext,
log: &slog::Logger,
attempts: impl IntoIterator<Item = Result<MockPeers>>,
attempts: impl IntoIterator<Item = Result<MockFetchBackend>>,
timeout: Duration,
) -> Result<FetchedArtifact> {
let mut attempts = attempts.into_iter();
FetchedArtifact::loop_fetch_from_peers(
cx,
log,
|| match attempts.next() {
Some(Ok(peers)) => {
future::ok(Peers::new(&log, Box::new(peers), timeout))
}
Some(Ok(peers)) => future::ok(FetchArtifactBackend::new(
&log,
Box::new(peers),
timeout,
)),
Some(Err(error)) => {
future::err(DiscoverPeersError::Retry(error))
}
Expand Down
Loading
Loading