Skip to content

Commit

Permalink
Merge pull request #991 from kaleido-io/rewind
Browse files Browse the repository at this point in the history
Add /pins/rewind API for requesing manual rewind
  • Loading branch information
peterbroadhurst authored Aug 20, 2022
2 parents de314c8 + abe6470 commit d812ffe
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 23 deletions.
113 changes: 111 additions & 2 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19578,7 +19578,7 @@ paths:
- Non-Default Namespace
/namespaces/{ns}/network/organizations/{nameOrId}:
get:
description: Gets information about a specifc org in the network
description: Gets information about a specific org in the network
operationId: getNetworkOrgNamespace
parameters:
- description: The name or ID of the org
Expand Down Expand Up @@ -20409,6 +20409,64 @@ paths:
description: ""
tags:
- Non-Default Namespace
/namespaces/{ns}/pins/rewind:
post:
description: Force a rewind of the event aggregator to a previous position,
to re-evaluate all unconfirmed pins since that point
operationId: postPinsRewindNamespace
parameters:
- description: The namespace which scopes this request
in: path
name: ns
required: true
schema:
example: default
type: string
- description: Server-side request timeout (milliseconds, or set a custom suffix
like 10s)
in: header
name: Request-Timeout
schema:
default: 2m0s
type: string
requestBody:
content:
application/json:
schema:
properties:
batch:
description: The ID of the batch to which the event aggregator should
rewind. Either sequence or batch must be specified
format: uuid
type: string
sequence:
description: The sequence of the pin to which the event aggregator
should rewind. Either sequence or batch must be specified
format: int64
type: integer
type: object
responses:
"200":
content:
application/json:
schema:
properties:
batch:
description: The ID of the batch to which the event aggregator
should rewind. Either sequence or batch must be specified
format: uuid
type: string
sequence:
description: The sequence of the pin to which the event aggregator
should rewind. Either sequence or batch must be specified
format: int64
type: integer
type: object
description: Success
default:
description: ""
tags:
- Non-Default Namespace
/namespaces/{ns}/status:
get:
description: Gets the status of this namespace
Expand Down Expand Up @@ -27188,7 +27246,7 @@ paths:
- Default Namespace
/network/organizations/{nameOrId}:
get:
description: Gets information about a specifc org in the network
description: Gets information about a specific org in the network
operationId: getNetworkOrg
parameters:
- description: The name or ID of the org
Expand Down Expand Up @@ -27977,6 +28035,57 @@ paths:
description: ""
tags:
- Default Namespace
/pins/rewind:
post:
description: Force a rewind of the event aggregator to a previous position,
to re-evaluate all unconfirmed pins since that point
operationId: postPinsRewind
parameters:
- description: Server-side request timeout (milliseconds, or set a custom suffix
like 10s)
in: header
name: Request-Timeout
schema:
default: 2m0s
type: string
requestBody:
content:
application/json:
schema:
properties:
batch:
description: The ID of the batch to which the event aggregator should
rewind. Either sequence or batch must be specified
format: uuid
type: string
sequence:
description: The sequence of the pin to which the event aggregator
should rewind. Either sequence or batch must be specified
format: int64
type: integer
type: object
responses:
"200":
content:
application/json:
schema:
properties:
batch:
description: The ID of the batch to which the event aggregator
should rewind. Either sequence or batch must be specified
format: uuid
type: string
sequence:
description: The sequence of the pin to which the event aggregator
should rewind. Either sequence or batch must be specified
format: int64
type: integer
type: object
description: Success
default:
description: ""
tags:
- Default Namespace
/status:
get:
description: Gets the status of this namespace
Expand Down
42 changes: 42 additions & 0 deletions internal/apiserver/route_post_pins_rewind.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright © 2022 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"net/http"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/pkg/core"
)

var postPinsRewind = &ffapi.Route{
Name: "postPinsRewind",
Path: "pins/rewind",
Method: http.MethodPost,
PathParams: nil,
QueryParams: nil,
Description: coremsgs.APIEndpointsPostPinsRewind,
JSONInputValue: func() interface{} { return &core.PinRewind{} },
JSONOutputValue: func() interface{} { return &core.PinRewind{} },
JSONOutputCodes: []int{http.StatusOK},
Extensions: &coreExtensions{
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
return cr.or.RewindPins(cr.ctx, r.Input.(*core.PinRewind))
},
},
}
45 changes: 45 additions & 0 deletions internal/apiserver/route_post_pins_rewind_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright © 2021 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apiserver

