Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of fix(txn): validate verbs into release/1.19.x #21520

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/21519.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
txn: Fix a bug where mismatched Consul server versions could result in undetected data loss when using newer Transaction verbs.
```
7 changes: 5 additions & 2 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz resolver.Result, op api
return false, fmt.Errorf("Must provide key")
}

// Apply the ACL policy if any.
// Apply the ACL policy if any, and validate operation.
// enumcover:api.KVOp
switch op {
case api.KVDeleteTree:
var authzContext acl.AuthorizerContext
Expand All @@ -66,13 +67,15 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz resolver.Result, op api
return false, err
}

default:
case api.KVCheckNotExists, api.KVUnlock, api.KVLock, api.KVCAS, api.KVDeleteCAS, api.KVDelete, api.KVSet:
var authzContext acl.AuthorizerContext
dirEnt.FillAuthzContext(&authzContext)

if err := authz.ToAllowAuthorizer().KeyWriteAllowed(dirEnt.Key, &authzContext); err != nil {
return false, err
}
default:
return false, fmt.Errorf("unknown KV operation: %s", op)
}

// If this is a lock, we must check for a lock-delay. Since lock-delay
Expand Down
56 changes: 43 additions & 13 deletions agent/consul/state/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,31 @@
package state

import (
"errors"
"fmt"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)

// UnsupportedFSMApplyPanicError wraps an error produced when a transaction
// operation carries a verb that the state store does not recognize. The txn
// handlers in this file return it for unknown verbs, and txnDispatch detects
// it with errors.As and panics with the wrapped error instead of recording it
// as an ordinary per-operation transaction error.
type UnsupportedFSMApplyPanicError struct {
	// Wrapped is the underlying error describing the unsupported operation.
	Wrapped error
}

// Unwrap returns the underlying error so that errors.Is and errors.As can see
// through this wrapper. It returns nil when called on a nil receiver.
func (e *UnsupportedFSMApplyPanicError) Unwrap() error {
	if e == nil {
		return nil
	}
	return e.Wrapped
}

// Error returns the message of the wrapped error. If the receiver or the
// wrapped error is nil, a generic message is returned rather than
// dereferencing a nil value, so that formatting the error can never itself
// cause a nil-pointer panic.
func (e *UnsupportedFSMApplyPanicError) Error() string {
	if e == nil || e.Wrapped == nil {
		return "unsupported FSM apply operation"
	}
	return e.Wrapped.Error()
}

// txnKVS handles all KV-related operations.
func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry
var err error

// enumcover: api.KVOp
switch op.Verb {
case api.KVSet:
entry = &op.DirEnt
Expand Down Expand Up @@ -95,7 +109,7 @@ func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.Tx
}

default:
err = fmt.Errorf("unknown KV verb %q", op.Verb)
err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown KV verb %q", op.Verb)}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -123,11 +137,12 @@ func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.Tx
func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error {
var err error

// enumcover: api.SessionOp
switch op.Verb {
case api.SessionDelete:
err = sessionDeleteWithSession(tx, &op.Session, idx)
default:
err = fmt.Errorf("unknown Session verb %q", op.Verb)
return &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown session verb %q", op.Verb)}
}
if err != nil {
return fmt.Errorf("failed to delete session: %v", err)
Expand All @@ -146,11 +161,17 @@ func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) err
case structs.IntentionOpDelete:
return legacyIntentionDeleteTxn(tx, idx, op.Intention.ID)
case structs.IntentionOpDeleteAll:
fallthrough // deliberately not available via this api
// deliberately not available via this api
return fmt.Errorf("Intention op not supported %q", op.Op)
case structs.IntentionOpUpsert:
fallthrough // deliberately not available via this api
// deliberately not available via this api
return fmt.Errorf("Intention op not supported %q", op.Op)
default:
return fmt.Errorf("unknown Intention op %q", op.Op)
// If we've gotten to this point, the unknown verb has slipped by
// endpoint validation. This means it could be a mismatch in Server versions
// that are sending known verbs as part of Raft logs. We panic rather than silently
// swallowing the error during Raft Apply.
panic(fmt.Sprintf("unknown Intention op %q", op.Op))
}
}

Expand Down Expand Up @@ -202,7 +223,7 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs
}

default:
err = fmt.Errorf("unknown Node verb %q", op.Verb)
err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown Node verb %q", op.Verb)}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -271,7 +292,7 @@ func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (s
return nil, err

default:
return nil, fmt.Errorf("unknown Service verb %q", op.Verb)
return nil, &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown Service verb %q", op.Verb)}
}
}

Expand Down Expand Up @@ -326,7 +347,7 @@ func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (struc
}

default:
err = fmt.Errorf("unknown Check verb %q", op.Verb)
err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown check verb %q", op.Verb)}
}
if err != nil {
return nil, err
Expand All @@ -352,7 +373,7 @@ func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (struc
// txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
errs := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
var ret structs.TxnResults
var err error
Expand All @@ -374,24 +395,33 @@ func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (struct
// compatibility with pre-1.9.0 raft logs and during upgrades.
err = txnLegacyIntention(tx, idx, op.Intention)
default:
err = fmt.Errorf("no operation specified")
panic("no operation specified")
}

// Accumulate the results.
results = append(results, ret...)

var panicErr *UnsupportedFSMApplyPanicError
if errors.As(err, &panicErr) {
// If we've gotten to this point, the unknown verb has slipped by
// endpoint validation. This means it could be a mismatch in Server versions
// that are sending known verbs as part of Raft logs. We panic rather than silently
// swallowing the error during Raft Apply. See NET-9016 for historical context.
panic(panicErr.Wrapped)
}

// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.TxnError{
errs = append(errs, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
}
}

if len(errors) > 0 {
return nil, errors
if len(errs) > 0 {
return nil, errs
}

return results, nil
Expand Down
70 changes: 61 additions & 9 deletions agent/consul/state/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,14 +1058,6 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: "nope",
DirEnt: structs.DirEntry{
Key: "foo/delete",
},
},
},
}
results, errors := s.TxnRW(7, ops)
if len(errors) != len(ops) {
Expand All @@ -1086,7 +1078,6 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
`key "nope" doesn't exist`,
"current modify index",
`key "nope" doesn't exist`,
"unknown KV verb",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
Expand Down Expand Up @@ -1415,3 +1406,64 @@ func TestStateStore_Txn_KVS_ModifyIndexes(t *testing.T) {
}
}
}

