diff --git a/src/go/rpk/pkg/config/params.go b/src/go/rpk/pkg/config/params.go index 81228a8e212d..843e604c206f 100644 --- a/src/go/rpk/pkg/config/params.go +++ b/src/go/rpk/pkg/config/params.go @@ -1595,6 +1595,7 @@ func (c *Config) fixSchemePorts() error { if port == "" { port = strconv.Itoa(DefaultKafkaPort) } + host = normalizeHost(host) c.redpandaYaml.Rpk.KafkaAPI.Brokers[i] = net.JoinHostPort(host, port) } for i, a := range c.redpandaYaml.Rpk.AdminAPI.Addresses { @@ -1602,6 +1603,7 @@ func (c *Config) fixSchemePorts() error { if err != nil { return fmt.Errorf("unable to fix admin address %v: %w", a, err) } + host = normalizeHost(host) switch scheme { case "": if port == "" { @@ -1614,54 +1616,87 @@ func (c *Config) fixSchemePorts() error { return fmt.Errorf("unable to fix admin address %v: unsupported scheme %q", a, scheme) } } - p := c.rpkYaml.Profile(c.rpkYaml.CurrentProfile) - for i, k := range p.KafkaAPI.Brokers { - _, host, port, err := rpknet.SplitSchemeHostPort(k) - if err != nil { - return fmt.Errorf("unable to fix broker address %v: %w", k, err) - } - if port == "" { - port = strconv.Itoa(DefaultKafkaPort) - } - p.KafkaAPI.Brokers[i] = net.JoinHostPort(host, port) - } - for i, a := range p.AdminAPI.Addresses { + for i, a := range c.redpandaYaml.Rpk.SR.Addresses { scheme, host, port, err := rpknet.SplitSchemeHostPort(a) if err != nil { - return fmt.Errorf("unable to fix admin address %v: %w", a, err) + return fmt.Errorf("unable to fix schema registry address %v: %w", a, err) } + host = normalizeHost(host) switch scheme { case "": if port == "" { - port = strconv.Itoa(DefaultAdminPort) + port = strconv.Itoa(DefaultSchemaRegPort) } - p.AdminAPI.Addresses[i] = net.JoinHostPort(host, port) + c.redpandaYaml.Rpk.SR.Addresses[i] = net.JoinHostPort(host, port) case "http", "https": continue // keep whatever port exists; empty ports will default to 80 or 443 default: - return fmt.Errorf("unable to fix admin address %v: unsupported scheme %q", a, scheme) + return fmt.Errorf("unable to fix schema registry address %v: unsupported scheme %q", a, scheme) } } - for i, a := range p.SR.Addresses { - scheme, host, port, err := rpknet.SplitSchemeHostPort(a) - if err != nil { - return fmt.Errorf("unable to fix schema registry address %v: %w", a, err) - } - switch scheme { - case "": + + p := c.rpkYaml.Profile(c.rpkYaml.CurrentProfile) + if p != nil { + for i, k := range p.KafkaAPI.Brokers { + _, host, port, err := rpknet.SplitSchemeHostPort(k) + if err != nil { + return fmt.Errorf("unable to fix broker address %v: %w", k, err) + } + host = normalizeHost(host) if port == "" { - port = strconv.Itoa(DefaultSchemaRegPort) + port = strconv.Itoa(DefaultKafkaPort) + } + p.KafkaAPI.Brokers[i] = net.JoinHostPort(host, port) + } + for i, a := range p.AdminAPI.Addresses { + scheme, host, port, err := rpknet.SplitSchemeHostPort(a) + if err != nil { + return fmt.Errorf("unable to fix admin address %v: %w", a, err) + } + host = normalizeHost(host) + switch scheme { + case "": + if port == "" { + port = strconv.Itoa(DefaultAdminPort) + } + p.AdminAPI.Addresses[i] = net.JoinHostPort(host, port) + case "http", "https": + continue // keep whatever port exists; empty ports will default to 80 or 443 + default: + return fmt.Errorf("unable to fix admin address %v: unsupported scheme %q", a, scheme) + } + } + for i, a := range p.SR.Addresses { + scheme, host, port, err := rpknet.SplitSchemeHostPort(a) + if err != nil { + return fmt.Errorf("unable to fix schema registry address %v: %w", a, err) + } + host = normalizeHost(host) + switch scheme { + case "": + if port == "" { + port = strconv.Itoa(DefaultSchemaRegPort) + } + p.SR.Addresses[i] = net.JoinHostPort(host, port) + case "http", "https": + continue // keep whatever port exists; empty ports will default to 80 or 443 + default: + return fmt.Errorf("unable to fix schema registry address %v: unsupported scheme %q", a, scheme) } - p.SR.Addresses[i] = net.JoinHostPort(host, port) - case "http", "https": - continue // keep whatever port exists; empty ports will default to 80 or 443 - default: - return fmt.Errorf("unable to fix schema registry address %v: unsupported scheme %q", a, scheme) } } return nil } +// normalizeHost remove surrounding brackets if present for ipv6, +// net.JoinHostPort will add them back. +func normalizeHost(host string) string { + if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { + return host[1 : len(host)-1] + } + return host +} + func (c *Config) addConfigToProfiles() { for i := range c.rpkYaml.Profiles { c.rpkYaml.Profiles[i].c = c diff --git a/src/go/rpk/pkg/config/params_test.go b/src/go/rpk/pkg/config/params_test.go index 216aa22d3fb9..c9f1d179d889 100644 --- a/src/go/rpk/pkg/config/params_test.go +++ b/src/go/rpk/pkg/config/params_test.go @@ -1242,3 +1242,180 @@ func TestXSetDefaultsPaths(t *testing.T) { } } } + +func TestConfig_fixSchemePorts(t *testing.T) { + tests := []struct { + name string + config Config + expect Config + errMsg string + }{ + { + name: "fix missing ports in redpanda.yaml addresses", + config: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"broker1", "broker2:9093"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"address1", "address2:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"address1", "address2", "address3:8088"}, + }, + }, + }, + }, + expect: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"broker1:9092", "broker2:9093"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"address1:9644", "address2:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"address1:8081", "address2:8081", "address3:8088"}, + }, + }, + }, + }, + }, + { + name: "fix missing ports in profile brokers", + config: Config{ + rpkYaml: RpkYaml{ + CurrentProfile: "default", + Profiles: []RpkProfile{ + { + Name: "default", + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"profile-broker1", "profile-broker2:9094"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"address1", "address2:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"address1", "address2", "address3:8088"}, + }, + }, + }, + }, + }, + expect: Config{ + rpkYaml: RpkYaml{ + CurrentProfile: "default", + Profiles: []RpkProfile{ + { + Name: "default", + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"profile-broker1:9092", "profile-broker2:9094"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"address1:9644", "address2:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"address1:8081", "address2:8081", "address3:8088"}, + }, + }, + }, + }, + }, + }, + { + name: "fix missing ports in redpanda.yaml addresses", + config: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"broker1", "broker2:9093"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"address1", "address2:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"address1", "address2", "address3:8088"}, + }, + }, + }, + }, + expect: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"broker1:9092", "broker2:9093"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"address1:9644", "address2:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"address1:8081", "address2:8081", "address3:8088"}, + }, + }, + }, + }, + }, + { + name: "fix missing ports in redpanda.yaml addresses with IPv6", + config: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"[2001:db8::1]", "[2001:db8::2]:9093"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"[2001:db8::3]", "[2001:db8::4]:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"[2001:db8::5]", "[2001:db8::6]", "[2001:db8::7]:8088"}, + }, + }, + }, + }, + expect: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{"[2001:db8::1]:9092", "[2001:db8::2]:9093"}, + }, + AdminAPI: RpkAdminAPI{ + Addresses: []string{"[2001:db8::3]:9644", "[2001:db8::4]:2222"}, + }, + SR: RpkSchemaRegistryAPI{ + Addresses: []string{"[2001:db8::5]:8081", "[2001:db8::6]:8081", "[2001:db8::7]:8088"}, + }, + }, + }, + }, + }, + { + name: "invalid broker address", + config: Config{ + redpandaYaml: RedpandaYaml{ + Rpk: RpkNodeConfig{ + KafkaAPI: RpkKafkaAPI{ + Brokers: []string{":invalid"}, + }, + }, + }, + }, + expect: Config{}, + errMsg: "unable to fix broker address", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.fixSchemePorts() + if tt.errMsg != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tt.errMsg) + } else { + require.NoError(t, err) + require.Equal(t, tt.expect, tt.config) + } + }) + } +}