Skip to content

Commit

Permalink
Use PLAIN as default SASL if not specified (knative-extensions#2139) (k…
Browse files Browse the repository at this point in the history
…native-extensions#244)

* Use PLAIN as default SASL if not specified
Fixes knative-extensions#2117

* control plane test to default SASL PLAIN

* Dedault to SASL PLAIN when doing connection not before

* Added data-plane unit tests for defaulted SASL PLAIN

* Reconcilation tests for Kafka source SASL

Co-authored-by: Aleksander Slominski <aslom@us.ibm.com>
  • Loading branch information
openshift-cherrypick-robot and aslom authored May 9, 2022
1 parent 0304e2f commit a73de50
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 12 deletions.
8 changes: 4 additions & 4 deletions control-plane/pkg/security/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func secretData(data map[string][]byte) kafka.ConfigOption {
func saslConfig(protocol string, data map[string][]byte) kafka.ConfigOption {
return func(config *sarama.Config) error {

// Supported mechanism SASL/PLAIN or SASL/SCRAM.
// Supported mechanism SASL/PLAIN (default if not specified) or SASL/SCRAM.
saslMechanism := SaslPlain
givenSASLMechanism, ok := data[SaslMechanismKey]
if !ok {
return fmt.Errorf("[protocol %s] SASL mechanism required (key: %s)", protocol, SaslMechanismKey)
if ok {
saslMechanism = string(givenSASLMechanism)
}
saslMechanism := string(givenSASLMechanism)

user, ok := data[SaslUserKey]
if !ok || len(user) == 0 {
Expand Down
18 changes: 18 additions & 0 deletions control-plane/pkg/security/secret_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ func TestSASLPlain(t *testing.T) {
assert.Equal(t, "my-user-password", config.Net.SASL.Password)
}

func TestSASLPlainDefaulted(t *testing.T) {
secret := map[string][]byte{
"protocol": []byte("SASL_PLAINTEXT"),
"user": []byte("my-user-name"),
"password": []byte("my-user-password"),
}
config := sarama.NewConfig()

err := kafka.Options(config, secretData(secret))

assert.Nil(t, err)
assert.True(t, config.Net.SASL.Enable)
assert.True(t, config.Net.SASL.Handshake)
assert.Equal(t, sarama.SASLMechanism(sarama.SASLTypePlaintext), config.Net.SASL.Mechanism)
assert.Equal(t, "my-user-name", config.Net.SASL.User)
assert.Equal(t, "my-user-password", config.Net.SASL.Password)
}

func TestSASLPlainLSCRAM256(t *testing.T) {
secret := map[string][]byte{
"protocol": []byte("SASL_PLAINTEXT"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static String validate(final Credentials credentials) {

final var SASLMechanism = credentials.SASLMechanism();
if (is(SecurityProtocol.SASL_PLAINTEXT, securityProtocol)) {
if (isInvalidSASLMechanism(SASLMechanism)) {
if (SASLMechanism != null && isInvalidSASLMechanism(SASLMechanism)) {
return "Security protocol " + securityProtocol.name + ": invalid SASL mechanism, expected SCRAM-SHA-256, SCRAM-SHA-512 or PLAIN got " + SASLMechanism;
}
if (anyBlank(credentials.SASLUsername(), credentials.SASLPassword())) {
Expand All @@ -55,7 +55,7 @@ static String validate(final Credentials credentials) {
}

if (is(SecurityProtocol.SASL_SSL, securityProtocol)) {
if (isInvalidSASLMechanism(SASLMechanism)) {
if (SASLMechanism != null && isInvalidSASLMechanism(SASLMechanism)) {
return "Security protocol " + securityProtocol.name + ": invalid SASL mechanism, expected SCRAM-SHA-256, SCRAM-SHA-512 or PLAIN got " + SASLMechanism;
}
if (anyBlank(credentials.SASLUsername(), credentials.SASLPassword())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ private static void clientsProperties(final BiConsumer<String, String> propertie
}

private static void sasl(final BiConsumer<String, String> propertiesSetter, final Credentials credentials) {
final var mechanism = credentials.SASLMechanism();
if (mechanism == null) {
throw new IllegalStateException("SASL mechanism required");
final String mechanism;
final var saslMechanism = credentials.SASLMechanism();
if (saslMechanism == null) {
mechanism = "PLAIN";
} else {
mechanism = saslMechanism;
}
propertiesSetter.accept(SaslConfigs.SASL_MECHANISM, mechanism);
if ("PLAIN".equals(mechanism)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,19 @@ public void securityProtocolSaslPlaintextPlainValid() {
securityProtocolSaslPlaintextValid("PLAIN");
}

@Test
public void securityProtocolSaslPlaintextDefauledPlainValid() {
securityProtocolSaslPlaintextValid(null);
}

private static void securityProtocolSaslPlaintextValid(final String mechanism) {

final var credential = mock(Credentials.class);

when(credential.securityProtocol()).thenReturn(SecurityProtocol.SASL_PLAINTEXT);
when(credential.SASLMechanism()).thenReturn(mechanism);
if(mechanism != null) {
when(credential.SASLMechanism()).thenReturn(mechanism);
}
when(credential.SASLUsername()).thenReturn("aaa");
when(credential.SASLPassword()).thenReturn("bbb");

Expand Down Expand Up @@ -184,6 +191,18 @@ public void securityProtocolSaslPlainSslValid() {
assertThat(CredentialsValidator.validate(credential)).isNull();
}

@Test
public void securityProtocolSaslDefaultedPlainSslValid() {
final var credential = mock(Credentials.class);

when(credential.securityProtocol()).thenReturn(SecurityProtocol.SASL_SSL);
when(credential.caCertificates()).thenReturn("xyz");
when(credential.SASLUsername()).thenReturn("aaa");
when(credential.SASLPassword()).thenReturn("bbb");

assertThat(CredentialsValidator.validate(credential)).isNull();
}

@Test
public void securityProtocolSaslSslScramSha513InValid() {
final var credential = mock(Credentials.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,20 @@ public void shouldConfigureSaslPlainSsl() {
shouldConfigureSaslSsl(PlainLoginModule.class, "PLAIN");
}

@Test
public void shouldConfigureSaslDefaultedPlainSsl() {
shouldConfigureSaslSsl(PlainLoginModule.class, null);
}

private static void shouldConfigureSaslSsl(final Class<? extends LoginModule> module, final String mechanism) {
final var props = new Properties();

final var credentials = mock(Credentials.class);
when(credentials.securityProtocol()).thenReturn(SecurityProtocol.SASL_SSL);
when(credentials.caCertificates()).thenReturn("xyz");
when(credentials.SASLMechanism()).thenReturn(mechanism);
if(mechanism != null) {
when(credentials.SASLMechanism()).thenReturn(mechanism);
}
when(credentials.SASLUsername()).thenReturn("aaa");
when(credentials.SASLPassword()).thenReturn("bbb");

Expand All @@ -65,7 +72,12 @@ private static void shouldConfigureSaslSsl(final Class<? extends LoginModule> mo
expected.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name());
expected.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, DefaultSslEngineFactory.PEM_TYPE);
expected.setProperty(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "xyz");
expected.setProperty(SaslConfigs.SASL_MECHANISM, mechanism);
if(mechanism != null) {
expected.setProperty(SaslConfigs.SASL_MECHANISM, mechanism);
} else {
expected.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN"); // it is defaulted

}
expected.setProperty(
SaslConfigs.SASL_JAAS_CONFIG,
module.getName() + " required username=\"" + credentials.SASLUsername() + "\" password=\"" +
Expand Down

0 comments on commit a73de50

Please sign in to comment.