From 11dd16870c27345171a8b6b3748c6e364123671c Mon Sep 17 00:00:00 2001 From: mahajanadhitya Date: Fri, 26 May 2023 15:38:06 +0530 Subject: [PATCH 01/13] Changes --- kafka/adminapi.go | 206 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 206 insertions(+) diff --git a/kafka/adminapi.go b/kafka/adminapi.go index f7d7b8ac2..735bfac27 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2346,6 +2346,212 @@ func (a *AdminClient) AlterConsumerGroupOffsets( return acgor, nil } +type ScramMechanism int + +const ( + // ResourceUnknown - Unknown + Scram_Unknown = ResourceType(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) + // ResourceAny - match any resource type (DescribeConfigs) + Scram_SHA_256 = ResourceType(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) + // ResourceTopic - Topic + Scram_SHA_512 = ResourceType(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) +) + +type ScramCredentialInfo struct { + iterations int + mechanism ScramMechanism +} +type UserScramCredentialsDescription struct { + user string + scram_credential_infos []ScramCredentialInfo + err error +} + +func (a *AdminClient) DescribeUserScramCredentials( + ctx context.Context, users []string, + options ...AlterConsumerGroupOffsetsAdminOption) (result map[string]UserScramCredentialsDescription, err error) { + + result = make(map[string]UserScramCredentialsDescription) + err = a.verifyClient() + if err != nil { + return result, err + } + + // Convert user names into char** required by the implementation. + cUserList := make([]*C.char, len(users)) + cUserCount := C.size_t(len(users)) + + for idx, user := range users { + cUserList[idx] = C.CString(user) + defer C.free(unsafe.Pointer(cUserList[idx])) + } + + var cUserListPtr **C.char + if cUserCount > 0 { + cUserListPtr = ((**C.char)(&cUserList[0])) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, genericOptions) + if err != nil { + return result, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_DescribeConsumerGroups (asynchronous). + C.rd_kafka_DescribeConsumerGroups( + a.handle.rk, + cUserListPtr, + cUserCount, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) + if err != nil { + return result, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_DescribeUserScramCredentials_result(rkev) + + // Convert result from C to Go. + var cDescriptionCount C.size_t + cDescriptionCount = C.rd_kafka_DescribeUserScramCredentials_result_get_count(cRes) + for i := 0; i < int(cDescriptionCount); i++ { + var cDescription C.rd_kafka_UserScramCredentialsDescription_t + var cError C.rd_kafka_error_t + cDescription = C.rd_kafka_DescribeUserScramCredentials_result_get_description(cRes, i) + goUser := C.GoString(C.rd_kafka_UserScramCredentialsDescription_get_user(cDescription)) + goUserDescription := UserScramCredentialsDescription{user: goUser, err: nil, scram_credential_infos: nil} + // If Errored Populate the Error + cError = C.rd_kafka_UserScramCredentialsDescription_get_error(cDescription) + + if C.rd_kafka_error_code(cError) { + // populate the error + goUserDescription.err = newError(C.rd_kafka_error_code(cError)) + } else { + var cCredentialCount C.size_t + cCredentialCount = C.rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo_cnt(cDescription) + var scram_credential_infos []ScramCredentialInfo + for j := 0; j < int(cCredentialCount); j++ { + var scram_credential_info C.rd_kafka_ScramCredentialInfo_t + scram_credential_info = rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo(cDescription, j) + var cmechanism int + var citerations int + cmechanism = rd_kafka_ScramCredentialInfo_get_mechanism(scram_credential_info) + citerations = rd_kafka_ScramCredentialInfo_get_iterations(scram_credential_info) + cred := ScramCredentialInfo{mechanism: ScramMechanism(cmechanism), iterations: citerations} + scram_credential_info = append(scram_credential_infos, cred) + } + goUserDescription.scram_credential_infos = scram_credential_infos + } + result[goUser] = goUserDescription + } + return result, nil +} + +type UserScramCredentialDeletion struct { + user string + mechanism ScramMechanism +} +type UserScramCredentialUpsertion struct { + user string + salt string + password string + iterations int + mechanism ScramMechanism +} + +func (a *AdminClient) AlterUserScramCredentials( + ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, + options ...AlterConsumerGroupOffsetsAdminOption) (result map[string]error, err error) { + result = make(map[string]error) + err = a.verifyClient() + if err != nil { + return result, err + } + + // Convert user names into char** required by the implementation. + cAlterationList := make([]*C.rd_kafka_UserScramCredentialAlteration_t, len(upsertions)+len(deletions)) + cAlterationCount := C.size_t(len(upsertions) + len(deletions)) + idx := 0 + + for itr := 0; itr < len(upsertions); itr++ { + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(upsertions[itr].user, upsertions[itr].salt, upsertions[itr].password, upsertions[itr].mechanism, upsertions[itr].iterations) + defer C.free(unsafe.Pointer(cAlterationList[idx])) + idx = idx + 1 + } + + for itr := 0; itr < len(deletions); itr++ { + cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(deletions[itr].user, deletions[itr].mechanism) + defer C.free(unsafe.Pointer(cAlterationList[idx])) + idx = idx + 1 + } + var cAlterationListPtr **C.rd_kafka_UserScramCredentialAlteration_t + if cAlterationCount > 0 { + cAlterationListPtr = ((**C.rd_kafka_UserScramCredentialAlteration_t)(&cAlterationList[0])) + } + + // Convert Go AdminOptions (if any) to C AdminOptions. + genericOptions := make([]AdminOption, len(options)) + for i := range options { + genericOptions[i] = options[i] + } + cOptions, err := adminOptionsSetup( + a.handle, C.RD_KAFKA_ADMIN_OP_ALTERUSERSCRAMCREDENTIALS, genericOptions) + if err != nil { + return result, err + } + defer C.rd_kafka_AdminOptions_destroy(cOptions) + + // Create temporary queue for async operation. + cQueue := C.rd_kafka_queue_new(a.handle.rk) + defer C.rd_kafka_queue_destroy(cQueue) + + // Call rd_kafka_DescribeConsumerGroups (asynchronous). + C.rd_kafka_AlterUserScramCredentials( + a.handle.rk, + cAlterationListPtr, + cAlterationCount, + cOptions, + cQueue) + + // Wait for result, error or context timeout. + rkev, err := a.waitResult( + ctx, cQueue, C.RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT) + if err != nil { + return result, err + } + defer C.rd_kafka_event_destroy(rkev) + + cRes := C.rd_kafka_event_AlterUserScramCredentials_result(rkev) + + // Convert result from C to Go. + var cUserCount C.size_t + cUserCount = C.rd_kafka_AlterUserScramCredentials_result_get_count(cRes) + for i := 0; i < int(cUserCount); i++ { + var cUserResult C.rd_kafka_UserScramCredentialAlterationResultElement_t + + cUserResult = C.rd_kafka_AlterUserScramCredentials_result_get_element(cRes, i) + goUser := C.GoString(C.rd_kafka_UserScramCredentialAlterationResultElement_get_user(cUserResult)) + + goerr := newError(C.rd_kafka_error_code(C.rd_kafka_UserScramCredentialAlterationResultElement_get_error(cUserResult))) + result[goUser] = goerr + } + return result, nil +} + // NewAdminClient creats a new AdminClient instance with a new underlying client instance func NewAdminClient(conf *ConfigMap) (*AdminClient, error) { From 084476788ac8fe69cd0ab4b142f8cd699fb92e2a Mon Sep 17 00:00:00 2001 From: mahajanadhitya Date: Fri, 26 May 2023 16:50:36 +0530 Subject: [PATCH 02/13] Changes --- kafka/adminapi.go | 27 +++++++++++++-------------- kafka/adminoptions.go | 15 ++++++++++++++- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 735bfac27..313a8f492 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2350,11 +2350,11 @@ type ScramMechanism int const ( // ResourceUnknown - Unknown - Scram_Unknown = ResourceType(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) + Scram_Unknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) // ResourceAny - match any resource type (DescribeConfigs) - Scram_SHA_256 = ResourceType(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) + Scram_SHA_256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) // ResourceTopic - Topic - Scram_SHA_512 = ResourceType(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) + Scram_SHA_512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) ) type ScramCredentialInfo struct { @@ -2369,7 +2369,7 @@ type UserScramCredentialsDescription struct { func (a *AdminClient) DescribeUserScramCredentials( ctx context.Context, users []string, - options ...AlterConsumerGroupOffsetsAdminOption) (result map[string]UserScramCredentialsDescription, err error) { + options ...DescribeUserScramCredentialsAdminOption) (result map[string]UserScramCredentialsDescription, err error) { result = make(map[string]UserScramCredentialsDescription) err = a.verifyClient() @@ -2408,7 +2408,7 @@ func (a *AdminClient) DescribeUserScramCredentials( defer C.rd_kafka_queue_destroy(cQueue) // Call rd_kafka_DescribeConsumerGroups (asynchronous). - C.rd_kafka_DescribeConsumerGroups( + C.rd_kafka_DescribeUserScramCredentials( a.handle.rk, cUserListPtr, cUserCount, @@ -2447,7 +2447,7 @@ func (a *AdminClient) DescribeUserScramCredentials( for j := 0; j < int(cCredentialCount); j++ { var scram_credential_info C.rd_kafka_ScramCredentialInfo_t scram_credential_info = rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo(cDescription, j) - var cmechanism int + var cmechanism C.rd_kafka_ScramMechanism_t var citerations int cmechanism = rd_kafka_ScramCredentialInfo_get_mechanism(scram_credential_info) citerations = rd_kafka_ScramCredentialInfo_get_iterations(scram_credential_info) @@ -2466,16 +2466,15 @@ type UserScramCredentialDeletion struct { mechanism ScramMechanism } type UserScramCredentialUpsertion struct { - user string - salt string - password string - iterations int - mechanism ScramMechanism + user string + salt string + password string + scram_credential_info ScramCredentialInfo } func (a *AdminClient) AlterUserScramCredentials( ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, - options ...AlterConsumerGroupOffsetsAdminOption) (result map[string]error, err error) { + options ...AlterUserScramCredentialsAdminOption) (result map[string]error, err error) { result = make(map[string]error) err = a.verifyClient() if err != nil { @@ -2488,13 +2487,13 @@ func (a *AdminClient) AlterUserScramCredentials( idx := 0 for itr := 0; itr < len(upsertions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(upsertions[itr].user, upsertions[itr].salt, upsertions[itr].password, upsertions[itr].mechanism, upsertions[itr].iterations) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(upsertions[itr].user, upsertions[itr].salt, upsertions[itr].password, C.rd_kafka_ScramMechanism_t(upsertions[itr].scram_credential_info.mechanism), upsertions[itr].scram_credential_info.iterations) defer C.free(unsafe.Pointer(cAlterationList[idx])) idx = idx + 1 } for itr := 0; itr < len(deletions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(deletions[itr].user, deletions[itr].mechanism) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(deletions[itr].user, C.rd_kafka_ScramMechanism_t(deletions[itr].mechanism)) // problem the end point expects rd_kafka_ScramMechanism , will it type cast itself defer C.free(unsafe.Pointer(cAlterationList[idx])) idx = idx + 1 } diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index c0b9eb690..16b5efdc1 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -133,7 +133,10 @@ func (ao AdminOptionRequestTimeout) supportsListConsumerGroupOffsets() { } func (ao AdminOptionRequestTimeout) supportsAlterConsumerGroupOffsets() { } - +func (ao AdminOptionRequestTimeout) supportsDescribeUserScramCredentials() { +} +func (ao AdminOptionRequestTimeout) supportsAlterUserScramCredentials() { +} func (ao AdminOptionRequestTimeout) apply(cOptions *C.rd_kafka_AdminOptions_t) error { if !ao.isSet { return nil @@ -415,6 +418,16 @@ type AlterConsumerGroupOffsetsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +type DescribeUserScramCredentialsAdminOption interface { + supportsDescribeUserScramCredentials() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + +type AlterUserScramCredentialsAdminOption interface { + supportsAlterUserScramCredentials() + apply(cOptions *C.rd_kafka_AdminOptions_t) error +} + // AdminOption is a generic type not to be used directly. // // See CreateTopicsAdminOption et.al. From 0ed9207caa9715c6ed65e77cfd773cc66526b993 Mon Sep 17 00:00:00 2001 From: mahajanadhitya Date: Sat, 27 May 2023 20:20:16 +0530 Subject: [PATCH 03/13] Changes --- .../admin_user_scram_example.go | 125 ++++++++++++++++++ kafka/adminapi.go | 4 +- 2 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 examples/admin_user_scram_example/admin_user_scram_example.go diff --git a/examples/admin_user_scram_example/admin_user_scram_example.go b/examples/admin_user_scram_example/admin_user_scram_example.go new file mode 100644 index 000000000..214263636 --- /dev/null +++ b/examples/admin_user_scram_example/admin_user_scram_example.go @@ -0,0 +1,125 @@ +/** + * Copyright 2022 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Example function-based high-level Apache Kafka consumer +package main + +// consumer_example implements a consumer using the non-channel Poll() API +// to retrieve messages and events. + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func main() { + + // if len(os.Args) < 1 { + // fmt.Fprintf(os.Stderr, "Usage: %s \n", + // os.Args[0]) + // os.Exit(1) + // } + var mechanismstring map[kafka.ScramMechanism]string + + mechanismstring[kafka.ScramMechanism.Scram_SHA_256] = "SCRAM-SHA-256" + mechanismstring[kafka.ScramMechanism.Scram_SHA_512] = "SCRAM-SHA-512" + mechanismstring[kafka.ScramMechanism.Scram_Unknown] = "UNKWOWN" + + bootstrapServers := "localhost:9092" + sigchan := make(chan os.Signal, 1) + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + var users []string + users = append(users, "adhitya") + users = append(users, "pranav") + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + res, err := ac.DescribeUserScramCredentials(ctx, users) + if err != nil { + fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", err) + os.Exit(1) + + } else { + for username, description := range res { + fmt.Printf("Username : %s \n", username) + for i := 0; i < len(description.scram_credential_infos); i++ { + if description.err.code == 0 { + fmt.Printf(" Mechansim : %s Iterations : %d\n",mechanismstring[ description.scram_credential_infos[i].mechanism ], description.scram_credential_infos[i].iterations) + }else { + fmt.Printf((" Error[%d] : %s\n", err.code,err.str) + } + } + + } + } + var alterations []UserScramCredentialUpsertion + alterations = append(alterations, UserScramCredentialUpsertion({ user : "adhitya" , salt:"salt" , password : "password" ,mechanism : kafka.ScramMechanism.SCRAM_SHA_256,iterations :10000}) ) + alterations = append(alterations, UserScramCredentialUpsertion({ user : "pranav" , salt:"salt" , password : "password" ,mechanism : kafka.ScramMechanism.SCRAM_SHA_256,iterations :10000}) ) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + res, err := ac.AlterUserScramCredentials(ctx, users) + if err != nil { + fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", err) + os.Exit(1) + + } else { + for username, err := range res { + fmt.Printf("Username : %s \n", username) + if err.code == 0 { + fmt.Printf(" Success\n") + }else { + fmt.Printf((" Error[%d] : %s\n", err.code,err.str) + } + } + } + res, err := ac.DescribeUserScramCredentials(ctx, users) + if err != nil { + fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", err) + os.Exit(1) + + } else { + for username, description := range res { + fmt.Printf("Username : %s \n", username) + for i := 0; i < len(description.scram_credential_infos); i++ { + if description.err.code == 0 { + fmt.Printf(" Mechansim : %s Iterations : %d\n",mechanismstring[ description.scram_credential_infos[i].mechanism ], description.scram_credential_infos[i].iterations) + }else { + fmt.Printf((" Error[%d] : %s\n", err.code,err.str) + } + } + + } + } +} diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 313a8f492..cfd5a477b 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2364,7 +2364,7 @@ type ScramCredentialInfo struct { type UserScramCredentialsDescription struct { user string scram_credential_infos []ScramCredentialInfo - err error + err Error } func (a *AdminClient) DescribeUserScramCredentials( @@ -2474,7 +2474,7 @@ type UserScramCredentialUpsertion struct { func (a *AdminClient) AlterUserScramCredentials( ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, - options ...AlterUserScramCredentialsAdminOption) (result map[string]error, err error) { + options ...AlterUserScramCredentialsAdminOption) (result map[string]Error, err error) { result = make(map[string]error) err = a.verifyClient() if err != nil { From 6c974ce704c29c8c7bb76284d328fe773eadc4df Mon Sep 17 00:00:00 2001 From: mahajanadhitya Date: Mon, 29 May 2023 13:18:47 +0530 Subject: [PATCH 04/13] Changes --- .../admin_user_scram_example.go | 65 +++++++++---------- kafka/adminapi.go | 61 +++++++++-------- 2 files changed, 59 insertions(+), 67 deletions(-) diff --git a/examples/admin_user_scram_example/admin_user_scram_example.go b/examples/admin_user_scram_example/admin_user_scram_example.go index 214263636..0dfa940ca 100644 --- a/examples/admin_user_scram_example/admin_user_scram_example.go +++ b/examples/admin_user_scram_example/admin_user_scram_example.go @@ -32,7 +32,6 @@ import ( ) func main() { - // if len(os.Args) < 1 { // fmt.Fprintf(os.Stderr, "Usage: %s \n", // os.Args[0]) @@ -43,7 +42,7 @@ func main() { mechanismstring[kafka.ScramMechanism.Scram_SHA_256] = "SCRAM-SHA-256" mechanismstring[kafka.ScramMechanism.Scram_SHA_512] = "SCRAM-SHA-512" mechanismstring[kafka.ScramMechanism.Scram_Unknown] = "UNKWOWN" - + bootstrapServers := "localhost:9092" sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) @@ -64,60 +63,56 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - res, err := ac.DescribeUserScramCredentials(ctx, users) - if err != nil { + Describeres, Describeerr := ac.DescribeUserScramCredentials(ctx, users) + if Describeerr != nil { fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", err) os.Exit(1) - + } else { - for username, description := range res { + for username, description := range Describeres { fmt.Printf("Username : %s \n", username) - for i := 0; i < len(description.scram_credential_infos); i++ { - if description.err.code == 0 { - fmt.Printf(" Mechansim : %s Iterations : %d\n",mechanismstring[ description.scram_credential_infos[i].mechanism ], description.scram_credential_infos[i].iterations) - }else { - fmt.Printf((" Error[%d] : %s\n", err.code,err.str) + if description.Err.Code() == 0 { + for i := 0; i < len(description.Scram_Credential_Infos); i++ { + fmt.Printf(" Mechansim : %s Iterations : %d\n", mechanismstring[description.Scram_Credential_Infos[i].Mechanism], description.Scram_Credential_Infos[i].Iterations) } + } else { + fmt.Printf(" Error[%d] : %s\n", description.Err.Code(), description.Err.String()) } - } } - var alterations []UserScramCredentialUpsertion - alterations = append(alterations, UserScramCredentialUpsertion({ user : "adhitya" , salt:"salt" , password : "password" ,mechanism : kafka.ScramMechanism.SCRAM_SHA_256,iterations :10000}) ) - alterations = append(alterations, UserScramCredentialUpsertion({ user : "pranav" , salt:"salt" , password : "password" ,mechanism : kafka.ScramMechanism.SCRAM_SHA_256,iterations :10000}) ) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() + var alterations []kafka.UserScramCredentialUpsertion + alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "adhitya", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.ScramMechanism.SCRAM_SHA_256, Iterations: 10000}}) + alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "pranav", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.ScramMechanism.SCRAM_SHA_256, Iterations: 10000}}) - res, err := ac.AlterUserScramCredentials(ctx, users) - if err != nil { + Alterres, Altererr := ac.AlterUserScramCredentials(ctx, alterations, nil) + if Altererr != nil { fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", err) os.Exit(1) - + } else { - for username, err := range res { + for username, err := range Alterres { fmt.Printf("Username : %s \n", username) - if err.code == 0 { - fmt.Printf(" Success\n") - }else { - fmt.Printf((" Error[%d] : %s\n", err.code,err.str) + if err.Code() == 0 { + fmt.Printf(" Success\n") + } else { + fmt.Printf(" Error[%d] : %s\n", err.Code(), err.String()) } } } - res, err := ac.DescribeUserScramCredentials(ctx, users) - if err != nil { + Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) + if Describeerr != nil { fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", err) os.Exit(1) - + } else { - for username, description := range res { + for username, description := range Describeres { fmt.Printf("Username : %s \n", username) - for i := 0; i < len(description.scram_credential_infos); i++ { - if description.err.code == 0 { - fmt.Printf(" Mechansim : %s Iterations : %d\n",mechanismstring[ description.scram_credential_infos[i].mechanism ], description.scram_credential_infos[i].iterations) - }else { - fmt.Printf((" Error[%d] : %s\n", err.code,err.str) + if description.Err.Code() == 0 { + for i := 0; i < len(description.Scram_Credential_Infos); i++ { + fmt.Printf(" Mechansim : %s Iterations : %d\n", mechanismstring[description.Scram_Credential_Infos[i].Mechanism], description.Scram_Credential_Infos[i].Iterations) } + } else { + fmt.Printf(" Error[%d] : %s\n", description.Err.Code(), description.Err.String()) } } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index cfd5a477b..55076b7e2 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2349,22 +2349,19 @@ func (a *AdminClient) AlterConsumerGroupOffsets( type ScramMechanism int const ( - // ResourceUnknown - Unknown Scram_Unknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) - // ResourceAny - match any resource type (DescribeConfigs) Scram_SHA_256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) - // ResourceTopic - Topic Scram_SHA_512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) ) type ScramCredentialInfo struct { - iterations int - mechanism ScramMechanism + Iterations int + Mechanism ScramMechanism } type UserScramCredentialsDescription struct { - user string - scram_credential_infos []ScramCredentialInfo - err Error + User string + Scram_Credential_Infos []ScramCredentialInfo + Err Error } func (a *AdminClient) DescribeUserScramCredentials( @@ -2429,32 +2426,32 @@ func (a *AdminClient) DescribeUserScramCredentials( var cDescriptionCount C.size_t cDescriptionCount = C.rd_kafka_DescribeUserScramCredentials_result_get_count(cRes) for i := 0; i < int(cDescriptionCount); i++ { - var cDescription C.rd_kafka_UserScramCredentialsDescription_t - var cError C.rd_kafka_error_t - cDescription = C.rd_kafka_DescribeUserScramCredentials_result_get_description(cRes, i) + var cDescription *C.rd_kafka_UserScramCredentialsDescription_t + var cError *C.rd_kafka_error_t + cDescription = C.rd_kafka_DescribeUserScramCredentials_result_get_description(cRes, C.size_t(i)) goUser := C.GoString(C.rd_kafka_UserScramCredentialsDescription_get_user(cDescription)) - goUserDescription := UserScramCredentialsDescription{user: goUser, err: nil, scram_credential_infos: nil} + goUserDescription := UserScramCredentialsDescription{User: goUser} // If Errored Populate the Error cError = C.rd_kafka_UserScramCredentialsDescription_get_error(cDescription) - if C.rd_kafka_error_code(cError) { + if C.rd_kafka_error_code(cError) != 0 { // populate the error - goUserDescription.err = newError(C.rd_kafka_error_code(cError)) + goUserDescription.Err = newError(C.rd_kafka_error_code(cError)) } else { var cCredentialCount C.size_t cCredentialCount = C.rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo_cnt(cDescription) var scram_credential_infos []ScramCredentialInfo for j := 0; j < int(cCredentialCount); j++ { - var scram_credential_info C.rd_kafka_ScramCredentialInfo_t - scram_credential_info = rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo(cDescription, j) + var scram_credential_info *C.rd_kafka_ScramCredentialInfo_t + scram_credential_info = C.rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo(cDescription, C.size_t(j)) var cmechanism C.rd_kafka_ScramMechanism_t var citerations int - cmechanism = rd_kafka_ScramCredentialInfo_get_mechanism(scram_credential_info) - citerations = rd_kafka_ScramCredentialInfo_get_iterations(scram_credential_info) - cred := ScramCredentialInfo{mechanism: ScramMechanism(cmechanism), iterations: citerations} - scram_credential_info = append(scram_credential_infos, cred) + cmechanism = C.rd_kafka_ScramCredentialInfo_get_mechanism(scram_credential_info) + citerations = int(C.rd_kafka_ScramCredentialInfo_get_iterations(scram_credential_info)) + cred := ScramCredentialInfo{Mechanism: ScramMechanism(cmechanism), Iterations: int(citerations)} + scram_credential_infos = append(scram_credential_infos, cred) } - goUserDescription.scram_credential_infos = scram_credential_infos + goUserDescription.Scram_Credential_Infos = scram_credential_infos } result[goUser] = goUserDescription } @@ -2462,20 +2459,20 @@ func (a *AdminClient) DescribeUserScramCredentials( } type UserScramCredentialDeletion struct { - user string - mechanism ScramMechanism + User string + Mechanism ScramMechanism } type UserScramCredentialUpsertion struct { - user string - salt string - password string - scram_credential_info ScramCredentialInfo + User string + Salt string + Password string + Scram_Credential_Info ScramCredentialInfo } func (a *AdminClient) AlterUserScramCredentials( ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, options ...AlterUserScramCredentialsAdminOption) (result map[string]Error, err error) { - result = make(map[string]error) + result = make(map[string]Error) err = a.verifyClient() if err != nil { return result, err @@ -2487,13 +2484,13 @@ func (a *AdminClient) AlterUserScramCredentials( idx := 0 for itr := 0; itr < len(upsertions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(upsertions[itr].user, upsertions[itr].salt, upsertions[itr].password, C.rd_kafka_ScramMechanism_t(upsertions[itr].scram_credential_info.mechanism), upsertions[itr].scram_credential_info.iterations) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(C.CString(upsertions[itr].User), C.CString(upsertions[itr].Salt), C.CString(upsertions[itr].Password), C.rd_kafka_ScramMechanism_t(upsertions[itr].Scram_Credential_Info.Mechanism), C.int(upsertions[itr].Scram_Credential_Info.Iterations)) defer C.free(unsafe.Pointer(cAlterationList[idx])) idx = idx + 1 } for itr := 0; itr < len(deletions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(deletions[itr].user, C.rd_kafka_ScramMechanism_t(deletions[itr].mechanism)) // problem the end point expects rd_kafka_ScramMechanism , will it type cast itself + cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(C.CString(deletions[itr].User), C.rd_kafka_ScramMechanism_t(deletions[itr].Mechanism)) // problem the end point expects rd_kafka_ScramMechanism , will it type cast itself defer C.free(unsafe.Pointer(cAlterationList[idx])) idx = idx + 1 } @@ -2540,9 +2537,9 @@ func (a *AdminClient) AlterUserScramCredentials( var cUserCount C.size_t cUserCount = C.rd_kafka_AlterUserScramCredentials_result_get_count(cRes) for i := 0; i < int(cUserCount); i++ { - var cUserResult C.rd_kafka_UserScramCredentialAlterationResultElement_t + var cUserResult *C.rd_kafka_UserScramCredentialAlterationResultElement_t - cUserResult = C.rd_kafka_AlterUserScramCredentials_result_get_element(cRes, i) + cUserResult = C.rd_kafka_AlterUserScramCredentials_result_get_element(cRes, C.size_t(i)) goUser := C.GoString(C.rd_kafka_UserScramCredentialAlterationResultElement_get_user(cUserResult)) goerr := newError(C.rd_kafka_error_code(C.rd_kafka_UserScramCredentialAlterationResultElement_get_error(cUserResult))) From 83d7199723d025532f127901926b74900cf116d0 Mon Sep 17 00:00:00 2001 From: mahajanadhitya Date: Mon, 29 May 2023 14:06:56 +0530 Subject: [PATCH 05/13] Changes --- .../admin_user_scram_example.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/admin_user_scram_example/admin_user_scram_example.go b/examples/admin_user_scram_example/admin_user_scram_example.go index 0dfa940ca..44b25fd0d 100644 --- a/examples/admin_user_scram_example/admin_user_scram_example.go +++ b/examples/admin_user_scram_example/admin_user_scram_example.go @@ -37,11 +37,11 @@ func main() { // os.Args[0]) // os.Exit(1) // } - var mechanismstring map[kafka.ScramMechanism]string + mechanismstring := make(map[kafka.ScramMechanism]string) - mechanismstring[kafka.ScramMechanism.Scram_SHA_256] = "SCRAM-SHA-256" - mechanismstring[kafka.ScramMechanism.Scram_SHA_512] = "SCRAM-SHA-512" - mechanismstring[kafka.ScramMechanism.Scram_Unknown] = "UNKWOWN" + mechanismstring[kafka.Scram_SHA_256] = "SCRAM-SHA-256" + mechanismstring[kafka.Scram_SHA_512] = "SCRAM-SHA-512" + mechanismstring[kafka.Scram_Unknown] = "UNKWOWN" bootstrapServers := "localhost:9092" sigchan := make(chan os.Signal, 1) @@ -81,8 +81,8 @@ func main() { } } var alterations []kafka.UserScramCredentialUpsertion - alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "adhitya", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.ScramMechanism.SCRAM_SHA_256, Iterations: 10000}}) - alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "pranav", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.ScramMechanism.SCRAM_SHA_256, Iterations: 10000}}) + alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "adhitya", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.Scram_SHA_256, Iterations: 10000}}) + alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "pranav", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.Scram_SHA_256, Iterations: 10000}}) Alterres, Altererr := ac.AlterUserScramCredentials(ctx, alterations, nil) if Altererr != nil { From 5d88f279c44470946f45128840c759d3ac8bbe0d Mon Sep 17 00:00:00 2001 From: mahajanadhitya Date: Mon, 29 May 2023 17:32:08 +0530 Subject: [PATCH 06/13] Changes --- .../admin_user_scram_example.go | 53 +++++++++++--- kafka/adminapi.go | 12 ++- kafka/adminapi_test.go | 30 ++++++++ kafka/integration_test.go | 73 +++++++++++++++++++ 4 files changed, 153 insertions(+), 15 deletions(-) diff --git a/examples/admin_user_scram_example/admin_user_scram_example.go b/examples/admin_user_scram_example/admin_user_scram_example.go index 44b25fd0d..c1b39e008 100644 --- a/examples/admin_user_scram_example/admin_user_scram_example.go +++ b/examples/admin_user_scram_example/admin_user_scram_example.go @@ -57,15 +57,13 @@ func main() { defer ac.Close() var users []string - users = append(users, "adhitya") - users = append(users, "pranav") - + users = append(users, "non-existent") ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() Describeres, Describeerr := ac.DescribeUserScramCredentials(ctx, users) if Describeerr != nil { - fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", err) + fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) os.Exit(1) } else { @@ -80,13 +78,11 @@ func main() { } } } - var alterations []kafka.UserScramCredentialUpsertion - alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "adhitya", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.Scram_SHA_256, Iterations: 10000}}) - alterations = append(alterations, kafka.UserScramCredentialUpsertion{User: "pranav", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.Scram_SHA_256, Iterations: 10000}}) - - Alterres, Altererr := ac.AlterUserScramCredentials(ctx, alterations, nil) + var upsertions []kafka.UserScramCredentialUpsertion + upsertions = append(upsertions, kafka.UserScramCredentialUpsertion{User: "non-existent", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.Scram_SHA_256, Iterations: 10000}}) + Alterres, Altererr := ac.AlterUserScramCredentials(ctx, upsertions, nil) if Altererr != nil { - fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", err) + fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", Altererr) os.Exit(1) } else { @@ -101,7 +97,7 @@ func main() { } Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) if Describeerr != nil { - fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", err) + fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) os.Exit(1) } else { @@ -117,4 +113,39 @@ func main() { } } + var deletions []kafka.UserScramCredentialDeletion + deletions = append(deletions, kafka.UserScramCredentialDeletion{User: "non-existent", Mechanism: kafka.Scram_SHA_256}) + + Alterres, Altererr = ac.AlterUserScramCredentials(ctx, nil, deletions) + if Altererr != nil { + fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", Altererr) + os.Exit(1) + + } else { + for username, err := range Alterres { + fmt.Printf("Username : %s \n", username) + if err.Code() == 0 { + fmt.Printf(" Success\n") + } else { + fmt.Printf(" Error[%d] : %s\n", err.Code(), err.String()) + } + } + } + Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) + if Describeerr != nil { + fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + os.Exit(1) + + } else { + for username, description := range Describeres { + fmt.Printf("Username : %s \n", username) + if description.Err.Code() == 0 { + for i := 0; i < len(description.Scram_Credential_Infos); i++ { + fmt.Printf(" Mechansim : %s Iterations : %d\n", mechanismstring[description.Scram_Credential_Infos[i].Mechanism], description.Scram_Credential_Infos[i].Iterations) + } + } else { + fmt.Printf(" Error[%d] : %s\n", description.Err.Code(), description.Err.String()) + } + } + } } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 55076b7e2..352cc670c 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2405,13 +2405,15 @@ func (a *AdminClient) DescribeUserScramCredentials( defer C.rd_kafka_queue_destroy(cQueue) // Call rd_kafka_DescribeConsumerGroups (asynchronous). - C.rd_kafka_DescribeUserScramCredentials( + api_error := C.rd_kafka_DescribeUserScramCredentials( a.handle.rk, cUserListPtr, cUserCount, cOptions, cQueue) - + if api_error != 0 { + return result, newError(api_error) + } // Wait for result, error or context timeout. rkev, err := a.waitResult( ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) @@ -2516,13 +2518,15 @@ func (a *AdminClient) AlterUserScramCredentials( defer C.rd_kafka_queue_destroy(cQueue) // Call rd_kafka_DescribeConsumerGroups (asynchronous). - C.rd_kafka_AlterUserScramCredentials( + api_error := C.rd_kafka_AlterUserScramCredentials( a.handle.rk, cAlterationListPtr, cAlterationCount, cOptions, cQueue) - + if api_error != 0 { + return result, newError(api_error) + } // Wait for result, error or context timeout. rkev, err := a.waitResult( ctx, cQueue, C.RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT) diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index 331872c64..7efc373b1 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -555,7 +555,37 @@ func testAdminAPIsAlterConsumerGroupOffsets( t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) } } +func testAdminAPIsBrokerScram(what string, a *AdminClient, expDuration time.Duration, t *testing.T) { + var users []string + users = append(users, "non-existent") + users = append(users, "non-existent") + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + _, Describeerr := a.DescribeUserScramCredentials(ctx, users) + if Describeerr == nil { + t.Fatalf("Duplicate Resource Should give an error\n") + } + var upsertions []UserScramCredentialUpsertion + upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: "salt", Password: "password", Scram_Credential_Info: ScramCredentialInfo{Mechanism: Scram_SHA_256, Iterations: 10000}}) + var deletions []UserScramCredentialDeletion + deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: Scram_SHA_256}) + _, Altererr := a.AlterUserScramCredentials(ctx, upsertions, deletions) + if Altererr == nil { + t.Fatalf("Duplicate Resource Should give an error\n") + } + upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: "sa2lt", Password: "password", Scram_Credential_Info: ScramCredentialInfo{Mechanism: Scram_SHA_256, Iterations: 10000}}) + _, Altererr = a.AlterUserScramCredentials(ctx, upsertions, nil) + if Altererr == nil { + t.Fatalf("Duplicate Resource Should give an error\n") + } + deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: Scram_SHA_256}) + _, Altererr = a.AlterUserScramCredentials(ctx, upsertions, nil) + if Altererr == nil { + t.Fatalf("Duplicate Resource Should give an error\n") + } + +} func testAdminAPIs(what string, a *AdminClient, t *testing.T) { t.Logf("AdminClient API testing on %s: %s", a, what) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index 63d641c8f..b158d2cd2 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2395,6 +2395,79 @@ func (its *IntegrationTestSuite) TestProducerConsumerHeaders() { } +func (its *IntegrationTestSuite) TestBrokerScramAPI() { + t := its.T() + ac, err := NewAdminClient(&ConfigMap{ + "bootstrap.servers": testconf.Brokers, + }) + if err != nil { + t.Fatalf("Failed to create Admin Client: %s\n", err) + } + defer ac.Close() + + var users []string + users = append(users, "non-existent") + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + Describeres, Describeerr := ac.DescribeUserScramCredentials(ctx, users) + if Describeerr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + } else { + for _, description := range Describeres { + if description.Err.Code() != 91 { + t.Fatalf("Error code should be 91 instead it is %d", description.Err.Code()) + } + } + } + var upsertions []UserScramCredentialUpsertion + upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: "salt", Password: "password", Scram_Credential_Info: ScramCredentialInfo{Mechanism: Scram_SHA_256, Iterations: 10000}}) + Alterres, Altererr := ac.AlterUserScramCredentials(ctx, upsertions, nil) + if Altererr != nil { + t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", Altererr) + } else { + for _, err := range Alterres { + if err.Code() != 0 { + t.Fatalf("Error code should be 0 instead it is %d", err.Code()) + } + } + } + Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) + if Describeerr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + } else { + for _, description := range Describeres { + if description.Err.Code() != 0 { + t.Fatalf("Error code should be 0 instead it is %d", description.Err.Code()) + } + if (description.Scram_Credential_Infos[0].Iterations != 10000) || (description.Scram_Credential_Infos[0].Mechanism != Scram_SHA_256) { + t.Fatalf("The Scram Mechanism does not match the upserted mechanism") + } + } + } + var deletions []UserScramCredentialDeletion + deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: Scram_SHA_256}) + Alterres, Altererr = ac.AlterUserScramCredentials(ctx, nil, deletions) + if Altererr != nil { + t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", Altererr) + } else { + for _, err := range Alterres { + if err.Code() != 0 { + t.Fatalf("Error code should be 0 instead it is %d", err.Code()) + } + } + } + Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) + if Describeerr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + } else { + for _, description := range Describeres { + if description.Err.Code() != 91 { + t.Fatalf("Error code should be 91 instead it is %d", description.Err.Code()) + } + } + } +} func TestIntegration(t *testing.T) { its := new(IntegrationTestSuite) testconfInit() From 06400103491139291f9ad8dd9e98f9a16fb9e4c8 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 28 Jun 2023 20:02:15 +0200 Subject: [PATCH 07/13] Reflect librdkafka changes, Naming Ignore example file --- examples/.gitignore | 1 + .../admin_user_scram_credentials.go} | 38 +++---- kafka/adminapi.go | 98 +++++++++++-------- kafka/adminapi_test.go | 41 ++++---- kafka/integration_test.go | 77 ++++++++------- 5 files changed, 140 insertions(+), 115 deletions(-) rename examples/{admin_user_scram_example/admin_user_scram_example.go => admin_user_scram_credentials/admin_user_scram_credentials.go} (69%) diff --git a/examples/.gitignore b/examples/.gitignore index 3519e7b27..6f4bab875 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -9,6 +9,7 @@ admin_describe_config/admin_describe_config admin_describe_consumer_groups/admin_describe_consumer_groups admin_list_consumer_groups/admin_list_consumer_groups admin_list_consumer_group_offsets/admin_list_consumer_group_offsets +admin_user_scram_credentials/admin_user_scram_credentials avro_generic_consumer_example/avro_generic_consumer_example avro_generic_producer_example/avro_generic_producer_example avro_specific_consumer_example/avro_specific_consumer_example diff --git a/examples/admin_user_scram_example/admin_user_scram_example.go b/examples/admin_user_scram_credentials/admin_user_scram_credentials.go similarity index 69% rename from examples/admin_user_scram_example/admin_user_scram_example.go rename to examples/admin_user_scram_credentials/admin_user_scram_credentials.go index c1b39e008..b982732ed 100644 --- a/examples/admin_user_scram_example/admin_user_scram_example.go +++ b/examples/admin_user_scram_credentials/admin_user_scram_credentials.go @@ -1,5 +1,5 @@ /** - * Copyright 2022 Confluent Inc. + * Copyright 2023 Confluent Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,11 +37,11 @@ func main() { // os.Args[0]) // os.Exit(1) // } - mechanismstring := make(map[kafka.ScramMechanism]string) + mechanismString := make(map[kafka.ScramMechanism]string) - mechanismstring[kafka.Scram_SHA_256] = "SCRAM-SHA-256" - mechanismstring[kafka.Scram_SHA_512] = "SCRAM-SHA-512" - mechanismstring[kafka.Scram_Unknown] = "UNKWOWN" + mechanismString[kafka.ScramMechanismSHA256] = "SCRAM-SHA-256" + mechanismString[kafka.ScramMechanismSHA512] = "SCRAM-SHA-512" + mechanismString[kafka.ScramMechanismUnknown] = "UNKNOWN" bootstrapServers := "localhost:9092" sigchan := make(chan os.Signal, 1) @@ -69,17 +69,17 @@ func main() { } else { for username, description := range Describeres { fmt.Printf("Username : %s \n", username) - if description.Err.Code() == 0 { - for i := 0; i < len(description.Scram_Credential_Infos); i++ { - fmt.Printf(" Mechansim : %s Iterations : %d\n", mechanismstring[description.Scram_Credential_Infos[i].Mechanism], description.Scram_Credential_Infos[i].Iterations) + if description.Error.Code() == 0 { + for i := 0; i < len(description.ScramCredentialInfos); i++ { + fmt.Printf(" Mechanism : %s Iterations : %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) } } else { - fmt.Printf(" Error[%d] : %s\n", description.Err.Code(), description.Err.String()) + fmt.Printf(" Error[%d] : %s\n", description.Error.Code(), description.Error.String()) } } } var upsertions []kafka.UserScramCredentialUpsertion - upsertions = append(upsertions, kafka.UserScramCredentialUpsertion{User: "non-existent", Salt: "salt", Password: "password", Scram_Credential_Info: kafka.ScramCredentialInfo{Mechanism: kafka.Scram_SHA_256, Iterations: 10000}}) + upsertions = append(upsertions, kafka.UserScramCredentialUpsertion{User: "non-existent", Salt: []byte("salt"), Password: []byte("password"), ScramCredentialInfo: kafka.ScramCredentialInfo{Mechanism: kafka.ScramMechanismSHA256, Iterations: 10000}}) Alterres, Altererr := ac.AlterUserScramCredentials(ctx, upsertions, nil) if Altererr != nil { fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", Altererr) @@ -103,18 +103,18 @@ func main() { } else { for username, description := range Describeres { fmt.Printf("Username : %s \n", username) - if description.Err.Code() == 0 { - for i := 0; i < len(description.Scram_Credential_Infos); i++ { - fmt.Printf(" Mechansim : %s Iterations : %d\n", mechanismstring[description.Scram_Credential_Infos[i].Mechanism], description.Scram_Credential_Infos[i].Iterations) + if description.Error.Code() == 0 { + for i := 0; i < len(description.ScramCredentialInfos); i++ { + fmt.Printf(" Mechanism : %s Iterations : %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) } } else { - fmt.Printf(" Error[%d] : %s\n", description.Err.Code(), description.Err.String()) + fmt.Printf(" Error[%d] : %s\n", description.Error.Code(), description.Error.String()) } } } var deletions []kafka.UserScramCredentialDeletion - deletions = append(deletions, kafka.UserScramCredentialDeletion{User: "non-existent", Mechanism: kafka.Scram_SHA_256}) + deletions = append(deletions, kafka.UserScramCredentialDeletion{User: "non-existent", Mechanism: kafka.ScramMechanismSHA256}) Alterres, Altererr = ac.AlterUserScramCredentials(ctx, nil, deletions) if Altererr != nil { @@ -139,12 +139,12 @@ func main() { } else { for username, description := range Describeres { fmt.Printf("Username : %s \n", username) - if description.Err.Code() == 0 { - for i := 0; i < len(description.Scram_Credential_Infos); i++ { - fmt.Printf(" Mechansim : %s Iterations : %d\n", mechanismstring[description.Scram_Credential_Infos[i].Mechanism], description.Scram_Credential_Infos[i].Iterations) + if description.Error.Code() == 0 { + for i := 0; i < len(description.ScramCredentialInfos); i++ { + fmt.Printf(" Mechanism : %s Iterations : %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) } } else { - fmt.Printf(" Error[%d] : %s\n", description.Err.Code(), description.Err.String()) + fmt.Printf(" Error[%d] : %s\n", description.Error.Code(), description.Error.String()) } } } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 352cc670c..44d3fc6c1 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -92,6 +92,20 @@ ConsumerGroupDescription_by_idx(const rd_kafka_ConsumerGroupDescription_t **resu return result_groups[idx]; } +static const rd_kafka_UserScramCredentialsDescription_t * +DescribeUserScramCredentials_result_description_by_idx(const rd_kafka_UserScramCredentialsDescription_t **descriptions, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return descriptions[idx]; +} + +static const rd_kafka_AlterUserScramCredentials_result_response_t* +AlterUserScramCredentials_result_response_by_idx(const rd_kafka_AlterUserScramCredentials_result_response_t **responses, size_t cnt, size_t idx) { + if (idx >= cnt) + return NULL; + return responses[idx]; +} + static const rd_kafka_error_t * error_by_idx(const rd_kafka_error_t **errors, size_t cnt, size_t idx) { if (idx >= cnt) @@ -2349,9 +2363,9 @@ func (a *AdminClient) AlterConsumerGroupOffsets( type ScramMechanism int const ( - Scram_Unknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) - Scram_SHA_256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) - Scram_SHA_512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) + ScramMechanismUnknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) + ScramMechanismSHA256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) + ScramMechanismSHA512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) ) type ScramCredentialInfo struct { @@ -2359,9 +2373,9 @@ type ScramCredentialInfo struct { Mechanism ScramMechanism } type UserScramCredentialsDescription struct { - User string - Scram_Credential_Infos []ScramCredentialInfo - Err Error + User string + ScramCredentialInfos []ScramCredentialInfo + Error Error } func (a *AdminClient) DescribeUserScramCredentials( @@ -2405,15 +2419,13 @@ func (a *AdminClient) DescribeUserScramCredentials( defer C.rd_kafka_queue_destroy(cQueue) // Call rd_kafka_DescribeConsumerGroups (asynchronous). - api_error := C.rd_kafka_DescribeUserScramCredentials( + C.rd_kafka_DescribeUserScramCredentials( a.handle.rk, cUserListPtr, cUserCount, cOptions, cQueue) - if api_error != 0 { - return result, newError(api_error) - } + // Wait for result, error or context timeout. rkev, err := a.waitResult( ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) @@ -2426,34 +2438,35 @@ func (a *AdminClient) DescribeUserScramCredentials( // Convert result from C to Go. var cDescriptionCount C.size_t - cDescriptionCount = C.rd_kafka_DescribeUserScramCredentials_result_get_count(cRes) + var cDescriptions **C.rd_kafka_UserScramCredentialsDescription_t + cDescriptions = C.rd_kafka_DescribeUserScramCredentials_result_descriptions(cRes, &cDescriptionCount) for i := 0; i < int(cDescriptionCount); i++ { var cDescription *C.rd_kafka_UserScramCredentialsDescription_t var cError *C.rd_kafka_error_t - cDescription = C.rd_kafka_DescribeUserScramCredentials_result_get_description(cRes, C.size_t(i)) - goUser := C.GoString(C.rd_kafka_UserScramCredentialsDescription_get_user(cDescription)) + cDescription = C.DescribeUserScramCredentials_result_description_by_idx(cDescriptions, cDescriptionCount, C.size_t(i)) + goUser := C.GoString(C.rd_kafka_UserScramCredentialsDescription_user(cDescription)) goUserDescription := UserScramCredentialsDescription{User: goUser} // If Errored Populate the Error - cError = C.rd_kafka_UserScramCredentialsDescription_get_error(cDescription) + cError = C.rd_kafka_UserScramCredentialsDescription_error(cDescription) if C.rd_kafka_error_code(cError) != 0 { // populate the error - goUserDescription.Err = newError(C.rd_kafka_error_code(cError)) + goUserDescription.Error = newError(C.rd_kafka_error_code(cError)) } else { var cCredentialCount C.size_t - cCredentialCount = C.rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo_cnt(cDescription) - var scram_credential_infos []ScramCredentialInfo + cCredentialCount = C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(cDescription) + var scramCredentialInfos []ScramCredentialInfo for j := 0; j < int(cCredentialCount); j++ { - var scram_credential_info *C.rd_kafka_ScramCredentialInfo_t - scram_credential_info = C.rd_kafka_UserScramCredentialsDescription_get_scramcredentialinfo(cDescription, C.size_t(j)) + var scramCredentialInfo *C.rd_kafka_ScramCredentialInfo_t + scramCredentialInfo = C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo(cDescription, C.size_t(j)) var cmechanism C.rd_kafka_ScramMechanism_t var citerations int - cmechanism = C.rd_kafka_ScramCredentialInfo_get_mechanism(scram_credential_info) - citerations = int(C.rd_kafka_ScramCredentialInfo_get_iterations(scram_credential_info)) + cmechanism = C.rd_kafka_ScramCredentialInfo_mechanism(scramCredentialInfo) + citerations = int(C.rd_kafka_ScramCredentialInfo_iterations(scramCredentialInfo)) cred := ScramCredentialInfo{Mechanism: ScramMechanism(cmechanism), Iterations: int(citerations)} - scram_credential_infos = append(scram_credential_infos, cred) + scramCredentialInfos = append(scramCredentialInfos, cred) } - goUserDescription.Scram_Credential_Infos = scram_credential_infos + goUserDescription.ScramCredentialInfos = scramCredentialInfos } result[goUser] = goUserDescription } @@ -2465,10 +2478,10 @@ type UserScramCredentialDeletion struct { Mechanism ScramMechanism } type UserScramCredentialUpsertion struct { - User string - Salt string - Password string - Scram_Credential_Info ScramCredentialInfo + User string + Salt []byte + Password []byte + ScramCredentialInfo ScramCredentialInfo } func (a *AdminClient) AlterUserScramCredentials( @@ -2486,7 +2499,11 @@ func (a *AdminClient) AlterUserScramCredentials( idx := 0 for itr := 0; itr < len(upsertions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(C.CString(upsertions[itr].User), C.CString(upsertions[itr].Salt), C.CString(upsertions[itr].Password), C.rd_kafka_ScramMechanism_t(upsertions[itr].Scram_Credential_Info.Mechanism), C.int(upsertions[itr].Scram_Credential_Info.Iterations)) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(C.CString(upsertions[itr].User), + (*C.uchar)(&upsertions[itr].Salt[0]), C.size_t(len(upsertions[itr].Salt)), + (*C.uchar)(&upsertions[itr].Password[0]), C.size_t(len(upsertions[itr].Password)), + C.rd_kafka_ScramMechanism_t(upsertions[itr].ScramCredentialInfo.Mechanism), + C.int(upsertions[itr].ScramCredentialInfo.Iterations)) defer C.free(unsafe.Pointer(cAlterationList[idx])) idx = idx + 1 } @@ -2517,16 +2534,14 @@ func (a *AdminClient) AlterUserScramCredentials( cQueue := C.rd_kafka_queue_new(a.handle.rk) defer C.rd_kafka_queue_destroy(cQueue) - // Call rd_kafka_DescribeConsumerGroups (asynchronous). - api_error := C.rd_kafka_AlterUserScramCredentials( + // Call rd_kafka_AlterUserScramCredentials (asynchronous). + C.rd_kafka_AlterUserScramCredentials( a.handle.rk, cAlterationListPtr, cAlterationCount, cOptions, cQueue) - if api_error != 0 { - return result, newError(api_error) - } + // Wait for result, error or context timeout. rkev, err := a.waitResult( ctx, cQueue, C.RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT) @@ -2538,15 +2553,14 @@ func (a *AdminClient) AlterUserScramCredentials( cRes := C.rd_kafka_event_AlterUserScramCredentials_result(rkev) // Convert result from C to Go. - var cUserCount C.size_t - cUserCount = C.rd_kafka_AlterUserScramCredentials_result_get_count(cRes) - for i := 0; i < int(cUserCount); i++ { - var cUserResult *C.rd_kafka_UserScramCredentialAlterationResultElement_t - - cUserResult = C.rd_kafka_AlterUserScramCredentials_result_get_element(cRes, C.size_t(i)) - goUser := C.GoString(C.rd_kafka_UserScramCredentialAlterationResultElement_get_user(cUserResult)) - - goerr := newError(C.rd_kafka_error_code(C.rd_kafka_UserScramCredentialAlterationResultElement_get_error(cUserResult))) + var cResponses **C.rd_kafka_AlterUserScramCredentials_result_response_t + var cResponseSize C.size_t + cResponses = C.rd_kafka_AlterUserScramCredentials_result_responses(cRes, &cResponseSize) + for i := 0; i < int(cResponseSize); i++ { + var cResponse *C.rd_kafka_AlterUserScramCredentials_result_response_t + cResponse = C.AlterUserScramCredentials_result_response_by_idx(cResponses, cResponseSize, C.size_t(i)) + goUser := C.GoString(C.rd_kafka_AlterUserScramCredentials_result_response_user(cResponse)) + goerr := newError(C.rd_kafka_error_code(C.rd_kafka_AlterUserScramCredentials_result_response_error(cResponse))) result[goUser] = goerr } return result, nil diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index 7efc373b1..de8dc027c 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -555,37 +555,38 @@ func testAdminAPIsAlterConsumerGroupOffsets( t.Fatalf("Expected DeadlineExceeded, not %v", ctx.Err()) } } -func testAdminAPIsBrokerScram(what string, a *AdminClient, expDuration time.Duration, t *testing.T) { +func testAdminAPIsUserScramCredentials(what string, a *AdminClient, expDuration time.Duration, t *testing.T) { var users []string users = append(users, "non-existent") users = append(users, "non-existent") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - _, Describeerr := a.DescribeUserScramCredentials(ctx, users) - if Describeerr == nil { - t.Fatalf("Duplicate Resource Should give an error\n") + _, describeErr := a.DescribeUserScramCredentials(ctx, users) + if describeErr == nil { + t.Fatalf("Duplicate user should give an error\n") } + var upsertions []UserScramCredentialUpsertion - upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: "salt", Password: "password", Scram_Credential_Info: ScramCredentialInfo{Mechanism: Scram_SHA_256, Iterations: 10000}}) + upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: []byte("salt"), Password: []byte("password"), ScramCredentialInfo: ScramCredentialInfo{Mechanism: ScramMechanismSHA256, Iterations: 10000}}) var deletions []UserScramCredentialDeletion - deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: Scram_SHA_256}) - _, Altererr := a.AlterUserScramCredentials(ctx, upsertions, deletions) - if Altererr == nil { - t.Fatalf("Duplicate Resource Should give an error\n") - } - upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: "sa2lt", Password: "password", Scram_Credential_Info: ScramCredentialInfo{Mechanism: Scram_SHA_256, Iterations: 10000}}) - _, Altererr = a.AlterUserScramCredentials(ctx, upsertions, nil) - if Altererr == nil { - t.Fatalf("Duplicate Resource Should give an error\n") + deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: ScramMechanismSHA256}) + _, alterErr := a.AlterUserScramCredentials(ctx, upsertions, deletions) + if alterErr == nil { + t.Fatalf("Expected context deadline exceeded, got not error\n") } - deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: Scram_SHA_256}) - _, Altererr = a.AlterUserScramCredentials(ctx, upsertions, nil) - if Altererr == nil { - t.Fatalf("Duplicate Resource Should give an error\n") + + _, alterErr = a.AlterUserScramCredentials(ctx, upsertions, nil) + if alterErr == nil { + t.Fatalf("Expected context deadline exceeded, got not error\n") } + _, alterErr = a.AlterUserScramCredentials(ctx, nil, deletions) + if alterErr == nil { + t.Fatalf("Expected context deadline exceeded, got not error\n") + } } + func testAdminAPIs(what string, a *AdminClient, t *testing.T) { t.Logf("AdminClient API testing on %s: %s", a, what) @@ -822,6 +823,8 @@ func testAdminAPIs(what string, a *AdminClient, t *testing.T) { testAdminAPIsListConsumerGroupOffsets(what, a, expDuration, t) testAdminAPIsAlterConsumerGroupOffsets(what, a, expDuration, t) + + testAdminAPIsUserScramCredentials(what, a, expDuration, t) } // TestAdminAPIs dry-tests most Admin APIs, no broker is needed. diff --git a/kafka/integration_test.go b/kafka/integration_test.go index b158d2cd2..4ad1196de 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2395,7 +2395,7 @@ func (its *IntegrationTestSuite) TestProducerConsumerHeaders() { } -func (its *IntegrationTestSuite) TestBrokerScramAPI() { +func (its *IntegrationTestSuite) TestUserScramCredentialsAPI() { t := its.T() ac, err := NewAdminClient(&ConfigMap{ "bootstrap.servers": testconf.Brokers, @@ -2410,60 +2410,67 @@ func (its *IntegrationTestSuite) TestBrokerScramAPI() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - Describeres, Describeerr := ac.DescribeUserScramCredentials(ctx, users) - if Describeerr != nil { - t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, users) + if describeErr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) } else { - for _, description := range Describeres { - if description.Err.Code() != 91 { - t.Fatalf("Error code should be 91 instead it is %d", description.Err.Code()) + for _, description := range describeRes { + if description.Error.Code() != ErrResourceNotFound { + t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error.Code()) } } } var upsertions []UserScramCredentialUpsertion - upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: "salt", Password: "password", Scram_Credential_Info: ScramCredentialInfo{Mechanism: Scram_SHA_256, Iterations: 10000}}) - Alterres, Altererr := ac.AlterUserScramCredentials(ctx, upsertions, nil) - if Altererr != nil { - t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", Altererr) + upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: []byte("salt"), Password: []byte("password"), ScramCredentialInfo: ScramCredentialInfo{Mechanism: ScramMechanismSHA256, Iterations: 10000}}) + alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, nil) + if alterErr != nil { + t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", alterErr) } else { - for _, err := range Alterres { - if err.Code() != 0 { - t.Fatalf("Error code should be 0 instead it is %d", err.Code()) + for _, err := range alterRes { + if err.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", err.Code()) } } } - Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) - if Describeerr != nil { - t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + describeRes, describeErr = ac.DescribeUserScramCredentials(ctx, users) + if describeErr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) } else { - for _, description := range Describeres { - if description.Err.Code() != 0 { - t.Fatalf("Error code should be 0 instead it is %d", description.Err.Code()) + for _, description := range describeRes { + if description.Error.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", description.Error.Code()) } - if (description.Scram_Credential_Infos[0].Iterations != 10000) || (description.Scram_Credential_Infos[0].Mechanism != Scram_SHA_256) { - t.Fatalf("The Scram Mechanism does not match the upserted mechanism") + if description.ScramCredentialInfos[0].Iterations != 10000 { + t.Fatalf("Iterations field doesn't match the upserted value. Expected 10000, got %d", + description.ScramCredentialInfos[0].Iterations) + } + if description.ScramCredentialInfos[0].Mechanism != ScramMechanismSHA256 { + t.Fatalf("Mechanism field doesn't match the upserted value. Expected %d, got %d", + ScramMechanismSHA256, description.ScramCredentialInfos[0].Mechanism) } } } + var deletions []UserScramCredentialDeletion - deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: Scram_SHA_256}) - Alterres, Altererr = ac.AlterUserScramCredentials(ctx, nil, deletions) - if Altererr != nil { - t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", Altererr) + deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: ScramMechanismSHA256}) + alterRes, alterErr = ac.AlterUserScramCredentials(ctx, nil, deletions) + if alterErr != nil { + t.Fatalf("Failed to alter user scram credentials: %s\n", alterErr) } else { - for _, err := range Alterres { - if err.Code() != 0 { - t.Fatalf("Error code should be 0 instead it is %d", err.Code()) + for _, err := range alterRes { + if err.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", err.Code()) } } } - Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) - if Describeerr != nil { - t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) + + describeRes, describeErr = ac.DescribeUserScramCredentials(ctx, users) + if describeErr != nil { + t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) } else { - for _, description := range Describeres { - if description.Err.Code() != 91 { - t.Fatalf("Error code should be 91 instead it is %d", description.Err.Code()) + for _, description := range describeRes { + if description.Error.Code() != ErrResourceNotFound { + t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error) } } } From 89dbfe1bcee29fcadaabdb3b5afce302665c8e1c Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 29 Jun 2023 11:57:54 +0200 Subject: [PATCH 08/13] Improve examples Fix memory leaks Make salt optional --- examples/.gitignore | 3 +- .../admin_alter_user_scram_credentials.go | 147 +++++++++++++++++ .../admin_describe_user_scram_credentials.go | 84 ++++++++++ .../admin_user_scram_credentials.go | 151 ------------------ kafka/adminapi.go | 20 ++- 5 files changed, 248 insertions(+), 157 deletions(-) create mode 100644 examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go create mode 100644 examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go delete mode 100644 examples/admin_user_scram_credentials/admin_user_scram_credentials.go diff --git a/examples/.gitignore b/examples/.gitignore index 6f4bab875..df267fe13 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -9,7 +9,8 @@ admin_describe_config/admin_describe_config admin_describe_consumer_groups/admin_describe_consumer_groups admin_list_consumer_groups/admin_list_consumer_groups admin_list_consumer_group_offsets/admin_list_consumer_group_offsets -admin_user_scram_credentials/admin_user_scram_credentials +admin_describe_user_scram_credentials/admin_describe_user_scram_credentials +admin_alter_user_scram_credentials/admin_alter_user_scram_credentials avro_generic_consumer_example/avro_generic_consumer_example avro_generic_producer_example/avro_generic_producer_example avro_specific_consumer_example/avro_specific_consumer_example diff --git a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go new file mode 100644 index 000000000..40a96aa81 --- /dev/null +++ b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go @@ -0,0 +1,147 @@ +/** + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Alter user SCRAM credentials example +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func usage(reason string) { + fmt.Fprintf(os.Stderr, + "Error: %s\n", + reason) + fmt.Fprintf(os.Stderr, + "Usage: %s "+ + "(UPSERT "+ + " |DELETE ) "+ + "[(UPSERT "+ + " |DELETE ) ...]\n", + os.Args[0]) + os.Exit(1) +} + +func main() { + + // 2 + variable arguments + nArgs := len(os.Args) + + if nArgs < 2 { + usage("bootstrap-servers required") + } + + mechanismString := make(map[kafka.ScramMechanism]string) + mechanismString[kafka.ScramMechanismSHA256] = "SCRAM-SHA-256" + mechanismString[kafka.ScramMechanismSHA512] = "SCRAM-SHA-512" + mechanismString[kafka.ScramMechanismUnknown] = "UNKNOWN" + stringMechanism := make(map[string]kafka.ScramMechanism) + stringMechanism["SCRAM-SHA-256"] = kafka.ScramMechanismSHA256 + stringMechanism["SCRAM-SHA-512"] = kafka.ScramMechanismSHA512 + + bootstrapServers := os.Args[1] + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + var upsertions []kafka.UserScramCredentialUpsertion + var deletions []kafka.UserScramCredentialDeletion + + i := 2 + nAlterations := 0 + for i < nArgs { + switch os.Args[i] { + case "UPSERT": + if i+5 >= nArgs { + usage(fmt.Sprintf( + "wrong argument count for alteration %d: expected 6, found %d", + nAlterations, nArgs-i)) + } + + user := os.Args[i+1] + mechanism := stringMechanism[os.Args[i+2]] + iterations, err := strconv.Atoi(os.Args[i+3]) + if err != nil { + usage(err.Error()) + } + salt := []byte(os.Args[i+4]) + password := []byte(os.Args[i+5]) + if len(salt) == 0 { + salt = nil + } + upsertions = append(upsertions, + kafka.UserScramCredentialUpsertion{ + User: user, + Salt: salt, + Password: password, + ScramCredentialInfo: kafka.ScramCredentialInfo{ + Mechanism: mechanism, + Iterations: iterations, + }, + }) + i += 6 + case "DELETE": + if i+2 >= nArgs { + usage(fmt.Sprintf( + "wrong argument count for alteration %d: expected 3, found %d", + nAlterations, nArgs-i)) + } + + user := os.Args[i+1] + mechanism := stringMechanism[os.Args[i+2]] + deletions = append(deletions, + kafka.UserScramCredentialDeletion{ + User: user, + Mechanism: mechanism, + }) + i += 3 + default: + usage(fmt.Sprintf("unknown alteration %s", os.Args[i])) + } + nAlterations++ + } + + alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, deletions) + if alterErr != nil { + fmt.Printf("Failed to alter user scram credentials: %s\n", alterErr) + os.Exit(1) + } else { + for username, err := range alterRes { + fmt.Printf("Username: %s\n", username) + if err.Code() == 0 { + fmt.Printf(" Success\n") + } else { + fmt.Printf(" Error[%d]: %s\n", err.Code(), err.String()) + } + } + } +} diff --git a/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go new file mode 100644 index 000000000..d514c531e --- /dev/null +++ b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go @@ -0,0 +1,84 @@ +/** + * Copyright 2023 Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Describe user SCRAM credentials example +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func usage(reason string) { + fmt.Fprintf(os.Stderr, + "Error: %s\n", + reason) + fmt.Fprintf(os.Stderr, + "Usage: %s ...\n", + os.Args[0]) + os.Exit(1) +} + +func main() { + + // 2 + n arguments + nArgs := len(os.Args) + + if nArgs < 2 { + usage("bootstrap-servers required") + } + + mechanismString := make(map[kafka.ScramMechanism]string) + mechanismString[kafka.ScramMechanismSHA256] = "SCRAM-SHA-256" + mechanismString[kafka.ScramMechanismSHA512] = "SCRAM-SHA-512" + mechanismString[kafka.ScramMechanismUnknown] = "UNKNOWN" + + bootstrapServers := os.Args[1] + + // Create new AdminClient. + ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ + "bootstrap.servers": bootstrapServers, + }) + if err != nil { + fmt.Printf("Failed to create Admin client: %s\n", err) + os.Exit(1) + } + defer ac.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, os.Args[2:]) + if describeErr != nil { + fmt.Printf("Failed to describe user scram credentials: %s\n", describeErr) + os.Exit(1) + } else { + for username, description := range describeRes { + fmt.Printf("Username: %s \n", username) + if description.Error.Code() == 0 { + for i := 0; i < len(description.ScramCredentialInfos); i++ { + fmt.Printf(" Mechanism: %s Iterations: %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) + } + } else { + fmt.Printf(" Error[%d]: %s\n", description.Error.Code(), description.Error.String()) + } + } + } +} diff --git a/examples/admin_user_scram_credentials/admin_user_scram_credentials.go b/examples/admin_user_scram_credentials/admin_user_scram_credentials.go deleted file mode 100644 index b982732ed..000000000 --- a/examples/admin_user_scram_credentials/admin_user_scram_credentials.go +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Copyright 2023 Confluent Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// Example function-based high-level Apache Kafka consumer -package main - -// consumer_example implements a consumer using the non-channel Poll() API -// to retrieve messages and events. - -import ( - "context" - "fmt" - "os" - "os/signal" - "syscall" - "time" - - "github.com/confluentinc/confluent-kafka-go/v2/kafka" -) - -func main() { - // if len(os.Args) < 1 { - // fmt.Fprintf(os.Stderr, "Usage: %s \n", - // os.Args[0]) - // os.Exit(1) - // } - mechanismString := make(map[kafka.ScramMechanism]string) - - mechanismString[kafka.ScramMechanismSHA256] = "SCRAM-SHA-256" - mechanismString[kafka.ScramMechanismSHA512] = "SCRAM-SHA-512" - mechanismString[kafka.ScramMechanismUnknown] = "UNKNOWN" - - bootstrapServers := "localhost:9092" - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) - // Create new AdminClient. - ac, err := kafka.NewAdminClient(&kafka.ConfigMap{ - "bootstrap.servers": bootstrapServers, - }) - if err != nil { - fmt.Printf("Failed to create Admin client: %s\n", err) - os.Exit(1) - } - defer ac.Close() - - var users []string - users = append(users, "non-existent") - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) - defer cancel() - - Describeres, Describeerr := ac.DescribeUserScramCredentials(ctx, users) - if Describeerr != nil { - fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) - os.Exit(1) - - } else { - for username, description := range Describeres { - fmt.Printf("Username : %s \n", username) - if description.Error.Code() == 0 { - for i := 0; i < len(description.ScramCredentialInfos); i++ { - fmt.Printf(" Mechanism : %s Iterations : %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) - } - } else { - fmt.Printf(" Error[%d] : %s\n", description.Error.Code(), description.Error.String()) - } - } - } - var upsertions []kafka.UserScramCredentialUpsertion - upsertions = append(upsertions, kafka.UserScramCredentialUpsertion{User: "non-existent", Salt: []byte("salt"), Password: []byte("password"), ScramCredentialInfo: kafka.ScramCredentialInfo{Mechanism: kafka.ScramMechanismSHA256, Iterations: 10000}}) - Alterres, Altererr := ac.AlterUserScramCredentials(ctx, upsertions, nil) - if Altererr != nil { - fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", Altererr) - os.Exit(1) - - } else { - for username, err := range Alterres { - fmt.Printf("Username : %s \n", username) - if err.Code() == 0 { - fmt.Printf(" Success\n") - } else { - fmt.Printf(" Error[%d] : %s\n", err.Code(), err.String()) - } - } - } - Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) - if Describeerr != nil { - fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) - os.Exit(1) - - } else { - for username, description := range Describeres { - fmt.Printf("Username : %s \n", username) - if description.Error.Code() == 0 { - for i := 0; i < len(description.ScramCredentialInfos); i++ { - fmt.Printf(" Mechanism : %s Iterations : %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) - } - } else { - fmt.Printf(" Error[%d] : %s\n", description.Error.Code(), description.Error.String()) - } - - } - } - var deletions []kafka.UserScramCredentialDeletion - deletions = append(deletions, kafka.UserScramCredentialDeletion{User: "non-existent", Mechanism: kafka.ScramMechanismSHA256}) - - Alterres, Altererr = ac.AlterUserScramCredentials(ctx, nil, deletions) - if Altererr != nil { - fmt.Printf("Failed to Alter the User Scram Credentials: %s\n", Altererr) - os.Exit(1) - - } else { - for username, err := range Alterres { - fmt.Printf("Username : %s \n", username) - if err.Code() == 0 { - fmt.Printf(" Success\n") - } else { - fmt.Printf(" Error[%d] : %s\n", err.Code(), err.String()) - } - } - } - Describeres, Describeerr = ac.DescribeUserScramCredentials(ctx, users) - if Describeerr != nil { - fmt.Printf("Failed to Describe the User Scram Credentials: %s\n", Describeerr) - os.Exit(1) - - } else { - for username, description := range Describeres { - fmt.Printf("Username : %s \n", username) - if description.Error.Code() == 0 { - for i := 0; i < len(description.ScramCredentialInfos); i++ { - fmt.Printf(" Mechanism : %s Iterations : %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) - } - } else { - fmt.Printf(" Error[%d] : %s\n", description.Error.Code(), description.Error.String()) - } - } - } -} diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 44d3fc6c1..cde155bd7 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2499,18 +2499,28 @@ func (a *AdminClient) AlterUserScramCredentials( idx := 0 for itr := 0; itr < len(upsertions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(C.CString(upsertions[itr].User), - (*C.uchar)(&upsertions[itr].Salt[0]), C.size_t(len(upsertions[itr].Salt)), + user := C.CString(upsertions[itr].User) + var salt *C.uchar = nil + var saltSize C.size_t = 0 + if upsertions[itr].Salt != nil { + salt = (*C.uchar)(&upsertions[itr].Salt[0]) + saltSize = C.size_t(len(upsertions[itr].Salt)) + } + defer C.free(unsafe.Pointer(user)) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(user, + salt, saltSize, (*C.uchar)(&upsertions[itr].Password[0]), C.size_t(len(upsertions[itr].Password)), C.rd_kafka_ScramMechanism_t(upsertions[itr].ScramCredentialInfo.Mechanism), C.int(upsertions[itr].ScramCredentialInfo.Iterations)) - defer C.free(unsafe.Pointer(cAlterationList[idx])) + defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) idx = idx + 1 } for itr := 0; itr < len(deletions); itr++ { - cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(C.CString(deletions[itr].User), C.rd_kafka_ScramMechanism_t(deletions[itr].Mechanism)) // problem the end point expects rd_kafka_ScramMechanism , will it type cast itself - defer C.free(unsafe.Pointer(cAlterationList[idx])) + user := C.CString(deletions[itr].User) + defer C.free(unsafe.Pointer(user)) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(user, C.rd_kafka_ScramMechanism_t(deletions[itr].Mechanism)) + defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) idx = idx + 1 } var cAlterationListPtr **C.rd_kafka_UserScramCredentialAlteration_t From 8b2b6c47947124ab64646a96b17dfc08f7e591d2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 29 Jun 2023 12:49:02 +0200 Subject: [PATCH 09/13] Add documentation, making golint pass --- kafka/adminapi.go | 69 +++++++++++++++++++++++++++++++++++++------ kafka/adminoptions.go | 6 ++++ 2 files changed, 66 insertions(+), 9 deletions(-) diff --git a/kafka/adminapi.go b/kafka/adminapi.go index cde155bd7..433a2a865 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -2360,24 +2360,52 @@ func (a *AdminClient) AlterConsumerGroupOffsets( return acgor, nil } +// ScramMechanism enumerates SASL/SCRAM mechanisms. +// Used by `AdminClient.AlterUserScramCredentials` and `AdminClient.DescribeUserScramCredentials`. type ScramMechanism int const ( + // ScramMechanismUnknown - Unknown SASL/SCRAM mechanism ScramMechanismUnknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) - ScramMechanismSHA256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) - ScramMechanismSHA512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) + // ScramMechanismSHA256 - SCRAM-SHA-256 mechanism + ScramMechanismSHA256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) + // ScramMechanismSHA512 - SCRAM-SHA-512 mechanism + ScramMechanismSHA512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) ) +// ScramCredentialInfo contains Mechanism and Iterations for a +// SASL/SCRAM credential associated with a user. type ScramCredentialInfo struct { + // Iterations - positive number of iterations used when creating the credential Iterations int - Mechanism ScramMechanism + // Mechanism - SASL/SCRAM mechanism + Mechanism ScramMechanism } + +// UserScramCredentialsDescription represent all SASL/SCRAM credentials +// associated with a user that can be retrieved, or an error indicating +// why credentials could not be retrieved. type UserScramCredentialsDescription struct { - User string + // User - the user name. + User string + // ScramCredentialInfos - SASL/SCRAM credential representations for the user. ScramCredentialInfos []ScramCredentialInfo - Error Error + // Error - error corresponding to this user description. + Error Error } +// DescribeUserScramCredentials describe SASL/SCRAM credentials for the +// specified user names. +// +// Parameters: +// - `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// - `users` - a slice of string, each one correspond to a user name, no +// duplicates are allowed +// - `options` - DescribeUserScramCredentialsAdminOption options. +// +// Returns a map from user name to user SCRAM credentials description. +// Each description can have an individual error. func (a *AdminClient) DescribeUserScramCredentials( ctx context.Context, users []string, options ...DescribeUserScramCredentialsAdminOption) (result map[string]UserScramCredentialsDescription, err error) { @@ -2473,17 +2501,40 @@ func (a *AdminClient) DescribeUserScramCredentials( return result, nil } +// UserScramCredentialDeletion is a request to delete +// a SASL/SCRAM credential for a user. type UserScramCredentialDeletion struct { - User string + // User - user name + User string + // Mechanism - SASL/SCRAM mechanism. Mechanism ScramMechanism } + +// UserScramCredentialUpsertion is a request to update/insert +// a SASL/SCRAM credential for a user. type UserScramCredentialUpsertion struct { - User string - Salt []byte - Password []byte + // User - user name + User string + // User - salt to use. Will be generated randomly if nil. (optional) + Salt []byte + // Password - password to HMAC before storage. + Password []byte + // ScramCredentialInfo - the mechanism and iterations. ScramCredentialInfo ScramCredentialInfo } +// AlterUserScramCredentials alters SASL/SCRAM credentials. +// The pair (user, mechanism) must be unique among upsertions and deletions. +// +// Parameters: +// - `ctx` - context with the maximum amount of time to block, or nil for +// indefinite. +// - `upsertions` - a slice of user credential upsertions +// - `deletions` - a slice of user credential deletions +// - `options` - AlterUserScramCredentialsAdminOption options. +// +// Returns a map from user name to the corresponding Error, with error code +// ErrNoError when the request succeeded. func (a *AdminClient) AlterUserScramCredentials( ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, options ...AlterUserScramCredentialsAdminOption) (result map[string]Error, err error) { diff --git a/kafka/adminoptions.go b/kafka/adminoptions.go index 16b5efdc1..0681f0072 100644 --- a/kafka/adminoptions.go +++ b/kafka/adminoptions.go @@ -418,11 +418,17 @@ type AlterConsumerGroupOffsetsAdminOption interface { apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// DescribeUserScramCredentialsAdminOption - see setter. +// +// See SetAdminRequestTimeout. type DescribeUserScramCredentialsAdminOption interface { supportsDescribeUserScramCredentials() apply(cOptions *C.rd_kafka_AdminOptions_t) error } +// AlterUserScramCredentialsAdminOption - see setter. +// +// See SetAdminRequestTimeout. type AlterUserScramCredentialsAdminOption interface { supportsAlterUserScramCredentials() apply(cOptions *C.rd_kafka_AdminOptions_t) error From a51716ced50140cb3330ccb80f397a78ab047337 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Thu, 29 Jun 2023 12:56:23 +0200 Subject: [PATCH 10/13] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01cca6e45..d4e79c349 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ This is a feature release. * Fixes a nil pointer bug in the protobuf `Serializer.Serialize()`, caused due to an unchecked error (#997, @baganokodo2022). * Fixes incorrect protofbuf FileDescriptor references (#989, @Mrmann87). + * [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API): + User SASL/SCRAM credentials alteration and description (#1004). ## v2.1.1 From 0532899b647cfe6f4aa13c595284d0caef063741 Mon Sep 17 00:00:00 2001 From: Milind L Date: Fri, 30 Jun 2023 14:27:44 +0530 Subject: [PATCH 11/13] Add stylistic changes and more modular code --- .../admin_alter_user_scram_credentials.go | 38 +-- .../admin_describe_user_scram_credentials.go | 27 +- kafka/adminapi.go | 274 ++++++++++-------- kafka/adminapi_test.go | 63 +++- kafka/integration_test.go | 141 ++++++--- 5 files changed, 337 insertions(+), 206 deletions(-) diff --git a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go index 40a96aa81..12283c42f 100644 --- a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go +++ b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go @@ -42,7 +42,6 @@ func usage(reason string) { } func main() { - // 2 + variable arguments nArgs := len(os.Args) @@ -50,13 +49,9 @@ func main() { usage("bootstrap-servers required") } - mechanismString := make(map[kafka.ScramMechanism]string) - mechanismString[kafka.ScramMechanismSHA256] = "SCRAM-SHA-256" - mechanismString[kafka.ScramMechanismSHA512] = "SCRAM-SHA-512" - mechanismString[kafka.ScramMechanismUnknown] = "UNKNOWN" - stringMechanism := make(map[string]kafka.ScramMechanism) - stringMechanism["SCRAM-SHA-256"] = kafka.ScramMechanismSHA256 - stringMechanism["SCRAM-SHA-512"] = kafka.ScramMechanismSHA512 + if nArgs == 2 { + usage("at least one upsert/delete required") + } bootstrapServers := os.Args[1] @@ -88,7 +83,10 @@ func main() { } user := os.Args[i+1] - mechanism := stringMechanism[os.Args[i+2]] + mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2]) + if err != nil { + usage(err.Error()) + } iterations, err := strconv.Atoi(os.Args[i+3]) if err != nil { usage(err.Error()) @@ -117,7 +115,10 @@ func main() { } user := os.Args[i+1] - mechanism := stringMechanism[os.Args[i+2]] + mechanism, err := kafka.ScramMechanismFromString(os.Args[i+2]) + if err != nil { + usage(err.Error()) + } deletions = append(deletions, kafka.UserScramCredentialDeletion{ User: user, @@ -134,14 +135,15 @@ func main() { if alterErr != nil { fmt.Printf("Failed to alter user scram credentials: %s\n", alterErr) os.Exit(1) - } else { - for username, err := range alterRes { - fmt.Printf("Username: %s\n", username) - if err.Code() == 0 { - fmt.Printf(" Success\n") - } else { - fmt.Printf(" Error[%d]: %s\n", err.Code(), err.String()) - } + } + + for username, err := range alterRes { + fmt.Printf("Username: %s\n", username) + if err.Code() == kafka.ErrNoError { + fmt.Printf(" Success\n") + } else { + fmt.Printf(" Error[%d]: %s\n", err.Code(), err.String()) } } + } diff --git a/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go index d514c531e..fb076285d 100644 --- a/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go +++ b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go @@ -45,11 +45,6 @@ func main() { usage("bootstrap-servers required") } - mechanismString := make(map[kafka.ScramMechanism]string) - mechanismString[kafka.ScramMechanismSHA256] = "SCRAM-SHA-256" - mechanismString[kafka.ScramMechanismSHA512] = "SCRAM-SHA-512" - mechanismString[kafka.ScramMechanismUnknown] = "UNKNOWN" - bootstrapServers := os.Args[1] // Create new AdminClient. @@ -69,16 +64,20 @@ func main() { if describeErr != nil { fmt.Printf("Failed to describe user scram credentials: %s\n", describeErr) os.Exit(1) - } else { - for username, description := range describeRes { - fmt.Printf("Username: %s \n", username) - if description.Error.Code() == 0 { - for i := 0; i < len(description.ScramCredentialInfos); i++ { - fmt.Printf(" Mechanism: %s Iterations: %d\n", mechanismString[description.ScramCredentialInfos[i].Mechanism], description.ScramCredentialInfos[i].Iterations) - } - } else { - fmt.Printf(" Error[%d]: %s\n", description.Error.Code(), description.Error.String()) + } + + for username, description := range describeRes { + fmt.Printf("Username: %s \n", username) + if description.Error.Code() == kafka.ErrNoError { + for i := 0; i < len(description.ScramCredentialInfos); i++ { + fmt.Printf(" Mechanism: %s Iterations: %d\n", + description.ScramCredentialInfos[i].Mechanism, + description.ScramCredentialInfos[i].Iterations) } + } else { + fmt.Printf(" Error[%d]: %s\n", + description.Error.Code(), description.Error.String()) } } + } diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 2a016338e..51ad195a6 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -748,6 +748,89 @@ type DescribeACLsResult struct { // DeleteACLsResult provides delete ACLs result or error information. type DeleteACLsResult = DescribeACLsResult +// ScramMechanism enumerates SASL/SCRAM mechanisms. +// Used by `AdminClient.AlterUserScramCredentials` +// and `AdminClient.DescribeUserScramCredentials`. +type ScramMechanism int + +const ( + // ScramMechanismUnknown - Unknown SASL/SCRAM mechanism + ScramMechanismUnknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) + // ScramMechanismSHA256 - SCRAM-SHA-256 mechanism + ScramMechanismSHA256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) + // ScramMechanismSHA512 - SCRAM-SHA-512 mechanism + ScramMechanismSHA512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) +) + +// String returns the human-readable representation of an ScramMechanism +func (o ScramMechanism) String() string { + switch o { + case ScramMechanismSHA256: + return "SCRAM-SHA-256" + case ScramMechanismSHA512: + return "SCRAM-SHA-512" + default: + return "UNKNOWN" + } +} + +// ScramMechanismFromString translates a Scram Mechanism name to +// a ScramMechanism value. +func ScramMechanismFromString(mechanism string) (ScramMechanism, error) { + switch strings.ToUpper(mechanism) { + case "SCRAM-SHA-256": + return ScramMechanismSHA256, nil + case "SCRAM-SHA-512": + return ScramMechanismSHA512, nil + default: + return ScramMechanismUnknown, + NewError(ErrInvalidArg, "Unknown SCRAM mechanism", false) + } +} + +// ScramCredentialInfo contains Mechanism and Iterations for a +// SASL/SCRAM credential associated with a user. +type ScramCredentialInfo struct { + // Iterations - positive number of iterations used when creating the credential + Iterations int + // Mechanism - SASL/SCRAM mechanism + Mechanism ScramMechanism +} + +// UserScramCredentialsDescription represent all SASL/SCRAM credentials +// associated with a user that can be retrieved, or an error indicating +// why credentials could not be retrieved. +type UserScramCredentialsDescription struct { + // User - the user name. + User string + // ScramCredentialInfos - SASL/SCRAM credential representations for the user. + ScramCredentialInfos []ScramCredentialInfo + // Error - error corresponding to this user description. + Error Error +} + +// UserScramCredentialDeletion is a request to delete +// a SASL/SCRAM credential for a user. +type UserScramCredentialDeletion struct { + // User - user name + User string + // Mechanism - SASL/SCRAM mechanism. + Mechanism ScramMechanism +} + +// UserScramCredentialUpsertion is a request to update/insert +// a SASL/SCRAM credential for a user. +type UserScramCredentialUpsertion struct { + // User - user name + User string + // User - salt to use. Will be generated randomly if nil. (optional) + Salt []byte + // Password - password to HMAC before storage. + Password []byte + // ScramCredentialInfo - the mechanism and iterations. + ScramCredentialInfo ScramCredentialInfo +} + // waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens // first. // The returned result event is checked for errors its error is returned if set. @@ -910,6 +993,52 @@ func (a *AdminClient) cToConsumerGroupDescriptions( return result } +// cToDescribeUserScramCredentialsResult converts a C +// rd_kafka_DescribeUserScramCredentials_result_t to a Go map of users to +// UserScramCredentialsDescription. +func cToDescribeUserScramCredentialsResult( + cRes *C.rd_kafka_DescribeUserScramCredentials_result_t) map[string]UserScramCredentialsDescription { + result := make(map[string]UserScramCredentialsDescription) + var cDescriptionCount C.size_t + cDescriptions := + C.rd_kafka_DescribeUserScramCredentials_result_descriptions(cRes, + &cDescriptionCount) + + for i := 0; i < int(cDescriptionCount); i++ { + cDescription := + C.DescribeUserScramCredentials_result_description_by_idx( + cDescriptions, cDescriptionCount, C.size_t(i)) + user := C.GoString(C.rd_kafka_UserScramCredentialsDescription_user(cDescription)) + userDescription := UserScramCredentialsDescription{User: user} + + // Populate the error if required. + cError := C.rd_kafka_UserScramCredentialsDescription_error(cDescription) + if C.rd_kafka_error_code(cError) != C.RD_KAFKA_RESP_ERR_NO_ERROR { + userDescription.Error = newError(C.rd_kafka_error_code(cError)) + result[user] = userDescription + continue + } + + cCredentialCount := C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(cDescription) + scramCredentialInfos := make([]ScramCredentialInfo, int(cCredentialCount)) + for j := 0; j < int(cCredentialCount); j++ { + cScramCredentialInfo := + C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo( + cDescription, C.size_t(j)) + cMechanism := C.rd_kafka_ScramCredentialInfo_mechanism(cScramCredentialInfo) + cIterations := C.rd_kafka_ScramCredentialInfo_iterations(cScramCredentialInfo) + scramCredentialInfos[i] = ScramCredentialInfo{ + Mechanism: ScramMechanism(cMechanism), + Iterations: int(cIterations), + } + } + userDescription.ScramCredentialInfos = scramCredentialInfos + result[user] = userDescription + } + + return result +} + // ConsumerGroupDescription converts a C rd_kafka_ConsumerGroupListing_t array // to a Go ConsumerGroupListing slice. func (a *AdminClient) cToConsumerGroupListings( @@ -2365,40 +2494,6 @@ func (a *AdminClient) AlterConsumerGroupOffsets( return acgor, nil } -// ScramMechanism enumerates SASL/SCRAM mechanisms. -// Used by `AdminClient.AlterUserScramCredentials` and `AdminClient.DescribeUserScramCredentials`. -type ScramMechanism int - -const ( - // ScramMechanismUnknown - Unknown SASL/SCRAM mechanism - ScramMechanismUnknown = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN) - // ScramMechanismSHA256 - SCRAM-SHA-256 mechanism - ScramMechanismSHA256 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_256) - // ScramMechanismSHA512 - SCRAM-SHA-512 mechanism - ScramMechanismSHA512 = ScramMechanism(C.RD_KAFKA_SCRAM_MECHANISM_SHA_512) -) - -// ScramCredentialInfo contains Mechanism and Iterations for a -// SASL/SCRAM credential associated with a user. -type ScramCredentialInfo struct { - // Iterations - positive number of iterations used when creating the credential - Iterations int - // Mechanism - SASL/SCRAM mechanism - Mechanism ScramMechanism -} - -// UserScramCredentialsDescription represent all SASL/SCRAM credentials -// associated with a user that can be retrieved, or an error indicating -// why credentials could not be retrieved. -type UserScramCredentialsDescription struct { - // User - the user name. - User string - // ScramCredentialInfos - SASL/SCRAM credential representations for the user. - ScramCredentialInfos []ScramCredentialInfo - // Error - error corresponding to this user description. - Error Error -} - // DescribeUserScramCredentials describe SASL/SCRAM credentials for the // specified user names. // @@ -2406,7 +2501,7 @@ type UserScramCredentialsDescription struct { // - `ctx` - context with the maximum amount of time to block, or nil for // indefinite. // - `users` - a slice of string, each one correspond to a user name, no -// duplicates are allowed +// duplicates are allowed // - `options` - DescribeUserScramCredentialsAdminOption options. // // Returns a map from user name to user SCRAM credentials description. @@ -2414,11 +2509,9 @@ type UserScramCredentialsDescription struct { func (a *AdminClient) DescribeUserScramCredentials( ctx context.Context, users []string, options ...DescribeUserScramCredentialsAdminOption) (result map[string]UserScramCredentialsDescription, err error) { - - result = make(map[string]UserScramCredentialsDescription) err = a.verifyClient() if err != nil { - return result, err + return nil, err } // Convert user names into char** required by the implementation. @@ -2441,9 +2534,10 @@ func (a *AdminClient) DescribeUserScramCredentials( genericOptions[i] = options[i] } cOptions, err := adminOptionsSetup( - a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, genericOptions) + a.handle, + C.RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, genericOptions) if err != nil { - return result, err + return nil, err } defer C.rd_kafka_AdminOptions_destroy(cOptions) @@ -2463,71 +2557,17 @@ func (a *AdminClient) DescribeUserScramCredentials( rkev, err := a.waitResult( ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) if err != nil { - return result, err + return nil, err } defer C.rd_kafka_event_destroy(rkev) cRes := C.rd_kafka_event_DescribeUserScramCredentials_result(rkev) // Convert result from C to Go. - var cDescriptionCount C.size_t - var cDescriptions **C.rd_kafka_UserScramCredentialsDescription_t - cDescriptions = C.rd_kafka_DescribeUserScramCredentials_result_descriptions(cRes, &cDescriptionCount) - for i := 0; i < int(cDescriptionCount); i++ { - var cDescription *C.rd_kafka_UserScramCredentialsDescription_t - var cError *C.rd_kafka_error_t - cDescription = C.DescribeUserScramCredentials_result_description_by_idx(cDescriptions, cDescriptionCount, C.size_t(i)) - goUser := C.GoString(C.rd_kafka_UserScramCredentialsDescription_user(cDescription)) - goUserDescription := UserScramCredentialsDescription{User: goUser} - // If Errored Populate the Error - cError = C.rd_kafka_UserScramCredentialsDescription_error(cDescription) - - if C.rd_kafka_error_code(cError) != 0 { - // populate the error - goUserDescription.Error = newError(C.rd_kafka_error_code(cError)) - } else { - var cCredentialCount C.size_t - cCredentialCount = C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo_count(cDescription) - var scramCredentialInfos []ScramCredentialInfo - for j := 0; j < int(cCredentialCount); j++ { - var scramCredentialInfo *C.rd_kafka_ScramCredentialInfo_t - scramCredentialInfo = C.rd_kafka_UserScramCredentialsDescription_scramcredentialinfo(cDescription, C.size_t(j)) - var cmechanism C.rd_kafka_ScramMechanism_t - var citerations int - cmechanism = C.rd_kafka_ScramCredentialInfo_mechanism(scramCredentialInfo) - citerations = int(C.rd_kafka_ScramCredentialInfo_iterations(scramCredentialInfo)) - cred := ScramCredentialInfo{Mechanism: ScramMechanism(cmechanism), Iterations: int(citerations)} - scramCredentialInfos = append(scramCredentialInfos, cred) - } - goUserDescription.ScramCredentialInfos = scramCredentialInfos - } - result[goUser] = goUserDescription - } + result = cToDescribeUserScramCredentialsResult(cRes) return result, nil } -// UserScramCredentialDeletion is a request to delete -// a SASL/SCRAM credential for a user. -type UserScramCredentialDeletion struct { - // User - user name - User string - // Mechanism - SASL/SCRAM mechanism. - Mechanism ScramMechanism -} - -// UserScramCredentialUpsertion is a request to update/insert -// a SASL/SCRAM credential for a user. -type UserScramCredentialUpsertion struct { - // User - user name - User string - // User - salt to use. Will be generated randomly if nil. (optional) - Salt []byte - // Password - password to HMAC before storage. - Password []byte - // ScramCredentialInfo - the mechanism and iterations. - ScramCredentialInfo ScramCredentialInfo -} - // AlterUserScramCredentials alters SASL/SCRAM credentials. // The pair (user, mechanism) must be unique among upsertions and deletions. // @@ -2554,31 +2594,35 @@ func (a *AdminClient) AlterUserScramCredentials( cAlterationCount := C.size_t(len(upsertions) + len(deletions)) idx := 0 - for itr := 0; itr < len(upsertions); itr++ { - user := C.CString(upsertions[itr].User) + for _, upsertion := range upsertions { + user := C.CString(upsertion.User) + defer C.free(unsafe.Pointer(user)) + var salt *C.uchar = nil var saltSize C.size_t = 0 - if upsertions[itr].Salt != nil { - salt = (*C.uchar)(&upsertions[itr].Salt[0]) - saltSize = C.size_t(len(upsertions[itr].Salt)) + if upsertion.Salt != nil { + salt = (*C.uchar)(&upsertion.Salt[0]) + saltSize = C.size_t(len(upsertion.Salt)) } - defer C.free(unsafe.Pointer(user)) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(user, salt, saltSize, - (*C.uchar)(&upsertions[itr].Password[0]), C.size_t(len(upsertions[itr].Password)), - C.rd_kafka_ScramMechanism_t(upsertions[itr].ScramCredentialInfo.Mechanism), - C.int(upsertions[itr].ScramCredentialInfo.Iterations)) + (*C.uchar)(&upsertion.Password[0]), C.size_t(len(upsertion.Password)), + C.rd_kafka_ScramMechanism_t(upsertion.ScramCredentialInfo.Mechanism), + C.int(upsertion.ScramCredentialInfo.Iterations)) defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) idx = idx + 1 } - for itr := 0; itr < len(deletions); itr++ { - user := C.CString(deletions[itr].User) + for _, deletion := range deletions { + user := C.CString(deletion.User) defer C.free(unsafe.Pointer(user)) - cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new(user, C.rd_kafka_ScramMechanism_t(deletions[itr].Mechanism)) + cAlterationList[idx] = C.rd_kafka_UserScramCredentialDeletion_new( + user, C.rd_kafka_ScramMechanism_t(deletion.Mechanism)) defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) idx = idx + 1 } + var cAlterationListPtr **C.rd_kafka_UserScramCredentialAlteration_t if cAlterationCount > 0 { cAlterationListPtr = ((**C.rd_kafka_UserScramCredentialAlteration_t)(&cAlterationList[0])) @@ -2619,16 +2663,16 @@ func (a *AdminClient) AlterUserScramCredentials( cRes := C.rd_kafka_event_AlterUserScramCredentials_result(rkev) // Convert result from C to Go. - var cResponses **C.rd_kafka_AlterUserScramCredentials_result_response_t var cResponseSize C.size_t - cResponses = C.rd_kafka_AlterUserScramCredentials_result_responses(cRes, &cResponseSize) + cResponses := C.rd_kafka_AlterUserScramCredentials_result_responses(cRes, &cResponseSize) for i := 0; i < int(cResponseSize); i++ { - var cResponse *C.rd_kafka_AlterUserScramCredentials_result_response_t - cResponse = C.AlterUserScramCredentials_result_response_by_idx(cResponses, cResponseSize, C.size_t(i)) - goUser := C.GoString(C.rd_kafka_AlterUserScramCredentials_result_response_user(cResponse)) - goerr := newError(C.rd_kafka_error_code(C.rd_kafka_AlterUserScramCredentials_result_response_error(cResponse))) - result[goUser] = goerr + cResponse := C.AlterUserScramCredentials_result_response_by_idx( + cResponses, cResponseSize, C.size_t(i)) + user := C.GoString(C.rd_kafka_AlterUserScramCredentials_result_response_user(cResponse)) + err := newErrorFromCError(C.rd_kafka_AlterUserScramCredentials_result_response_error(cResponse)) + result[user] = err } + return result, nil } diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index f49b9f79b..c48627d37 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -597,33 +597,70 @@ func testAdminAPIsAlterConsumerGroupOffsets( } func testAdminAPIsUserScramCredentials(what string, a *AdminClient, expDuration time.Duration, t *testing.T) { var users []string - users = append(users, "non-existent") - users = append(users, "non-existent") + + // With nil users ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - _, describeErr := a.DescribeUserScramCredentials(ctx, users) - if describeErr == nil { - t.Fatalf("Duplicate user should give an error\n") + if describeErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + describeErr, ctx.Err()) + } + + // With one user + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + users = append(users, "non-existent") + _, describeErr = a.DescribeUserScramCredentials(ctx, users) + if describeErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + describeErr, ctx.Err()) + } + + // With duplicate users + users = append(users, "non-existent") + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + users = append(users, "non-existent") + _, describeErr = a.DescribeUserScramCredentials(ctx, users) + if describeErr == nil || describeErr.(Error).Code() != ErrInvalidArg { + t.Fatalf("Duplicate user should give an InvalidArgument error, got %s\n", describeErr) } var upsertions []UserScramCredentialUpsertion - upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: []byte("salt"), Password: []byte("password"), ScramCredentialInfo: ScramCredentialInfo{Mechanism: ScramMechanismSHA256, Iterations: 10000}}) + upsertions = append(upsertions, + UserScramCredentialUpsertion{ + User: "non-existent", + Salt: []byte("salt"), + Password: []byte("password"), + ScramCredentialInfo: ScramCredentialInfo{ + Mechanism: ScramMechanismSHA256, Iterations: 10000}}) var deletions []UserScramCredentialDeletion - deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: ScramMechanismSHA256}) + deletions = append(deletions, UserScramCredentialDeletion{ + User: "non-existent", Mechanism: ScramMechanismSHA256}) + + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() _, alterErr := a.AlterUserScramCredentials(ctx, upsertions, deletions) - if alterErr == nil { - t.Fatalf("Expected context deadline exceeded, got not error\n") + if alterErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + alterErr, ctx.Err()) } + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() _, alterErr = a.AlterUserScramCredentials(ctx, upsertions, nil) - if alterErr == nil { - t.Fatalf("Expected context deadline exceeded, got not error\n") + if alterErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + alterErr, ctx.Err()) } + ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() _, alterErr = a.AlterUserScramCredentials(ctx, nil, deletions) - if alterErr == nil { - t.Fatalf("Expected context deadline exceeded, got not error\n") + if alterErr == nil || ctx.Err() != context.DeadlineExceeded { + t.Fatalf("Expected context deadline exceeded, got %s and %s\n", + alterErr, ctx.Err()) } } diff --git a/kafka/integration_test.go b/kafka/integration_test.go index fe4fd8942..e14794dbb 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2519,7 +2519,11 @@ func (its *IntegrationTestSuite) TestProducerConsumerHeaders() { } -func (its *IntegrationTestSuite) TestUserScramCredentialsAPI() { +// TestUserScramTestAdminClient_UserScramCredentialsCredentialsAPI describes +// the SCRAM credentials for a user, upserts some credentials, describes them +// again to check insertion, deletes them, and finally describes them once again +// to check deletion. +func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { t := its.T() ac, err := NewAdminClient(&ConfigMap{ "bootstrap.servers": testconf.Brokers, @@ -2529,76 +2533,121 @@ func (its *IntegrationTestSuite) TestUserScramCredentialsAPI() { } defer ac.Close() - var users []string - users = append(users, "non-existent") + users := []string{"non-existent"} + + // Call DescribeUserScramCredentials ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) defer cancel() - describeRes, describeErr := ac.DescribeUserScramCredentials(ctx, users) if describeErr != nil { t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) - } else { - for _, description := range describeRes { - if description.Error.Code() != ErrResourceNotFound { - t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error.Code()) - } - } } - var upsertions []UserScramCredentialUpsertion - upsertions = append(upsertions, UserScramCredentialUpsertion{User: "non-existent", Salt: []byte("salt"), Password: []byte("password"), ScramCredentialInfo: ScramCredentialInfo{Mechanism: ScramMechanismSHA256, Iterations: 10000}}) + + // Check Describe result + if len(describeRes) != 1 { + t.Fatalf("Expected 1 user in Describe Result, got %d\n", len(describeRes)) + } + description, ok := describeRes[users[0]] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + + if description.Error.Code() != ErrResourceNotFound { + t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error.Code()) + } + + // Call AlterUserScramCredentials for Upsert + upsertions := []UserScramCredentialUpsertion{ + { + User: "non-existent", + Salt: []byte("salt"), + Password: []byte("password"), + ScramCredentialInfo: ScramCredentialInfo{ + Mechanism: ScramMechanismSHA256, Iterations: 10000}, + }} + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() alterRes, alterErr := ac.AlterUserScramCredentials(ctx, upsertions, nil) + + // Check Upsert result if alterErr != nil { t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", alterErr) - } else { - for _, err := range alterRes { - if err.Code() != ErrNoError { - t.Fatalf("Error code should be ErrNoError instead it is %d", err.Code()) - } - } } + if len(alterRes) != 1 { + t.Fatalf("Expected 1 user in Alter Result, got %d\n", len(alterRes)) + } + kErr, ok := alterRes[upsertions[0].User] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + if kErr.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", kErr.Code()) + } + + // Call DescribeUserScramCredentials to verify upsert + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() describeRes, describeErr = ac.DescribeUserScramCredentials(ctx, users) + + // Check Describe result if describeErr != nil { t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) - } else { - for _, description := range describeRes { - if description.Error.Code() != ErrNoError { - t.Fatalf("Error code should be ErrNoError instead it is %d", description.Error.Code()) - } - if description.ScramCredentialInfos[0].Iterations != 10000 { - t.Fatalf("Iterations field doesn't match the upserted value. Expected 10000, got %d", - description.ScramCredentialInfos[0].Iterations) - } - if description.ScramCredentialInfos[0].Mechanism != ScramMechanismSHA256 { - t.Fatalf("Mechanism field doesn't match the upserted value. Expected %d, got %d", - ScramMechanismSHA256, description.ScramCredentialInfos[0].Mechanism) - } - } + } + description, ok = describeRes[users[0]] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + if description.Error.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %s", description.Error.Code()) + } + if description.ScramCredentialInfos[0].Iterations != 10000 { + t.Fatalf("Iterations field doesn't match the upserted value. Expected 10000, got %d", + description.ScramCredentialInfos[0].Iterations) + } + if description.ScramCredentialInfos[0].Mechanism != ScramMechanismSHA256 { + t.Fatalf("Mechanism field doesn't match the upserted value. Expected %s, got %s", + ScramMechanismSHA256, description.ScramCredentialInfos[0].Mechanism) } - var deletions []UserScramCredentialDeletion - deletions = append(deletions, UserScramCredentialDeletion{User: "non-existent", Mechanism: ScramMechanismSHA256}) + // Call AlterUserScramCredentials for Delete + deletions := []UserScramCredentialDeletion{ + {User: "non-existent", Mechanism: ScramMechanismSHA256}} + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() alterRes, alterErr = ac.AlterUserScramCredentials(ctx, nil, deletions) + + // Check Delete result if alterErr != nil { t.Fatalf("Failed to alter user scram credentials: %s\n", alterErr) - } else { - for _, err := range alterRes { - if err.Code() != ErrNoError { - t.Fatalf("Error code should be ErrNoError instead it is %d", err.Code()) - } - } + } + kErr, ok = alterRes[upsertions[0].User] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + if kErr.Code() != ErrNoError { + t.Fatalf("Error code should be ErrNoError instead it is %d", kErr.Code()) } + // Call DescribeUserScramCredentials to verify delete + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) + defer cancel() describeRes, describeErr = ac.DescribeUserScramCredentials(ctx, users) + + // Check Describe result if describeErr != nil { t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) - } else { - for _, description := range describeRes { - if description.Error.Code() != ErrResourceNotFound { - t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error) - } - } + } + description, ok = describeRes[users[0]] + if !ok { + t.Fatalf("Did not find expected user %s in results\n", users[0]) + } + + if description.Error.Code() != ErrResourceNotFound { + t.Fatalf("Error should be ErrResourceNotFound instead it is %s", description.Error.Code()) } } + func TestIntegration(t *testing.T) { its := new(IntegrationTestSuite) testconfInit() From 60a069c6e2bc3d93d7f62f4783e346c405e89538 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 4 Jul 2023 15:14:08 +0200 Subject: [PATCH 12/13] Add result structs --- .../admin_alter_user_scram_credentials.go | 2 +- .../admin_describe_user_scram_credentials.go | 2 +- kafka/adminapi.go | 39 ++++++++++++++----- kafka/integration_test.go | 18 ++++----- 4 files changed, 41 insertions(+), 20 deletions(-) diff --git a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go index 12283c42f..a5cc0db35 100644 --- a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go +++ b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go @@ -137,7 +137,7 @@ func main() { os.Exit(1) } - for username, err := range alterRes { + for username, err := range alterRes.Errors { fmt.Printf("Username: %s\n", username) if err.Code() == kafka.ErrNoError { fmt.Printf(" Success\n") diff --git a/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go index fb076285d..27896bb07 100644 --- a/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go +++ b/examples/admin_describe_user_scram_credentials/admin_describe_user_scram_credentials.go @@ -66,7 +66,7 @@ func main() { os.Exit(1) } - for username, description := range describeRes { + for username, description := range describeRes.Descriptions { fmt.Printf("Username: %s \n", username) if description.Error.Code() == kafka.ErrNoError { for i := 0; i < len(description.ScramCredentialInfos); i++ { diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 51ad195a6..2a170da1f 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -831,6 +831,22 @@ type UserScramCredentialUpsertion struct { ScramCredentialInfo ScramCredentialInfo } +// DescribeUserScramCredentialsResult represents the result of a +// DescribeUserScramCredentials call. +type DescribeUserScramCredentialsResult struct { + // ConsumerGroupDescriptions - Map from user name + // to UserScramCredentialsDescription + Descriptions map[string]UserScramCredentialsDescription +} + +// AlterUserScramCredentialsResult represents the result of a +// AlterUserScramCredentials call. +type AlterUserScramCredentialsResult struct { + // Errors - Map from user name + // to an Error, with ErrNoError code on success. + Errors map[string]Error +} + // waitResult waits for a result event on cQueue or the ctx to be cancelled, whichever happens // first. // The returned result event is checked for errors its error is returned if set. @@ -1027,7 +1043,7 @@ func cToDescribeUserScramCredentialsResult( cDescription, C.size_t(j)) cMechanism := C.rd_kafka_ScramCredentialInfo_mechanism(cScramCredentialInfo) cIterations := C.rd_kafka_ScramCredentialInfo_iterations(cScramCredentialInfo) - scramCredentialInfos[i] = ScramCredentialInfo{ + scramCredentialInfos[j] = ScramCredentialInfo{ Mechanism: ScramMechanism(cMechanism), Iterations: int(cIterations), } @@ -2508,10 +2524,13 @@ func (a *AdminClient) AlterConsumerGroupOffsets( // Each description can have an individual error. func (a *AdminClient) DescribeUserScramCredentials( ctx context.Context, users []string, - options ...DescribeUserScramCredentialsAdminOption) (result map[string]UserScramCredentialsDescription, err error) { + options ...DescribeUserScramCredentialsAdminOption) (result DescribeUserScramCredentialsResult, err error) { + result = DescribeUserScramCredentialsResult{ + Descriptions: make(map[string]UserScramCredentialsDescription), + } err = a.verifyClient() if err != nil { - return nil, err + return result, err } // Convert user names into char** required by the implementation. @@ -2537,7 +2556,7 @@ func (a *AdminClient) DescribeUserScramCredentials( a.handle, C.RD_KAFKA_ADMIN_OP_DESCRIBEUSERSCRAMCREDENTIALS, genericOptions) if err != nil { - return nil, err + return result, err } defer C.rd_kafka_AdminOptions_destroy(cOptions) @@ -2557,14 +2576,14 @@ func (a *AdminClient) DescribeUserScramCredentials( rkev, err := a.waitResult( ctx, cQueue, C.RD_KAFKA_EVENT_DESCRIBEUSERSCRAMCREDENTIALS_RESULT) if err != nil { - return nil, err + return result, err } defer C.rd_kafka_event_destroy(rkev) cRes := C.rd_kafka_event_DescribeUserScramCredentials_result(rkev) // Convert result from C to Go. - result = cToDescribeUserScramCredentialsResult(cRes) + result.Descriptions = cToDescribeUserScramCredentialsResult(cRes) return result, nil } @@ -2582,8 +2601,10 @@ func (a *AdminClient) DescribeUserScramCredentials( // ErrNoError when the request succeeded. func (a *AdminClient) AlterUserScramCredentials( ctx context.Context, upsertions []UserScramCredentialUpsertion, deletions []UserScramCredentialDeletion, - options ...AlterUserScramCredentialsAdminOption) (result map[string]Error, err error) { - result = make(map[string]Error) + options ...AlterUserScramCredentialsAdminOption) (result AlterUserScramCredentialsResult, err error) { + result = AlterUserScramCredentialsResult{ + Errors: make(map[string]Error), + } err = a.verifyClient() if err != nil { return result, err @@ -2670,7 +2691,7 @@ func (a *AdminClient) AlterUserScramCredentials( cResponses, cResponseSize, C.size_t(i)) user := C.GoString(C.rd_kafka_AlterUserScramCredentials_result_response_user(cResponse)) err := newErrorFromCError(C.rd_kafka_AlterUserScramCredentials_result_response_error(cResponse)) - result[user] = err + result.Errors[user] = err } return result, nil diff --git a/kafka/integration_test.go b/kafka/integration_test.go index e14794dbb..de1293b44 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2544,10 +2544,10 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { } // Check Describe result - if len(describeRes) != 1 { - t.Fatalf("Expected 1 user in Describe Result, got %d\n", len(describeRes)) + if len(describeRes.Descriptions) != 1 { + t.Fatalf("Expected 1 user in Describe Result, got %d\n", len(describeRes.Descriptions)) } - description, ok := describeRes[users[0]] + description, ok := describeRes.Descriptions[users[0]] if !ok { t.Fatalf("Did not find expected user %s in results\n", users[0]) } @@ -2574,10 +2574,10 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { if alterErr != nil { t.Fatalf("Failed to Alter the User Scram Credentials: %s\n", alterErr) } - if len(alterRes) != 1 { - t.Fatalf("Expected 1 user in Alter Result, got %d\n", len(alterRes)) + if len(alterRes.Errors) != 1 { + t.Fatalf("Expected 1 user in Alter Result, got %d\n", len(alterRes.Errors)) } - kErr, ok := alterRes[upsertions[0].User] + kErr, ok := alterRes.Errors[upsertions[0].User] if !ok { t.Fatalf("Did not find expected user %s in results\n", users[0]) } @@ -2594,7 +2594,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { if describeErr != nil { t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) } - description, ok = describeRes[users[0]] + description, ok = describeRes.Descriptions[users[0]] if !ok { t.Fatalf("Did not find expected user %s in results\n", users[0]) } @@ -2621,7 +2621,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { if alterErr != nil { t.Fatalf("Failed to alter user scram credentials: %s\n", alterErr) } - kErr, ok = alterRes[upsertions[0].User] + kErr, ok = alterRes.Errors[upsertions[0].User] if !ok { t.Fatalf("Did not find expected user %s in results\n", users[0]) } @@ -2638,7 +2638,7 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { if describeErr != nil { t.Fatalf("Failed to Describe the User Scram Credentials: %s\n", describeErr) } - description, ok = describeRes[users[0]] + description, ok = describeRes.Descriptions[users[0]] if !ok { t.Fatalf("Did not find expected user %s in results\n", users[0]) } From a4b2702b655f2ab057ad0124cd9ca98cac9856f9 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Fri, 7 Jul 2023 15:49:54 +0200 Subject: [PATCH 13/13] Change UserScramCredentialUpsertion parameters order --- .../admin_alter_user_scram_credentials.go | 20 ++++++++++--------- kafka/adminapi.go | 14 ++++++------- kafka/adminapi_test.go | 9 +++++---- kafka/integration_test.go | 6 +++--- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go index a5cc0db35..e693d5ef2 100644 --- a/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go +++ b/examples/admin_alter_user_scram_credentials/admin_alter_user_scram_credentials.go @@ -33,10 +33,10 @@ func usage(reason string) { reason) fmt.Fprintf(os.Stderr, "Usage: %s "+ - "(UPSERT "+ - " |DELETE ) "+ - "[(UPSERT "+ - " |DELETE ) ...]\n", + "UPSERT "+ + " "+ + "[UPSERT "+ + " DELETE ...]\n", os.Args[0]) os.Exit(1) } @@ -91,20 +91,22 @@ func main() { if err != nil { usage(err.Error()) } - salt := []byte(os.Args[i+4]) - password := []byte(os.Args[i+5]) + password := []byte(os.Args[i+4]) + salt := []byte(os.Args[i+5]) + // if salt is an empty string, + // set it to nil to generate it randomly. if len(salt) == 0 { salt = nil } upsertions = append(upsertions, kafka.UserScramCredentialUpsertion{ - User: user, - Salt: salt, - Password: password, + User: user, ScramCredentialInfo: kafka.ScramCredentialInfo{ Mechanism: mechanism, Iterations: iterations, }, + Password: password, + Salt: salt, }) i += 6 case "DELETE": diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 2a170da1f..9c6caf71c 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -823,12 +823,12 @@ type UserScramCredentialDeletion struct { type UserScramCredentialUpsertion struct { // User - user name User string - // User - salt to use. Will be generated randomly if nil. (optional) - Salt []byte - // Password - password to HMAC before storage. - Password []byte // ScramCredentialInfo - the mechanism and iterations. ScramCredentialInfo ScramCredentialInfo + // Password - password to HMAC before storage. + Password []byte + // Salt - salt to use. Will be generated randomly if nil. (optional) + Salt []byte } // DescribeUserScramCredentialsResult represents the result of a @@ -2627,10 +2627,10 @@ func (a *AdminClient) AlterUserScramCredentials( } cAlterationList[idx] = C.rd_kafka_UserScramCredentialUpsertion_new(user, - salt, saltSize, - (*C.uchar)(&upsertion.Password[0]), C.size_t(len(upsertion.Password)), C.rd_kafka_ScramMechanism_t(upsertion.ScramCredentialInfo.Mechanism), - C.int(upsertion.ScramCredentialInfo.Iterations)) + C.int(upsertion.ScramCredentialInfo.Iterations), + (*C.uchar)(&upsertion.Password[0]), C.size_t(len(upsertion.Password)), + salt, saltSize) defer C.rd_kafka_UserScramCredentialAlteration_destroy(cAlterationList[idx]) idx = idx + 1 } diff --git a/kafka/adminapi_test.go b/kafka/adminapi_test.go index c48627d37..1ab48a6e8 100644 --- a/kafka/adminapi_test.go +++ b/kafka/adminapi_test.go @@ -630,11 +630,12 @@ func testAdminAPIsUserScramCredentials(what string, a *AdminClient, expDuration var upsertions []UserScramCredentialUpsertion upsertions = append(upsertions, UserScramCredentialUpsertion{ - User: "non-existent", - Salt: []byte("salt"), - Password: []byte("password"), + User: "non-existent", ScramCredentialInfo: ScramCredentialInfo{ - Mechanism: ScramMechanismSHA256, Iterations: 10000}}) + Mechanism: ScramMechanismSHA256, Iterations: 10000}, + Password: []byte("password"), + Salt: []byte("salt"), + }) var deletions []UserScramCredentialDeletion deletions = append(deletions, UserScramCredentialDeletion{ User: "non-existent", Mechanism: ScramMechanismSHA256}) diff --git a/kafka/integration_test.go b/kafka/integration_test.go index de1293b44..394632f79 100644 --- a/kafka/integration_test.go +++ b/kafka/integration_test.go @@ -2559,11 +2559,11 @@ func (its *IntegrationTestSuite) TestAdminClient_UserScramCredentials() { // Call AlterUserScramCredentials for Upsert upsertions := []UserScramCredentialUpsertion{ { - User: "non-existent", - Salt: []byte("salt"), - Password: []byte("password"), + User: "non-existent", ScramCredentialInfo: ScramCredentialInfo{ Mechanism: ScramMechanismSHA256, Iterations: 10000}, + Password: []byte("password"), + Salt: []byte("salt"), }} ctx, cancel = context.WithTimeout(context.Background(), time.Second*30)