Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package kafka.zk

import java.util.{Collections, Properties}
import java.util.{Base64, Collections, Properties}
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest
import java.util.concurrent.{CountDownLatch, TimeUnit}

import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
Expand Down Expand Up @@ -51,7 +53,7 @@ import org.apache.kafka.common.resource.ResourceType.{GROUP, TOPIC}
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.data.{ACL, Id, Stat}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

Expand Down Expand Up @@ -140,6 +142,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
}

@Test
def testChrootExistsAndRootIsLocked(): Unit = {
// chroot is accessible
val root = "/testChrootExistsAndRootIsLocked"
val chroot = s"$root/chroot"
zkClient.makeSurePersistentPathExists(chroot)
zkClient.setAcl(chroot, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala)

// root is inaccessible
val scheme = "digest"
val id = "test"
val pwd = "12345"
val digest = Base64.getEncoder.encode(MessageDigest.getInstance("SHA1").digest(s"$id:$pwd".getBytes()))
zkClient.currentZooKeeper.addAuthInfo(scheme, digest)
zkClient.setAcl(root, Seq(new ACL(ZooDefs.Perms.ALL, new Id(scheme, s"$id:$digest"))))

// this client won't have access to the root, but the chroot already exists
val chrootClient = KafkaZkClient(zkConnect + chroot, zkAclsEnabled.getOrElse(JaasUtils.isZkSaslEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM, createChrootIfNecessary = true)
chrootClient.close()
}

@Test
def testSetAndGetConsumerOffset(): Unit = {
val offset = 123L
Expand Down
63 changes: 0 additions & 63 deletions core/src/test/scala/unit/kafka/zk/ZkClientAclTest.scala

This file was deleted.