import (
"bytes"
"encoding/json"
"net/http/httptest"
"testing"

"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestPostPinsRewind(t *testing.T) {
o, r := newTestAPIServer()
o.On("Authorize", mock.Anything, mock.Anything).Return(nil)
input := core.PinRewind{}
var buf bytes.Buffer
json.NewEncoder(&buf).Encode(&input)
req := httptest.NewRequest("POST", "/api/v1/pins/rewind", &buf)
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

o.On("RewindPins", mock.Anything, mock.AnythingOfType("*core.PinRewind")).
Return(&core.PinRewind{}, nil)
r.ServeHTTP(res, req)

assert.Equal(t, 200, res.Result().StatusCode)
}
1 change: 1 addition & 0 deletions internal/apiserver/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ var routes = append(
postNewOrganizationSelf,
postNodesSelf,
postOpRetry,
postPinsRewind,
postTokenApproval,
postTokenBurn,
postTokenMint,
Expand Down
3 changes: 2 additions & 1 deletion internal/coremsgs/en_api_translations.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ var (
APIEndpointsGetNetworkIdentities = ffm("api.endpoints.getNetworkIdentities", "Gets the list of identities in the network (deprecated - use /identities instead of /network/identities")
APIEndpointsGetNetworkNode = ffm("api.endpoints.getNetworkNode", "Gets information about a specific node in the network")
APIEndpointsGetNetworkNodes = ffm("api.endpoints.getNetworkNodes", "Gets a list of nodes in the network")
APIEndpointsGetNetworkOrg = ffm("api.endpoints.getNetworkOrg", "Gets information about a specifc org in the network")
APIEndpointsGetNetworkOrg = ffm("api.endpoints.getNetworkOrg", "Gets information about a specific org in the network")
APIEndpointsGetNetworkOrgs = ffm("api.endpoints.APIEndpointsGetNetworkOrgs", "Gets a list of orgs in the network")
APIEndpointsGetOpByID = ffm("api.endpoints.getOpByID", "Gets an operation by ID")
APIEndpointsGetOps = ffm("api.endpoints.getOps", "Gets a a list of operations")
Expand Down Expand Up @@ -162,6 +162,7 @@ var (
APIEndpointsPostNewOrganization = ffm("api.endpoints.postNewOrganization", "Registers a new org in the network")
APIEndpointsPostNewSubscription = ffm("api.endpoints.postNewSubscription", "Creates a new subscription for an application to receive events from FireFly")
APIEndpointsPostOpRetry = ffm("api.endpoints.postOpRetry", "Retries a failed operation")
APIEndpointsPostPinsRewind = ffm("api.endpoints.postPinsRewind", "Force a rewind of the event aggregator to a previous position, to re-evaluate all unconfirmed pins since that point")
APIEndpointsPostTokenApproval = ffm("api.endpoints.postTokenApproval", "Creates a token approval")
APIEndpointsPostTokenBurn = ffm("api.endpoints.postTokenBurn", "Burns some tokens")
APIEndpointsPostTokenMint = ffm("api.endpoints.postTokenMint", "Mints some tokens")
Expand Down
22 changes: 12 additions & 10 deletions internal/coremsgs/en_struct_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,16 +452,18 @@ var (
BatchFlushStatusTotalErrors = ffm("BatchFlushStatus.totalErrors", "The total count of error flushed encountered by this processor since it started")

// Pin field descriptions
PinSequence = ffm("Pin.sequence", "The order of the pin in the local FireFly database, which matches the order in which pins were delivered to FireFly by the blockchain connector event stream")
PinNamespace = ffm("Pin.namespace", "The namespace of the pin")
PinMasked = ffm("Pin.masked", "True if the pin is for a private message, and hence is masked with the group ID and salted with a nonce so observers of the blockchain cannot use pin hash to match this transaction to other transactions or participants")
PinHash = ffm("Pin.hash", "The hash represents a topic within a message in the batch. If a message has multiple topics, then multiple pins are created. If the message is private, the hash is masked for privacy")
PinBatch = ffm("Pin.batch", "The UUID of the batch of messages this pin is part of")
PinBatchHash = ffm("Pin.batchHash", "The manifest hash batch of messages this pin is part of")
PinIndex = ffm("Pin.index", "The index of this pin within the batch. One pin is created for each topic, of each message in the batch")
PinDispatched = ffm("Pin.dispatched", "Once true, this pin has been processed and will not be processed again")
PinSigner = ffm("Pin.signer", "The blockchain signing key that submitted this transaction, as passed through to FireFly by the smart contract that emitted the blockchain event")
PinCreated = ffm("Pin.created", "The time the FireFly node created the pin")
PinSequence = ffm("Pin.sequence", "The order of the pin in the local FireFly database, which matches the order in which pins were delivered to FireFly by the blockchain connector event stream")
PinNamespace = ffm("Pin.namespace", "The namespace of the pin")
PinMasked = ffm("Pin.masked", "True if the pin is for a private message, and hence is masked with the group ID and salted with a nonce so observers of the blockchain cannot use pin hash to match this transaction to other transactions or participants")
PinHash = ffm("Pin.hash", "The hash represents a topic within a message in the batch. If a message has multiple topics, then multiple pins are created. If the message is private, the hash is masked for privacy")
PinBatch = ffm("Pin.batch", "The UUID of the batch of messages this pin is part of")
PinBatchHash = ffm("Pin.batchHash", "The manifest hash batch of messages this pin is part of")
PinIndex = ffm("Pin.index", "The index of this pin within the batch. One pin is created for each topic, of each message in the batch")
PinDispatched = ffm("Pin.dispatched", "Once true, this pin has been processed and will not be processed again")
PinSigner = ffm("Pin.signer", "The blockchain signing key that submitted this transaction, as passed through to FireFly by the smart contract that emitted the blockchain event")
PinCreated = ffm("Pin.created", "The time the FireFly node created the pin")
PinRewindSequence = ffm("PinRewind.sequence", "The sequence of the pin to which the event aggregator should rewind. Either sequence or batch must be specified")
PinRewindBatch = ffm("PinRewind.batch", "The ID of the batch to which the event aggregator should rewind. Either sequence or batch must be specified")

// Subscription field descriptions
SubscriptionID = ffm("Subscription.id", "The UUID of the subscription")
Expand Down
12 changes: 6 additions & 6 deletions internal/events/aggregator_rewind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,15 @@ func TestProcessStagedRewindsNoDIDs(t *testing.T) {

func TestPopRewindsDoublePopNoBlock(t *testing.T) {

ag, cancel := newTestAggregator()
defer cancel()
em, done := newTestEventManager(t)
defer done()

ag.rewinder.queuedRewinds = []*rewind{{rewindType: rewindBatch}}
batchIDs := ag.rewinder.popRewinds()
em.QueueBatchRewind(fftypes.NewUUID())
batchIDs := em.aggregator.rewinder.popRewinds()
assert.Empty(t, batchIDs)

ag.rewinder.queuedRewinds = []*rewind{{rewindType: rewindBatch}}
batchIDs = ag.rewinder.popRewinds()
em.QueueBatchRewind(fftypes.NewUUID())
batchIDs = em.aggregator.rewinder.popRewinds()
assert.Empty(t, batchIDs)

}
5 changes: 5 additions & 0 deletions internal/events/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type EventManager interface {
DeleteDurableSubscription(ctx context.Context, subDef *core.Subscription) (err error)
CreateUpdateDurableSubscription(ctx context.Context, subDef *core.Subscription, mustNew bool) (err error)
EnrichEvent(ctx context.Context, event *core.Event) (*core.EnrichedEvent, error)
QueueBatchRewind(batchID *fftypes.UUID)
Start() error
WaitStop()

Expand Down Expand Up @@ -279,3 +280,7 @@ func (em *eventManager) GetPlugins() []*core.NamespaceStatusPlugin {
func (em *eventManager) EnrichEvent(ctx context.Context, event *core.Event) (*core.EnrichedEvent, error) {
return em.enricher.enrichEvent(ctx, event)
}

func (em *eventManager) QueueBatchRewind(batchID *fftypes.UUID) {
em.aggregator.queueBatchRewind(batchID)
}
17 changes: 17 additions & 0 deletions internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type Orchestrator interface {
GetBlockchainEventByID(ctx context.Context, id string) (*core.BlockchainEvent, error)
GetBlockchainEvents(ctx context.Context, filter database.AndFilter) ([]*core.BlockchainEvent, *database.FilterResult, error)
GetPins(ctx context.Context, filter database.AndFilter) ([]*core.Pin, *database.FilterResult, error)
RewindPins(ctx context.Context, rewind *core.PinRewind) (*core.PinRewind, error)

// Charts
GetChartHistogram(ctx context.Context, startTime int64, endTime int64, buckets int64, tableName database.CollectionName) ([]*core.ChartHistogram, error)
Expand Down Expand Up @@ -524,3 +525,19 @@ func (or *orchestrator) Authorize(ctx context.Context, authReq *fftypes.AuthReq)
}
return nil
}

func (or *orchestrator) RewindPins(ctx context.Context, rewind *core.PinRewind) (*core.PinRewind, error) {
if rewind.Sequence > 0 {
fb := database.PinQueryFactory.NewFilter(ctx)
if pins, _, err := or.GetPins(ctx, fb.And(fb.Eq("seq", rewind.Sequence))); err != nil {
return nil, err
} else if len(pins) > 0 {
rewind.Batch = pins[0].Batch
or.events.QueueBatchRewind(rewind.Batch)
return rewind, nil
}
return nil, i18n.NewError(ctx, coremsgs.Msg404NotFound)
}
or.events.QueueBatchRewind(rewind.Batch)
return rewind, nil
}
Loading

0 comments on commit d812ffe

Please sign in to comment.