From 763cd0bffb6e40d9c8813dd2a9cf0bed28427667 Mon Sep 17 00:00:00 2001 From: Dan Stough Date: Fri, 5 Jul 2024 14:51:20 -0400 Subject: [PATCH] fix(txn): validate verbs (#21519) * fix(txn): validate verbs * changelog --- .changelog/21519.txt | 3 + agent/consul/kvs_endpoint.go | 7 +- agent/consul/state/txn.go | 56 +++++++++---- agent/consul/state/txn_test.go | 70 ++++++++++++++--- agent/consul/txn_endpoint.go | 119 ++++++++++++++++++++++++++-- agent/consul/txn_endpoint_test.go | 125 ++++++++++++++++++++++++++++++ 6 files changed, 350 insertions(+), 30 deletions(-) create mode 100644 .changelog/21519.txt diff --git a/.changelog/21519.txt b/.changelog/21519.txt new file mode 100644 index 000000000000..2913443b1c4d --- /dev/null +++ b/.changelog/21519.txt @@ -0,0 +1,3 @@ +```release-note:bug +txn: Fix a bug where mismatched Consul server versions could result in undetected data loss for when using newer Transaction verbs. +``` diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index 65dc2cd56d40..bfad9fa1932a 100644 --- a/agent/consul/kvs_endpoint.go +++ b/agent/consul/kvs_endpoint.go @@ -42,7 +42,8 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz resolver.Result, op api return false, fmt.Errorf("Must provide key") } - // Apply the ACL policy if any. + // Apply the ACL policy if any, and validate operation. + // enumcover:api.KVOp switch op { case api.KVDeleteTree: var authzContext acl.AuthorizerContext @@ -66,13 +67,15 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz resolver.Result, op api return false, err } - default: + case api.KVCheckNotExists, api.KVUnlock, api.KVLock, api.KVCAS, api.KVDeleteCAS, api.KVDelete, api.KVSet: var authzContext acl.AuthorizerContext dirEnt.FillAuthzContext(&authzContext) if err := authz.ToAllowAuthorizer().KeyWriteAllowed(dirEnt.Key, &authzContext); err != nil { return false, err } + default: + return false, fmt.Errorf("unknown KV operation: %s", op) } // If this is a lock, we must check for a lock-delay. Since lock-delay diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index 30189fc1ed60..66cc4bb33d40 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -4,17 +4,31 @@ package state import ( + "errors" "fmt" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" ) +type UnsupportedFSMApplyPanicError struct { + Wrapped error +} + +func (e *UnsupportedFSMApplyPanicError) Unwrap() error { + return e.Wrapped +} + +func (e *UnsupportedFSMApplyPanicError) Error() string { + return e.Wrapped.Error() +} + // txnKVS handles all KV-related operations. func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { var entry *structs.DirEntry var err error + // enumcover: api.KVOp switch op.Verb { case api.KVSet: entry = &op.DirEnt @@ -95,7 +109,7 @@ func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.Tx } default: - err = fmt.Errorf("unknown KV verb %q", op.Verb) + err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown KV verb %q", op.Verb)} } if err != nil { return nil, err @@ -123,11 +137,12 @@ func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.Tx func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error { var err error + // enumcover: api.SessionOp switch op.Verb { case api.SessionDelete: err = sessionDeleteWithSession(tx, &op.Session, idx) default: - err = fmt.Errorf("unknown Session verb %q", op.Verb) + return &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown session verb %q", op.Verb)} } if err != nil { return fmt.Errorf("failed to delete session: %v", err) @@ -146,11 +161,17 @@ func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) err case structs.IntentionOpDelete: return legacyIntentionDeleteTxn(tx, idx, op.Intention.ID) case structs.IntentionOpDeleteAll: - fallthrough // deliberately not available via this api + // deliberately not available via this api + return fmt.Errorf("Intention op not supported %q", op.Op) case structs.IntentionOpUpsert: - fallthrough // deliberately not available via this api + // deliberately not available via this api + return fmt.Errorf("Intention op not supported %q", op.Op) default: - return fmt.Errorf("unknown Intention op %q", op.Op) + // If we've gotten to this point, the unknown verb has slipped by + // endpoint validation. This means it could be a mismatch in Server versions + // that are sending known verbs as part of Raft logs. We panic rather than silently + // swallowing the error during Raft Apply. + panic(fmt.Sprintf("unknown Intention op %q", op.Op)) } } @@ -202,7 +223,7 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs } default: - err = fmt.Errorf("unknown Node verb %q", op.Verb) + err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown Node verb %q", op.Verb)} } if err != nil { return nil, err @@ -271,7 +292,7 @@ func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (s return nil, err default: - return nil, fmt.Errorf("unknown Service verb %q", op.Verb) + return nil, &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown Service verb %q", op.Verb)} } } @@ -326,7 +347,7 @@ func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (struc } default: - err = fmt.Errorf("unknown Check verb %q", op.Verb) + err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown check verb %q", op.Verb)} } if err != nil { return nil, err @@ -352,7 +373,7 @@ func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (struc // txnDispatch runs the given operations inside the state store transaction. func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { results := make(structs.TxnResults, 0, len(ops)) - errors := make(structs.TxnErrors, 0, len(ops)) + errs := make(structs.TxnErrors, 0, len(ops)) for i, op := range ops { var ret structs.TxnResults var err error @@ -374,24 +395,33 @@ func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (struct // compatibility with pre-1.9.0 raft logs and during upgrades. err = txnLegacyIntention(tx, idx, op.Intention) default: - err = fmt.Errorf("no operation specified") + panic("no operation specified") } // Accumulate the results. results = append(results, ret...) + var panicErr *UnsupportedFSMApplyPanicError + if errors.As(err, &panicErr) { + // If we've gotten to this point, the unknown verb has slipped by + // endpoint validation. This means it could be a mismatch in Server versions + // that are sending known verbs as part of Raft logs. We panic rather than silently + // swallowing the error during Raft Apply. See NET-9016 for historical context. + panic(panicErr.Wrapped) + } + // Capture any error along with the index of the operation that // failed. if err != nil { - errors = append(errors, &structs.TxnError{ + errs = append(errs, &structs.TxnError{ OpIndex: i, What: err.Error(), }) } } - if len(errors) > 0 { - return nil, errors + if len(errs) > 0 { + return nil, errs } return results, nil diff --git a/agent/consul/state/txn_test.go b/agent/consul/state/txn_test.go index bda004a63a3b..c3d95a4efe3e 100644 --- a/agent/consul/state/txn_test.go +++ b/agent/consul/state/txn_test.go @@ -1058,14 +1058,6 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) { }, }, }, - &structs.TxnOp{ - KV: &structs.TxnKVOp{ - Verb: "nope", - DirEnt: structs.DirEntry{ - Key: "foo/delete", - }, - }, - }, } results, errors := s.TxnRW(7, ops) if len(errors) != len(ops) { @@ -1086,7 +1078,6 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) { `key "nope" doesn't exist`, "current modify index", `key "nope" doesn't exist`, - "unknown KV verb", } if len(errors) != len(expected) { t.Fatalf("bad len: %d != %d", len(errors), len(expected)) @@ -1415,3 +1406,64 @@ func TestStateStore_Txn_KVS_ModifyIndexes(t *testing.T) { } } } + +// TestStateStore_UnknownTxnOperationsPanic validates that unknown txn operations panic. +// If we error in this case this is from an FSM Apply, the state store of this agent could potentially be out of +// sync with other agents that applied the operation. In the case of responding to a local endpoint, we require +// that the operation type be validated prior to being sent to the state store. +// See NET-9016 for historical context. +func TestStateStore_UnknownTxnOperationsPanic(t *testing.T) { + s := testStateStore(t) + + testCases := []structs.TxnOps{ + { + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: "sand-the-floor", + DirEnt: structs.DirEntry{ + Key: "foo/a", + }, + }, + }, + }, + { + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: "wax-the-car", + }, + }, + }, + { + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: "paint-the-house", + }, + }, + }, + { + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: "paint-the-fence", + }, + }, + }, + { + &structs.TxnOp{ + Session: &structs.TxnSessionOp{ + Verb: "sweep-the-knee", + }, + }, + }, + { + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ // nolint:staticcheck // SA1019 intentional use of deprecated field + Op: "flying-crane-kick", + }, + }, + }, + } + + for _, tc := range testCases { + require.Panics(t, func() { s.TxnRW(3, tc) }) + } +} diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index f39cd502cb17..e704c9a2eda1 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -57,8 +57,15 @@ func (t *Txn) preCheck(authorizer resolver.Result, ops structs.TxnOps) structs.T }) } case op.Node != nil: - // Skip the pre-apply checks if this is a GET. - if op.Node.Verb == api.NodeGet { + requiresPreApply, err := nodeVerbValidate(op.Node.Verb) + if err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + break + } + if !requiresPreApply { break } @@ -79,8 +86,15 @@ func (t *Txn) preCheck(authorizer resolver.Result, ops structs.TxnOps) structs.T }) } case op.Service != nil: - // Skip the pre-apply checks if this is a GET. - if op.Service.Verb == api.ServiceGet { + requiresPreApply, err := serviceVerbValidate(op.Service.Verb) + if err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + break + } + if !requiresPreApply { break } @@ -92,8 +106,15 @@ func (t *Txn) preCheck(authorizer resolver.Result, ops structs.TxnOps) structs.T }) } case op.Check != nil: - // Skip the pre-apply checks if this is a GET. - if op.Check.Verb == api.CheckGet { + requiresPreApply, err := checkVerbValidate(op.Check.Verb) + if err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + break + } + if !requiresPreApply { break } @@ -106,6 +127,25 @@ func (t *Txn) preCheck(authorizer resolver.Result, ops structs.TxnOps) structs.T What: err.Error(), }) } + case op.Intention != nil: + if err := intentionVerbValidate(op.Intention.Op); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + } + case op.Session != nil: + if err := sessionVerbValidate(op.Session.Verb); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + } + default: + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: "unknown operation type", + }) } } @@ -224,3 +264,70 @@ func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) return nil } + +// nodeVerbValidate checks for a known operation type. For certain operations, +// it also indicated if further "preApply" checks are required. +func nodeVerbValidate(op api.NodeOp) (bool, error) { + // enumcover: api.NodeOp + switch op { + // Skip the pre-apply checks if this is a GET. + case api.NodeGet: + return false, nil + case api.NodeSet, api.NodeCAS, api.NodeDelete, api.NodeDeleteCAS: + return true, nil + default: + return false, fmt.Errorf("unknown node operation: %s", op) + } +} + +// serviceVerbValidate checks for a known operation type. For certain operations, +// it also indicated if further "preApply" checks are required. +func serviceVerbValidate(op api.ServiceOp) (bool, error) { + // enumcover: api.ServiceOp + switch op { + // Skip the pre-apply checks if this is a GET. + case api.ServiceGet: + return false, nil + case api.ServiceSet, api.ServiceCAS, api.ServiceDelete, api.ServiceDeleteCAS: + return true, nil + default: + return false, fmt.Errorf("unknown service operation: %s", op) + } +} + +// checkVerbValidate checks for a known operation type. For certain operations, +// it also indicated if further "preApply" checks are required. +func checkVerbValidate(op api.CheckOp) (bool, error) { + // enumcover: api.CheckOp + switch op { + // Skip the pre-apply checks if this is a GET. + case api.CheckGet: + return false, nil + case api.CheckSet, api.CheckCAS, api.CheckDelete, api.CheckDeleteCAS: + return true, nil + default: + return false, fmt.Errorf("unknown check operation: %s", op) + } +} + +// intentionVerbValidate checks for a known operation type. +func intentionVerbValidate(op structs.IntentionOp) error { + // enumcover: structs.IntentionOp + switch op { + case structs.IntentionOpCreate, structs.IntentionOpDelete, structs.IntentionOpUpdate, structs.IntentionOpDeleteAll, structs.IntentionOpUpsert: + return nil + default: + return fmt.Errorf("unknown intention operation: %s", op) + } +} + +// sessionVerbValidate checks for a known operation type. +func sessionVerbValidate(op api.SessionOp) error { + // enumcover: api.SessionOp + switch op { + case api.SessionDelete: + return nil + default: + return fmt.Errorf("unknown session operation: %s", op) + } +} diff --git a/agent/consul/txn_endpoint_test.go b/agent/consul/txn_endpoint_test.go index ef2ecd13a3f8..03bccf95ff5e 100644 --- a/agent/consul/txn_endpoint_test.go +++ b/agent/consul/txn_endpoint_test.go @@ -946,3 +946,128 @@ func TestTxn_Read_ACLDeny(t *testing.T) { require.Empty(t, out.Results) }) } + +// TestTxn_Validation works across RW and RO Txn endpoints validating the "preCheck()" operation consistently +// validates operations provided in the request. +func TestTxn_Validation(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Each one of these test cases should error as invalid. + testCases := []struct { + request structs.TxnReadRequest + expectedError string + }{ + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + KV: &structs.TxnKVOp{ + Verb: "tick", + DirEnt: structs.DirEntry{ + Key: "nope", + }, + }, + }, + }, + }, + expectedError: "unknown KV operation", + }, + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: "tick", + }, + }, + }, + }, + expectedError: "unknown node operation", + }, + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: "tick", + }, + }, + }, + }, + expectedError: "unknown service operation", + }, + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: "tick", + }, + }, + }, + }, + expectedError: "unknown check operation", + }, + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + Session: &structs.TxnSessionOp{ + Verb: "tick", + }, + }, + }, + }, + expectedError: "unknown session operation", + }, + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + Intention: &structs.TxnIntentionOp{ // nolint:staticcheck // SA1019 intentional use of deprecated field + Op: "BOOM!", + }, + }, + }, + }, + expectedError: "unknown intention operation", + }, + { + request: structs.TxnReadRequest{ + Datacenter: "dc1", + Ops: structs.TxnOps{ + &structs.TxnOp{ + // Intentionally Empty + }, + }, + }, + expectedError: "unknown operation type", + }, + } + + for _, tc := range testCases { + var out structs.TxnReadResponse + err := msgpackrpc.CallWithCodec(codec, "Txn.Read", &tc.request, &out) + require.NoError(t, err) + require.Greater(t, len(out.Errors), 0) + require.Contains(t, out.Errors[0].Error(), tc.expectedError) + } +}