-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement StreamAttestations RPC Endpoint #4390
Changes from all commits
ccb4b1f
b0fb260
44f33bd
2f06afe
b9fe42e
0c0f041
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ import ( | |
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters" | ||
"github.com/prysmaticlabs/prysm/shared/pagination" | ||
"github.com/prysmaticlabs/prysm/shared/params" | ||
"github.com/prysmaticlabs/prysm/shared/slotutil" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
@@ -113,12 +115,29 @@ func (bs *Server) ListAttestations( | |
}, nil | ||
} | ||
|
||
// StreamAttestations to clients every single time a new attestation is received. | ||
// TODO(#4184): Implement. | ||
// StreamAttestations to clients at the end of every slot. This method retrieves the | ||
// aggregated attestations currently in the pool at the start of a slot and sends | ||
// them over a gRPC stream. | ||
func (bs *Server) StreamAttestations( | ||
_ *ptypes.Empty, _ ethpb.BeaconChain_StreamAttestationsServer, | ||
_ *ptypes.Empty, stream ethpb.BeaconChain_StreamAttestationsServer, | ||
) error { | ||
return status.Error(codes.Unimplemented, "Not yet implemented") | ||
genesisTime := bs.GenesisTimeFetcher.GenesisTime() | ||
st := slotutil.GetSlotTicker(genesisTime, params.BeaconConfig().SecondsPerSlot) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. one concern for this, what if I call this method a large number of slots after genesis; ex: 100,000. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confirmed with the ticker code it does not loop based on the time since genesis, it merely aligns the time with the current slot and ticks every time a new slot happens |
||
for { | ||
select { | ||
case <-st.C(): | ||
atts := bs.Pool.AggregatedAttestations() | ||
for i := 0; i < len(atts); i++ { | ||
if err := stream.Send(atts[i]); err != nil { | ||
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) | ||
} | ||
} | ||
case <-bs.Ctx.Done(): | ||
return status.Error(codes.Canceled, "Context canceled") | ||
case <-stream.Context().Done(): | ||
return status.Error(codes.Canceled, "Context canceled") | ||
} | ||
} | ||
} | ||
|
||
// AttestationPool retrieves pending attestations. | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bad go imports