// TestStateStore_UnknownTxnOperationsPanic validates that unknown txn
// operations panic instead of returning an error.
//
// If an unknown operation only produced an error while being applied from the
// FSM, the state store of this agent could potentially be out of sync with
// other agents that applied the operation. When responding to a local
// endpoint, the operation type is required to be validated before it is sent
// to the state store. See NET-9016 for historical context.
//
// Each operation type runs as its own named subtest so that a failure reports
// which kind of operation did not panic, and so that one failing case does not
// hide the results of the others.
func TestStateStore_UnknownTxnOperationsPanic(t *testing.T) {
	s := testStateStore(t)

	testCases := []struct {
		name string
		ops  structs.TxnOps
	}{
		{
			name: "kv",
			ops: structs.TxnOps{
				&structs.TxnOp{
					KV: &structs.TxnKVOp{
						Verb: "sand-the-floor",
						DirEnt: structs.DirEntry{
							Key: "foo/a",
						},
					},
				},
			},
		},
		{
			name: "node",
			ops: structs.TxnOps{
				&structs.TxnOp{
					Node: &structs.TxnNodeOp{
						Verb: "wax-the-car",
					},
				},
			},
		},
		{
			name: "service",
			ops: structs.TxnOps{
				&structs.TxnOp{
					Service: &structs.TxnServiceOp{
						Verb: "paint-the-house",
					},
				},
			},
		},
		{
			name: "check",
			ops: structs.TxnOps{
				&structs.TxnOp{
					Check: &structs.TxnCheckOp{
						Verb: "paint-the-fence",
					},
				},
			},
		},
		{
			name: "session",
			ops: structs.TxnOps{
				&structs.TxnOp{
					Session: &structs.TxnSessionOp{
						Verb: "sweep-the-knee",
					},
				},
			},
		},
		{
			name: "intention",
			ops: structs.TxnOps{
				&structs.TxnOp{
					Intention: &structs.TxnIntentionOp{ // nolint:staticcheck // SA1019 intentional use of deprecated field
						Op: "flying-crane-kick",
					},
				},
			},
		},
	}

	for _, tc := range testCases {
		tc := tc // Pin the case for the closure below on pre-1.22 toolchains.
		t.Run(tc.name, func(t *testing.T) {
			require.Panics(t, func() { s.TxnRW(3, tc.ops) })
		})
	}
}
Loading
Loading