Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ent to ce downgrade changes #19311

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/19311.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
raft: Fix panic during downgrade from enterprise to oss.
```
89 changes: 68 additions & 21 deletions agent/consul/fsm/commands_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package fsm

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -152,7 +153,11 @@ func init() {
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"fsm", "register"}, time.Now())
var req structs.RegisterRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeRegistrationReq(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted register request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand All @@ -167,7 +172,11 @@ func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"fsm", "deregister"}, time.Now())
var req structs.DeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeDeregistrationReq(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted deregister request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand Down Expand Up @@ -195,7 +204,11 @@ func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {

func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {
var req structs.KVSRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeKVSRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted KV request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "kvs"}, time.Now(),
Expand Down Expand Up @@ -240,7 +253,11 @@ func (c *FSM) applyKVSOperation(buf []byte, index uint64) interface{} {

func (c *FSM) applySessionOperation(buf []byte, index uint64) interface{} {
var req structs.SessionRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeSessionRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted session request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "session"}, time.Now(),
Expand Down Expand Up @@ -299,7 +316,11 @@ func (c *FSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
// state store.
func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{} {
var req structs.PreparedQueryRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodePreparedQueryRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted prepared query request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand All @@ -318,7 +339,7 @@ func (c *FSM) applyPreparedQueryOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyTxn(buf []byte, index uint64) interface{} {
var req structs.TxnRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeTxnRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"fsm", "txn"}, time.Now())
Expand Down Expand Up @@ -485,7 +506,7 @@ func (c *FSM) applyConnectCALeafOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyACLTokenSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLTokenBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLTokenBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "token"}, time.Now(),
Expand Down Expand Up @@ -523,7 +544,7 @@ func (c *FSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{} {

func (c *FSM) applyACLPolicySetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLPolicyBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLPolicyBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "policy"}, time.Now(),
Expand All @@ -544,10 +565,12 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
}

func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ProxyConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
req := structs.ConfigEntryRequest{}
if err := decodeConfigEntryOperationRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted config entry request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand Down Expand Up @@ -594,7 +617,7 @@ func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {

func (c *FSM) applyACLRoleSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLRoleBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLRoleBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "role"}, time.Now(),
Expand All @@ -616,7 +639,7 @@ func (c *FSM) applyACLRoleDeleteOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyACLBindingRuleSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLBindingRuleBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLBindingRuleBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "bindingrule"}, time.Now(),
Expand All @@ -638,7 +661,7 @@ func (c *FSM) applyACLBindingRuleDeleteOperation(buf []byte, index uint64) inter

func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface{} {
var req structs.ACLAuthMethodBatchSetRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLAuthMethodBatchSetRequest(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(),
Expand All @@ -649,7 +672,11 @@ func (c *FSM) applyACLAuthMethodSetOperation(buf []byte, index uint64) interface

func (c *FSM) applyACLAuthMethodDeleteOperation(buf []byte, index uint64) interface{} {
var req structs.ACLAuthMethodBatchDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
if err := decodeACLAuthMethodBatchDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted acl auth method delete request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"fsm", "acl", "authmethod"}, time.Now(),
Expand Down Expand Up @@ -706,7 +733,11 @@ func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{}

func (c *FSM) applyPeeringWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringWriteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering write request")
return nil
}
panic(fmt.Errorf("failed to decode peering write request: %v", err))
}

Expand All @@ -718,7 +749,11 @@ func (c *FSM) applyPeeringWrite(buf []byte, index uint64) interface{} {

func (c *FSM) applyPeeringDelete(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringDeleteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering delete request")
return nil
}
panic(fmt.Errorf("failed to decode peering delete request: %v", err))
}

Expand Down Expand Up @@ -758,7 +793,11 @@ func (c *FSM) applyPeeringTerminate(buf []byte, index uint64) interface{} {

func (c *FSM) applyPeeringTrustBundleWrite(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringTrustBundleWriteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringTrustBundleWriteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering trust bundle write request")
return nil
}
panic(fmt.Errorf("failed to decode peering trust bundle write request: %v", err))
}

Expand All @@ -770,7 +809,11 @@ func (c *FSM) applyPeeringTrustBundleWrite(buf []byte, index uint64) interface{}

func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{} {
var req pbpeering.PeeringTrustBundleDeleteRequest
if err := structs.DecodeProto(buf, &req); err != nil {
if err := decodePeeringTrustBundleDeleteRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted peering trust bundle delete request")
return nil
}
panic(fmt.Errorf("failed to decode peering trust bundle delete request: %v", err))
}

Expand All @@ -790,7 +833,11 @@ func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any {

func (c *FSM) applyManualVirtualIPs(buf []byte, index uint64) interface{} {
var req state.ServiceVirtualIP
if err := structs.Decode(buf, &req); err != nil {
if err := decodeServiceVirtualIPRequest(buf, &req); err != nil {
if errors.Is(err, ErrDroppingTenantedReq) {
c.logger.Warn("dropping tenanted virtual ip request")
return nil
}
panic(fmt.Errorf("failed to decode request: %v", err))
}

Expand Down
145 changes: 145 additions & 0 deletions agent/consul/fsm/decode_ce.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

//go:build !consulent
// +build !consulent

package fsm

import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
)

func decodeRegistrationReq(buf []byte, req *structs.RegisterRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeRegistration(buf, req)
}

func decodeDeregistrationReq(buf []byte, req *structs.DeregisterRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeDeregistration(buf, req)
}

func decodeKVSRequest(buf []byte, req *structs.KVSRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeKVS(buf, req)
}

func decodeSessionRequest(buf []byte, req *structs.SessionRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}

return decodeSession(buf, req)
}

func decodePreparedQueryRequest(buf []byte, req *structs.PreparedQueryRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodePreparedQuery(buf, req)
}

func decodeTxnRequest(buf []byte, req *structs.TxnRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeTxn(buf, req)
}

func decodeACLTokenBatchSetRequest(buf []byte, req *structs.ACLTokenBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLTokenBatchSet(buf, req)

}

func decodeACLPolicyBatchSetRequest(buf []byte, req *structs.ACLPolicyBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLPolicyBatchSet(buf, req)

}

func decodeACLRoleBatchSetRequest(buf []byte, req *structs.ACLRoleBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLRoleBatchSet(buf, req)
}

func decodeACLBindingRuleBatchSetRequest(buf []byte, req *structs.ACLBindingRuleBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLBindingRuleBatchSet(buf, req)
}

func decodeACLAuthMethodBatchSetRequest(buf []byte, req *structs.ACLAuthMethodBatchSetRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeACLAuthMethodBatchSet(buf, req)
}

func decodeACLAuthMethodBatchDeleteRequest(buf []byte, req *structs.ACLAuthMethodBatchDeleteRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}

return decodeACLAuthMethodBatchDelete(buf, req)
}

func decodeServiceVirtualIPRequest(buf []byte, req *state.ServiceVirtualIP) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}
return decodeServiceVirtualIP(buf, req)
}

func decodePeeringWriteRequest(buf []byte, req *pbpeering.PeeringWriteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringWrite(buf, req)
}

func decodePeeringDeleteRequest(buf []byte, req *pbpeering.PeeringDeleteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}

return decodePeeringDelete(buf, req)
}

func decodePeeringTrustBundleWriteRequest(buf []byte, req *pbpeering.PeeringTrustBundleWriteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringTrustBundleWrite(buf, req)
}

func decodePeeringTrustBundleDeleteRequest(buf []byte, req *pbpeering.PeeringTrustBundleDeleteRequest) error {
if !structs.CEDowngrade {
return structs.DecodeProto(buf, req)
}
return decodePeeringTrustBundleDelete(buf, req)
}

func decodeConfigEntryOperationRequest(buf []byte, req *structs.ConfigEntryRequest) error {
if !structs.CEDowngrade {
return structs.Decode(buf, req)
}

return decodeConfigEntryOperation(buf, req)
}
Loading