Skip to content

Commit

Permalink
Conn timeout as flag in topicctl (#200)
Browse files Browse the repository at this point in the history
* Conn timeout as flag in topicctl

* Fixing test cases topicctl
  • Loading branch information
ssingudasu authored Jun 13, 2024
1 parent aedd232 commit 1fd5669
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 13 deletions.
18 changes: 14 additions & 4 deletions cmd/topicctl/subcmd/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"os"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/hashicorp/go-multierror"
Expand All @@ -29,6 +30,7 @@ type sharedOptions struct {
tlsServerName string
zkAddr string
zkPrefix string
connTimeout time.Duration
}

func (s sharedOptions) validate() error {
Expand Down Expand Up @@ -164,6 +166,7 @@ func (s sharedOptions) getAdminClient(
Username: s.saslUsername,
SecretsManagerArn: s.saslSecretsManagerArn,
},
ConnTimeout: s.connTimeout,
},
ReadOnly: readOnly,
},
Expand All @@ -172,10 +175,11 @@ func (s sharedOptions) getAdminClient(
return admin.NewZKAdminClient(
ctx,
admin.ZKAdminClientConfig{
ZKAddrs: []string{s.zkAddr},
ZKPrefix: s.zkPrefix,
Sess: sess,
ReadOnly: readOnly,
ZKAddrs: []string{s.zkAddr},
ZKPrefix: s.zkPrefix,
Sess: sess,
ReadOnly: readOnly,
KafkaConnTimeout: s.connTimeout,
},
)
}
Expand Down Expand Up @@ -275,6 +279,12 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) {
"",
"Prefix for cluster-related nodes in zk",
)
cmd.PersistentFlags().DurationVar(
&options.connTimeout,
"conn-timeout",
10*time.Second,
"Kafka connection timeout",
)
}

func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/admin/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ const (

// ConnectorConfig contains the configuration used to contruct a connector.
type ConnectorConfig struct {
BrokerAddr string
TLS TLSConfig
SASL SASLConfig
BrokerAddr string
TLS TLSConfig
SASL SASLConfig
ConnTimeout time.Duration
}

// TLSConfig stores the TLS-related configuration for a connection.
Expand Down
4 changes: 3 additions & 1 deletion pkg/admin/zkclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type ZKAdminClientConfig struct {
ExpectedClusterID string
Sess *session.Session
ReadOnly bool
KafkaConnTimeout time.Duration
}

// NewZKAdminClient creates and returns a new Client instance.
Expand Down Expand Up @@ -136,7 +137,8 @@ func NewZKAdminClient(
client.bootstrapAddrs = bootstrapAddrs
client.Connector, err = NewConnector(
ConnectorConfig{
BrokerAddr: bootstrapAddrs[0],
BrokerAddr: bootstrapAddrs[0],
ConnTimeout: config.KafkaConnTimeout,
},
)

Expand Down
6 changes: 6 additions & 0 deletions pkg/groups/groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestGetGroups(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -83,6 +84,7 @@ func TestGetGroupsMultiMember(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -164,6 +166,7 @@ func TestGetGroupsMultiMemberMultiTopic(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -260,6 +263,7 @@ func TestGetLags(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -303,6 +307,7 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down Expand Up @@ -350,6 +355,7 @@ func TestResetOffsets(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down
5 changes: 1 addition & 4 deletions pkg/messages/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ const (
// Parameters for backoff when there are connection errors
maxRetries = 4
backoffInitSleepDuration = 200 * time.Millisecond

// Connection timeout
connTimeout = 10 * time.Second
)

// Bounds represents the start and end "bounds" of the messages in
Expand Down Expand Up @@ -284,6 +281,6 @@ func dialLeaderRetries(
return nil, fmt.Errorf("Error dialing partition %d: %+v", partition, err)
}

conn.SetDeadline(time.Now().Add(connTimeout))
conn.SetDeadline(time.Now().Add(connector.Config.ConnTimeout))
return conn, nil
}
1 change: 1 addition & 0 deletions pkg/messages/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func TestGetAllPartitionBounds(t *testing.T) {
ctx := context.Background()
connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down
1 change: 1 addition & 0 deletions pkg/messages/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestTailerGetMessages(t *testing.T) {

connector, err := admin.NewConnector(admin.ConnectorConfig{
BrokerAddr: util.TestKafkaAddr(),
ConnTimeout: 10 * time.Second,
})
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion pkg/version/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package version

// Version is the current topicctl version.
const Version = "1.17.0"
const Version = "1.18.0"

0 comments on commit 1fd5669

Please sign in to comment.