From 28687ca8582bf464164a8675c7bdabb1f66a2c0b Mon Sep 17 00:00:00 2001 From: Kostas Christidis Date: Thu, 2 Mar 2017 13:39:04 -0500 Subject: [PATCH] [FAB-2606] Enable deliver_stdout to seek https://jira.hyperledger.org/browse/FAB-2606 The `deliver_stdout` client is useful for debugging. This changeset introduces a `seek` flag that allows us to seek to oldest or newest (using the convention in [1]), or extract a specific block. [1] https://godoc.org/github.com/Shopify/sarama#pkg-constants Change-Id: Iaf8d3c596cfc23768d2bfc257ee32ef9a4512c54 Signed-off-by: Kostas Christidis --- .../sample_clients/deliver_stdout/client.go | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/orderer/sample_clients/deliver_stdout/client.go b/orderer/sample_clients/deliver_stdout/client.go index 17746f1c444..e8002b607c5 100644 --- a/orderer/sample_clients/deliver_stdout/client.go +++ b/orderer/sample_clients/deliver_stdout/client.go @@ -30,6 +30,12 @@ import ( "google.golang.org/grpc" ) +var ( + oldest = &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}} + newest = &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}} + maxStop = &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}} +) + type deliverClient struct { client ab.AtomicBroadcast_DeliverClient chainID string @@ -39,7 +45,7 @@ func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, chainID string) * return &deliverClient{client: client, chainID: chainID} } -func seekHelper(chainID string, start *ab.SeekPosition) *cb.Envelope { +func seekHelper(chainID string, start *ab.SeekPosition, stop *ab.SeekPosition) *cb.Envelope { return &cb.Envelope{ Payload: utils.MarshalOrPanic(&cb.Payload{ Header: &cb.Header{ @@ -51,7 +57,7 @@ func seekHelper(chainID string, start *ab.SeekPosition) *cb.Envelope { Data: utils.MarshalOrPanic(&ab.SeekInfo{ Start: start, - Stop: &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}}, + Stop: stop, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY, }), }), @@ -59,15 +65,16 @@ func seekHelper(chainID string, start *ab.SeekPosition) *cb.Envelope { } func (r *deliverClient) seekOldest() error { - return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})) + return r.client.Send(seekHelper(r.chainID, oldest, maxStop)) } func (r *deliverClient) seekNewest() error { - return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}})) + return r.client.Send(seekHelper(r.chainID, newest, maxStop)) } -func (r *deliverClient) seek(blockNumber uint64) error { - return r.client.Send(seekHelper(r.chainID, &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: blockNumber}}})) +func (r *deliverClient) seekSingle(blockNumber uint64) error { + specific := &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: blockNumber}}} + return r.client.Send(seekHelper(r.chainID, specific, specific)) } func (r *deliverClient) readUntilClose() { @@ -93,11 +100,21 @@ func main() { var chainID string var serverAddr string + var seek int flag.StringVar(&serverAddr, "server", fmt.Sprintf("%s:%d", config.General.ListenAddress, config.General.ListenPort), "The RPC server to connect to.") flag.StringVar(&chainID, "chainID", provisional.TestChainID, "The chain ID to deliver from.") + flag.IntVar(&seek, "seek", -2, "Specify the range of requested blocks."+ + "Acceptable values:"+ + "-2 (or -1) to start from oldest (or newest) and keep at it indefinitely."+ + "N >= 0 to fetch block N only.") flag.Parse() + if seek < -2 { + fmt.Println("Wrong seek value.") + flag.PrintDefaults() + } + conn, err := grpc.Dial(serverAddr, grpc.WithInsecure()) if err != nil { fmt.Println("Error connecting:", err) @@ -110,7 +127,15 @@ func main() { } s := newDeliverClient(client, chainID) - err = s.seekOldest() + switch seek { + case -2: + err = s.seekOldest() + case -1: + err = s.seekNewest() + default: + err = s.seekSingle(uint64(seek)) + } + if err != nil { fmt.Println("Received error:", err) }