Skip to content

Commit

Permalink
Merge remote-tracking branch 'kaleido/pins' into namespacemgr
Browse files Browse the repository at this point in the history
  • Loading branch information
awrichar committed Jun 9, 2022
2 parents 492193a + 2d5a320 commit 615893b
Show file tree
Hide file tree
Showing 23 changed files with 66 additions and 105 deletions.
15 changes: 6 additions & 9 deletions .github/workflows/docker_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,34 @@ jobs:
docker push ghcr.io/hyperledger/firefly:latest
- name: Tag alpha release
if: github.event.action == 'prereleased' && contains(env.GITHUB_REF, 'alpha')
if: github.event.action == 'prereleased' && contains(github.ref, 'alpha')
run: |
echo ${{ env.GITHUB_REF }}
docker tag ghcr.io/hyperledger/firefly:${GITHUB_REF##*/} ghcr.io/hyperledger/firefly:alpha
- name: Push alpha tag
if: github.event.action == 'prereleased' && contains(env.GITHUB_REF, 'alpha')
if: github.event.action == 'prereleased' && contains(github.ref, 'alpha')
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly:alpha
- name: Tag beta release
if: github.event.action == 'prereleased' && contains(env.GITHUB_REF, 'beta')
if: github.event.action == 'prereleased' && contains(github.ref, 'beta')
run: |
echo ${{ env.GITHUB_REF }}
docker tag ghcr.io/hyperledger/firefly:${GITHUB_REF##*/} ghcr.io/hyperledger/firefly:beta
- name: Push beta tag
if: github.event.action == 'prereleased' && contains(env.GITHUB_REF, 'beta')
if: github.event.action == 'prereleased' && contains(github.ref, 'beta')
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly:beta
- name: Tag rc release
if: github.event.action == 'prereleased' && contains(env.GITHUB_REF, 'rc')
if: github.event.action == 'prereleased' && contains(github.ref, 'rc')
run: |
echo ${{ env.GITHUB_REF }}
docker tag ghcr.io/hyperledger/firefly:${GITHUB_REF##*/} ghcr.io/hyperledger/firefly:rc
- name: Push rc tag
if: github.event.action == 'prereleased' && contains(env.GITHUB_REF, 'rc')
if: github.event.action == 'prereleased' && contains(github.ref, 'rc')
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly:rc
3 changes: 3 additions & 0 deletions db/migrations/postgres/000092_add_pin_namespace.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE pins DROP COLUMN namespace;
COMMIT;
5 changes: 5 additions & 0 deletions db/migrations/postgres/000092_add_pin_namespace.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
ALTER TABLE pins ADD COLUMN namespace VARCHAR(64);
UPDATE pins SET namespace = "ff_system";
ALTER TABLE pins ALTER COLUMN namespace SET NOT NULL;
COMMIT;
1 change: 1 addition & 0 deletions db/migrations/sqlite/000092_add_pin_namespace.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE pins DROP COLUMN namespace;
2 changes: 2 additions & 0 deletions db/migrations/sqlite/000092_add_pin_namespace.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE pins ADD COLUMN namespace VARCHAR(64);
UPDATE pins SET namespace = "ff_system";
10 changes: 8 additions & 2 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19854,7 +19854,7 @@ paths:
- Non-Default Namespace
/namespaces/{ns}/pins:
get:
description: Queries the pins table that is the status of the event aggregator
description: Queries the list of pins received from the blockchain
operationId: getPinsNamespace
parameters:
- description: The namespace which scopes this request
Expand Down Expand Up @@ -19983,6 +19983,9 @@ paths:
of the blockchain cannot use pin hash to match this transaction
to other transactions or participants
type: boolean
namespace:
description: The namespace of the pin
type: string
sequence:
description: The order of the pin in the local FireFly database,
which matches the order in which pins were delivered to FireFly
Expand Down Expand Up @@ -27352,7 +27355,7 @@ paths:
- Default Namespace
/pins:
get:
description: Queries the pins table that is the status of the event aggregator
description: Queries the list of pins received from the blockchain
operationId: getPins
parameters:
- description: Server-side request timeout (milliseconds, or set a custom suffix
Expand Down Expand Up @@ -27474,6 +27477,9 @@ paths:
of the blockchain cannot use pin hash to match this transaction
to other transactions or participants
type: boolean
namespace:
description: The namespace of the pin
type: string
sequence:
description: The order of the pin in the local FireFly database,
which matches the order in which pins were delivered to FireFly
Expand Down
4 changes: 2 additions & 2 deletions internal/apiserver/route_get_pins.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ var getPins = &ffapi.Route{
Method: http.MethodGet,
PathParams: nil,
QueryParams: nil,
Description: coremsgs.APIEndpointsGetStatusPins,
Description: coremsgs.APIEndpointsGetPins,
JSONInputValue: nil,
JSONOutputValue: func() interface{} { return []core.Pin{} },
JSONOutputCodes: []int{http.StatusOK},
Extensions: &coreExtensions{
FilterFactory: database.PinQueryFactory,
CoreJSONHandler: func(r *ffapi.APIRequest, cr *coreRequest) (output interface{}, err error) {
return filterResult(cr.or.GetPins(cr.ctx, cr.filter))
return filterResult(cr.or.GetPins(cr.ctx, extractNamespace(r.PP), cr.filter))
},
},
}
2 changes: 1 addition & 1 deletion internal/apiserver/route_get_pins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestGetPins(t *testing.T) {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

o.On("GetPins", mock.Anything, mock.Anything).
o.On("GetPins", mock.Anything, "default", mock.Anything).
Return([]*core.Pin{}, nil, nil)
r.ServeHTTP(res, req)

Expand Down
2 changes: 1 addition & 1 deletion internal/coremsgs/en_api_translations.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ var (
APIEndpointsGetOpByID = ffm("api.endpoints.getOpByID", "Gets an operation by ID")
APIEndpointsGetOps = ffm("api.endpoints.getOps", "Gets a a list of operations")
APIEndpointsGetStatusBatchManager = ffm("api.endpoints.getStatusBatchManager", "Gets the status of the batch manager")
APIEndpointsGetStatusPins = ffm("api.endpoints.getStatusPins", "Queries the pins table that is the status of the event aggregator")
APIEndpointsGetPins = ffm("api.endpoints.getPins", "Queries the list of pins received from the blockchain")
APIEndpointsGetStatusWebSockets = ffm("api.endpoints.getStatusWebSockets", "Gets the status of the current WebSocket connections to this node")
APIEndpointsGetStatus = ffm("api.endpoints.getStatus", "Gets the status of this node")
APIEndpointsGetSubscriptionByID = ffm("api.endpoints.getSubscriptionByID", "Gets a subscription by its ID")
Expand Down
1 change: 1 addition & 0 deletions internal/coremsgs/en_struct_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ var (

// Pin field descriptions
PinSequence = ffm("Pin.sequence", "The order of the pin in the local FireFly database, which matches the order in which pins were delivered to FireFly by the blockchain connector event stream")
PinNamespace = ffm("Pin.namespace", "The namespace of the pin")
PinMasked = ffm("Pin.masked", "True if the pin is for a private message, and hence is masked with the group ID and salted with a nonce so observers of the blockchain cannot use pin hash to match this transaction to other transactions or participants")
PinHash = ffm("Pin.hash", "The hash represents a topic within a message in the batch. If a message has multiple topics, then multiple pins are created. If the message is private, the hash is masked for privacy")
PinBatch = ffm("Pin.batch", "The UUID of the batch of messages this pin is part of")
Expand Down
28 changes: 5 additions & 23 deletions internal/database/sqlcommon/pin_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

var (
pinColumns = []string{
"namespace",
"masked",
"hash",
"batch_id",
Expand Down Expand Up @@ -93,14 +94,15 @@ func (s *SQLCommon) attemptPinInsert(ctx context.Context, tx *txWrapper, pin *co
s.setPinInsertValues(sq.Insert(pinsTable).Columns(pinColumns...), pin),
func() {
log.L(ctx).Debugf("Triggering creation event for pin %d", pin.Sequence)
s.callbacks.OrderedCollectionEvent(database.CollectionPins, core.ChangeEventTypeCreated, pin.Sequence)
s.callbacks.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, pin.Namespace, pin.Sequence)
},
)
return err
}

func (s *SQLCommon) setPinInsertValues(query sq.InsertBuilder, pin *core.Pin) sq.InsertBuilder {
return query.Values(
pin.Namespace,
pin.Masked,
pin.Hash,
pin.Batch,
Expand Down Expand Up @@ -128,7 +130,7 @@ func (s *SQLCommon) InsertPins(ctx context.Context, pins []*core.Pin) error {
err := s.insertTxRows(ctx, pinsTable, tx, query, func() {
for i, pin := range pins {
pin.Sequence = sequences[i]
s.callbacks.OrderedCollectionEvent(database.CollectionPins, core.ChangeEventTypeCreated, pin.Sequence)
s.callbacks.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, pin.Namespace, pin.Sequence)
}
}, sequences, true /* we want the caller to be able to retry with individual upserts */)
if err != nil {
Expand All @@ -149,6 +151,7 @@ func (s *SQLCommon) InsertPins(ctx context.Context, pins []*core.Pin) error {
func (s *SQLCommon) pinResult(ctx context.Context, row *sql.Rows) (*core.Pin, error) {
pin := core.Pin{}
err := row.Scan(
&pin.Namespace,
&pin.Masked,
&pin.Hash,
&pin.Batch,
Expand Down Expand Up @@ -218,24 +221,3 @@ func (s *SQLCommon) UpdatePins(ctx context.Context, filter database.Filter, upda

return s.commitTx(ctx, tx, autoCommit)
}

func (s *SQLCommon) DeletePin(ctx context.Context, sequence int64) (err error) {

ctx, tx, autoCommit, err := s.beginOrUseTx(ctx)
if err != nil {
return err
}
defer s.rollbackTx(ctx, tx, autoCommit)

err = s.deleteTx(ctx, pinsTable, tx, sq.Delete(pinsTable).Where(sq.Eq{
sequenceColumn: sequence,
}),
func() {
s.callbacks.OrderedCollectionEvent(database.CollectionPins, core.ChangeEventTypeDeleted, sequence)
})
if err != nil {
return err
}

return s.commitTx(ctx, tx, autoCommit)
}
35 changes: 6 additions & 29 deletions internal/database/sqlcommon/pin_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestPinsE2EWithDB(t *testing.T) {

// Create a new pin entry
pin := &core.Pin{
Namespace: "ns",
Masked: true,
Hash: fftypes.NewRandB32(),
Batch: fftypes.NewUUID(),
Expand All @@ -49,8 +50,7 @@ func TestPinsE2EWithDB(t *testing.T) {
Dispatched: false,
}

s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, core.ChangeEventTypeCreated, mock.Anything).Return()
s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, core.ChangeEventTypeDeleted, mock.Anything).Return()
s.callbacks.On("OrderedCollectionNSEvent", database.CollectionPins, core.ChangeEventTypeCreated, "ns", mock.Anything).Return()

err := s.UpsertPin(ctx, pin)
assert.NoError(t, err)
Expand Down Expand Up @@ -83,13 +83,6 @@ func TestPinsE2EWithDB(t *testing.T) {
assert.Equal(t, existingSequence, pin.Sequence)
assert.True(t, pin.Dispatched)

// Test delete
err = s.DeletePin(ctx, pin.Sequence)
assert.NoError(t, err)
p, _, err := s.GetPins(ctx, filter)
assert.NoError(t, err)
assert.Equal(t, 0, len(p))

s.callbacks.AssertExpectations(t)
}

Expand Down Expand Up @@ -157,10 +150,10 @@ func TestInsertPinsMultiRowOK(t *testing.T) {
s.features.MultiRowInsert = true
s.fakePSQLInsert = true

pin1 := &core.Pin{Hash: fftypes.NewRandB32()}
pin2 := &core.Pin{Hash: fftypes.NewRandB32()}
s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, core.ChangeEventTypeCreated, int64(1001))
s.callbacks.On("OrderedCollectionEvent", database.CollectionPins, core.ChangeEventTypeCreated, int64(1002))
pin1 := &core.Pin{Namespace: "ns1", Hash: fftypes.NewRandB32()}
pin2 := &core.Pin{Namespace: "ns1", Hash: fftypes.NewRandB32()}
s.callbacks.On("OrderedCollectionNSEvent", database.CollectionPins, core.ChangeEventTypeCreated, "ns1", int64(1001))
s.callbacks.On("OrderedCollectionNSEvent", database.CollectionPins, core.ChangeEventTypeCreated, "ns1", int64(1002))

mock.ExpectBegin()
mock.ExpectQuery("INSERT.*").WillReturnRows(sqlmock.NewRows([]string{sequenceColumn}).
Expand Down Expand Up @@ -258,19 +251,3 @@ func TestUpdatePinsBadUpdate(t *testing.T) {
err := s.UpdatePins(ctx, database.PinQueryFactory.NewFilter(ctx).Eq("bad", 1), database.PinQueryFactory.NewUpdate(ctx).Set("dispatched", true))
assert.Regexp(t, "FF00142", err)
}

func TestPinDeleteBeginFail(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin().WillReturnError(fmt.Errorf("pop"))
err := s.DeletePin(context.Background(), 12345)
assert.Regexp(t, "FF10114", err)
}

func TestPinDeleteFail(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectBegin()
mock.ExpectExec("DELETE .*").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
err := s.DeletePin(context.Background(), 12345)
assert.Regexp(t, "FF10118", err)
}
1 change: 1 addition & 0 deletions internal/events/batch_pin_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (em *eventManager) persistContexts(ctx context.Context, batchPin *blockchai
pins := make([]*core.Pin, len(batchPin.Contexts))
for idx, hash := range batchPin.Contexts {
pins[idx] = &core.Pin{
Namespace: batchPin.Namespace,
Masked: private,
Hash: hash,
Batch: batchPin.BatchID,
Expand Down
3 changes: 2 additions & 1 deletion internal/orchestrator/data_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ func (or *orchestrator) GetTransactionBlockchainEvents(ctx context.Context, ns,
return or.database().GetBlockchainEvents(ctx, filter)
}

func (or *orchestrator) GetPins(ctx context.Context, filter database.AndFilter) ([]*core.Pin, *database.FilterResult, error) {
func (or *orchestrator) GetPins(ctx context.Context, ns string, filter database.AndFilter) ([]*core.Pin, *database.FilterResult, error) {
filter = or.scopeNS(ns, filter)
return or.database().GetPins(ctx, filter)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/data_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,6 @@ func TestGetPins(t *testing.T) {
or.mdi.On("GetPins", mock.Anything, mock.Anything).Return([]*core.Pin{}, nil, nil)
fb := database.PinQueryFactory.NewFilter(context.Background())
f := fb.And(fb.Eq("hash", u))
_, _, err := or.GetPins(context.Background(), f)
_, _, err := or.GetPins(context.Background(), "ns1", f)
assert.NoError(t, err)
}
2 changes: 1 addition & 1 deletion internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type Orchestrator interface {
GetEventsWithReferences(ctx context.Context, ns string, filter database.AndFilter) ([]*core.EnrichedEvent, *database.FilterResult, error)
GetBlockchainEventByID(ctx context.Context, ns, id string) (*core.BlockchainEvent, error)
GetBlockchainEvents(ctx context.Context, ns string, filter database.AndFilter) ([]*core.BlockchainEvent, *database.FilterResult, error)
GetPins(ctx context.Context, filter database.AndFilter) ([]*core.Pin, *database.FilterResult, error)
GetPins(ctx context.Context, ns string, filter database.AndFilter) ([]*core.Pin, *database.FilterResult, error)

// Charts
GetChartHistogram(ctx context.Context, ns string, startTime int64, endTime int64, buckets int64, tableName database.CollectionName) ([]*core.ChartHistogram, error)
Expand Down
3 changes: 2 additions & 1 deletion internal/orchestrator/persistence_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ func (or *orchestrator) OrderedUUIDCollectionNSEvent(resType database.OrderedUUI
})
}

func (or *orchestrator) OrderedCollectionEvent(resType database.OrderedCollection, eventType core.ChangeEventType, sequence int64) {
func (or *orchestrator) OrderedCollectionNSEvent(resType database.OrderedCollectionNS, eventType core.ChangeEventType, ns string, sequence int64) {
if eventType == core.ChangeEventTypeCreated && resType == database.CollectionPins {
or.events.NewPins() <- sequence
}
or.adminEvents.Dispatch(&core.ChangeEvent{
Collection: string(resType),
Type: eventType,
Namespace: ns,
Sequence: &sequence,
})
}
Expand Down
2 changes: 1 addition & 1 deletion internal/orchestrator/persistence_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestPinCreated(t *testing.T) {
}
mae.On("Dispatch", mock.Anything).Return()
mem.On("NewPins").Return((chan<- int64)(make(chan int64, 1)))
o.OrderedCollectionEvent(database.CollectionPins, core.ChangeEventTypeCreated, 12345)
o.OrderedCollectionNSEvent(database.CollectionPins, core.ChangeEventTypeCreated, "ns1", 12345)
mem.AssertExpectations(t)
mae.AssertExpectations(t)
}
Expand Down
6 changes: 3 additions & 3 deletions mocks/databasemocks/callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions mocks/databasemocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 615893b

Please sign in to comment.