Skip to content

Commit 4e281e0

Browse files
committed
Merge branch 'main' into golddydev/epochs-blocks
2 parents c3eccf6 + c983831 commit 4e281e0

File tree

4 files changed

+265
-22
lines changed

4 files changed

+265
-22
lines changed

common/src/queries/epochs.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,14 @@ pub struct EpochInfo {
4444
}
4545

4646
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
47-
pub struct NextEpochs {}
47+
pub struct NextEpochs {
48+
pub epochs: Vec<EpochActivityMessage>,
49+
}
4850

4951
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
50-
pub struct PreviousEpochs {}
52+
pub struct PreviousEpochs {
53+
pub epochs: Vec<EpochActivityMessage>,
54+
}
5155

5256
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
5357
pub struct EpochStakeDistribution {}

modules/epochs_state/src/epochs_history.rs

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,40 @@ impl EpochsHistoryState {
2222
}
2323
}
2424

25-
pub fn is_enabled(&self) -> bool {
26-
self.epochs_history.is_some()
27-
}
28-
2925
/// Get Epoch Activity Message for certain pool operator at certain epoch
3026
pub fn get_historical_epoch(&self, epoch: u64) -> Result<Option<EpochActivityMessage>> {
31-
if self.is_enabled() {
32-
Ok(self
33-
.epochs_history
34-
.as_ref()
35-
.and_then(|epochs| epochs.get(&epoch).map(|e| e.clone())))
27+
if let Some(epochs_history) = self.epochs_history.as_ref() {
28+
Ok(epochs_history.get(&epoch).map(|e| e.clone()))
29+
} else {
30+
Err(anyhow::anyhow!("Historical epoch storage is disabled"))
31+
}
32+
}
33+
34+
/// Get Epoch Activity Messages for epochs following a specific epoch. (exclusive)
35+
pub fn get_next_epochs(&self, epoch: u64) -> Result<Vec<EpochActivityMessage>> {
36+
if let Some(epochs_history) = self.epochs_history.as_ref() {
37+
let mut epochs: Vec<EpochActivityMessage> = epochs_history
38+
.iter()
39+
.filter(|entry| *entry.key() > epoch)
40+
.map(|e| e.value().clone())
41+
.collect();
42+
epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch));
43+
Ok(epochs)
44+
} else {
45+
Err(anyhow::anyhow!("Historical epoch storage is disabled"))
46+
}
47+
}
48+
49+
/// Get Epoch Activity Messages for epochs following a specific epoch. (exclusive)
50+
pub fn get_previous_epochs(&self, epoch: u64) -> Result<Vec<EpochActivityMessage>> {
51+
if let Some(epochs_history) = self.epochs_history.as_ref() {
52+
let mut epochs: Vec<EpochActivityMessage> = epochs_history
53+
.iter()
54+
.filter(|entry| *entry.key() < epoch)
55+
.map(|e| e.value().clone())
56+
.collect();
57+
epochs.sort_by(|a, b| a.epoch.cmp(&b.epoch));
58+
Ok(epochs)
3659
} else {
3760
Err(anyhow::anyhow!("Historical epoch storage is disabled"))
3861
}
@@ -114,4 +137,60 @@ mod tests {
114137
assert_eq!(history.total_blocks, 1);
115138
assert_eq!(history.total_fees, 50);
116139
}
140+
141+
#[test]
142+
fn get_next_previous_epochs_sorts_epochs() {
143+
let epochs_history = EpochsHistoryState::new(&StoreConfig::new(true));
144+
let block = make_block(200);
145+
epochs_history.handle_epoch_activity(
146+
&block,
147+
&EpochActivityMessage {
148+
epoch: 199,
149+
epoch_start_time: 0,
150+
epoch_end_time: 0,
151+
first_block_time: 0,
152+
last_block_time: 0,
153+
total_blocks: 1,
154+
total_txs: 1,
155+
total_outputs: 100,
156+
total_fees: 50,
157+
vrf_vkey_hashes: vec![],
158+
nonce: None,
159+
},
160+
);
161+
162+
let block = make_block(201);
163+
epochs_history.handle_epoch_activity(
164+
&block,
165+
&EpochActivityMessage {
166+
epoch: 200,
167+
epoch_start_time: 0,
168+
epoch_end_time: 0,
169+
first_block_time: 0,
170+
last_block_time: 0,
171+
total_blocks: 1,
172+
total_txs: 1,
173+
total_outputs: 100,
174+
total_fees: 50,
175+
vrf_vkey_hashes: vec![],
176+
nonce: None,
177+
},
178+
);
179+
180+
let next_epochs = epochs_history.get_next_epochs(199).expect("history disabled in test");
181+
assert_eq!(next_epochs.len(), 1);
182+
assert_eq!(next_epochs[0].epoch, 200);
183+
184+
let previous_epochs =
185+
epochs_history.get_previous_epochs(201).expect("history disabled in test");
186+
assert_eq!(previous_epochs.len(), 2);
187+
assert_eq!(previous_epochs[0].epoch, 199);
188+
189+
let next_epochs = epochs_history.get_next_epochs(200).expect("history disabled in test");
190+
assert_eq!(next_epochs.len(), 0);
191+
192+
let previous_epochs =
193+
epochs_history.get_previous_epochs(199).expect("history disabled in test");
194+
assert_eq!(previous_epochs.len(), 0);
195+
}
117196
}

modules/epochs_state/src/epochs_state.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
use acropolis_common::{
55
messages::{CardanoMessage, Message, StateQuery, StateQueryResponse},
66
queries::epochs::{
7-
EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch,
8-
DEFAULT_EPOCHS_QUERY_TOPIC,
7+
EpochInfo, EpochsStateQuery, EpochsStateQueryResponse, LatestEpoch, NextEpochs,
8+
PreviousEpochs, DEFAULT_EPOCHS_QUERY_TOPIC,
99
},
1010
state_history::{StateHistory, StateHistoryStore},
1111
BlockInfo, BlockStatus, Era,
@@ -281,6 +281,44 @@ impl EpochsState {
281281
}
282282
}
283283

284+
EpochsStateQuery::GetNextEpochs { epoch_number } => {
285+
let current_epoch = state.get_epoch_info();
286+
if *epoch_number > current_epoch.epoch {
287+
EpochsStateQueryResponse::NotFound
288+
} else {
289+
match epochs_history.get_next_epochs(*epoch_number) {
290+
Ok(mut epochs) => {
291+
// check the current epoch also
292+
if current_epoch.epoch > *epoch_number {
293+
epochs.push(current_epoch);
294+
}
295+
EpochsStateQueryResponse::NextEpochs(NextEpochs { epochs })
296+
}
297+
Err(_) => EpochsStateQueryResponse::Error(
298+
"Historical epoch storage is disabled".to_string(),
299+
),
300+
}
301+
}
302+
}
303+
304+
EpochsStateQuery::GetPreviousEpochs { epoch_number } => {
305+
let current_epoch = state.get_epoch_info();
306+
if *epoch_number > current_epoch.epoch {
307+
EpochsStateQueryResponse::NotFound
308+
} else {
309+
match epochs_history.get_previous_epochs(*epoch_number) {
310+
Ok(epochs) => {
311+
EpochsStateQueryResponse::PreviousEpochs(PreviousEpochs {
312+
epochs,
313+
})
314+
}
315+
Err(_) => EpochsStateQueryResponse::Error(
316+
"Historical epoch storage is disabled".to_string(),
317+
),
318+
}
319+
}
320+
}
321+
284322
EpochsStateQuery::GetLatestEpochBlocksMintedByPool { vrf_key_hash } => {
285323
EpochsStateQueryResponse::LatestEpochBlocksMintedByPool(
286324
state.get_latest_epoch_blocks_minted_by_pool(vrf_key_hash),

modules/rest_blockfrost/src/handlers/epochs.rs

Lines changed: 131 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ pub async fn handle_epoch_info_blockfrost(
131131
response.active_stake = Some(total_active_stakes);
132132
}
133133

134-
let json = match serde_json::to_string(&response) {
134+
let json = match serde_json::to_string_pretty(&response) {
135135
Ok(j) => j,
136136
Err(e) => {
137137
return Ok(RESTResponse::with_text(
@@ -257,19 +257,141 @@ pub async fn handle_epoch_params_blockfrost(
257257
}
258258

259259
pub async fn handle_epoch_next_blockfrost(
260-
_context: Arc<Context<Message>>,
261-
_params: Vec<String>,
262-
_handlers_config: Arc<HandlersConfig>,
260+
context: Arc<Context<Message>>,
261+
params: Vec<String>,
262+
handlers_config: Arc<HandlersConfig>,
263263
) -> Result<RESTResponse> {
264-
Ok(RESTResponse::with_text(501, "Not implemented"))
264+
if params.len() != 1 {
265+
return Ok(RESTResponse::with_text(
266+
400,
267+
"Expected one parameter: an epoch number",
268+
));
269+
}
270+
let param = &params[0];
271+
272+
let parsed = match param.parse::<u64>() {
273+
Ok(num) => num,
274+
Err(_) => {
275+
return Ok(RESTResponse::with_text(
276+
400,
277+
"Invalid epoch number parameter",
278+
));
279+
}
280+
};
281+
282+
let next_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(
283+
EpochsStateQuery::GetNextEpochs {
284+
epoch_number: parsed,
285+
},
286+
)));
287+
let next_epochs = query_state(
288+
&context,
289+
&handlers_config.epochs_query_topic,
290+
next_epochs_msg,
291+
|message| match message {
292+
Message::StateQueryResponse(StateQueryResponse::Epochs(
293+
EpochsStateQueryResponse::NextEpochs(response),
294+
)) => Ok(response
295+
.epochs
296+
.into_iter()
297+
.map(|epoch| EpochActivityRest::from(epoch))
298+
.collect::<Vec<_>>()),
299+
Message::StateQueryResponse(StateQueryResponse::Epochs(
300+
EpochsStateQueryResponse::Error(e),
301+
)) => {
302+
return Err(anyhow::anyhow!(
303+
"Internal server error while retrieving next epochs: {e}"
304+
));
305+
}
306+
Message::StateQueryResponse(StateQueryResponse::Epochs(
307+
EpochsStateQueryResponse::NotFound,
308+
)) => Err(anyhow::anyhow!("Epoch not found")),
309+
_ => Err(anyhow::anyhow!(
310+
"Unexpected message type while retrieving next epochs"
311+
)),
312+
},
313+
)
314+
.await?;
315+
316+
let json = match serde_json::to_string_pretty(&next_epochs) {
317+
Ok(j) => j,
318+
Err(e) => {
319+
return Ok(RESTResponse::with_text(
320+
500,
321+
&format!("Failed to serialize epoch info: {e}"),
322+
));
323+
}
324+
};
325+
Ok(RESTResponse::with_json(200, &json))
265326
}
266327

267328
pub async fn handle_epoch_previous_blockfrost(
268-
_context: Arc<Context<Message>>,
269-
_params: Vec<String>,
270-
_handlers_config: Arc<HandlersConfig>,
329+
context: Arc<Context<Message>>,
330+
params: Vec<String>,
331+
handlers_config: Arc<HandlersConfig>,
271332
) -> Result<RESTResponse> {
272-
Ok(RESTResponse::with_text(501, "Not implemented"))
333+
if params.len() != 1 {
334+
return Ok(RESTResponse::with_text(
335+
400,
336+
"Expected one parameter: an epoch number",
337+
));
338+
}
339+
let param = &params[0];
340+
341+
let parsed = match param.parse::<u64>() {
342+
Ok(num) => num,
343+
Err(_) => {
344+
return Ok(RESTResponse::with_text(
345+
400,
346+
"Invalid epoch number parameter",
347+
));
348+
}
349+
};
350+
351+
let previous_epochs_msg = Arc::new(Message::StateQuery(StateQuery::Epochs(
352+
EpochsStateQuery::GetPreviousEpochs {
353+
epoch_number: parsed,
354+
},
355+
)));
356+
let previous_epochs = query_state(
357+
&context,
358+
&handlers_config.epochs_query_topic,
359+
previous_epochs_msg,
360+
|message| match message {
361+
Message::StateQueryResponse(StateQueryResponse::Epochs(
362+
EpochsStateQueryResponse::PreviousEpochs(response),
363+
)) => Ok(response
364+
.epochs
365+
.into_iter()
366+
.map(|epoch| EpochActivityRest::from(epoch))
367+
.collect::<Vec<_>>()),
368+
Message::StateQueryResponse(StateQueryResponse::Epochs(
369+
EpochsStateQueryResponse::Error(e),
370+
)) => {
371+
return Err(anyhow::anyhow!(
372+
"Internal server error while retrieving previous epochs: {e}"
373+
));
374+
}
375+
Message::StateQueryResponse(StateQueryResponse::Epochs(
376+
EpochsStateQueryResponse::NotFound,
377+
)) => Err(anyhow::anyhow!("Epoch not found")),
378+
_ => Err(anyhow::anyhow!(
379+
"Unexpected message type while retrieving previous epochs"
380+
)),
381+
},
382+
)
383+
.await?;
384+
385+
let json = match serde_json::to_string_pretty(&previous_epochs) {
386+
Ok(j) => j,
387+
Err(e) => {
388+
return Ok(RESTResponse::with_text(
389+
500,
390+
&format!("Failed to serialize epoch info: {e}"),
391+
));
392+
}
393+
};
394+
Ok(RESTResponse::with_json(200, &json))
273395
}
274396

275397
pub async fn handle_epoch_total_stakes_blockfrost(

0 commit comments

Comments
 (0)