@@ -21,12 +21,14 @@ import java.io.{ByteArrayOutputStream, File, PrintStream}
21
21
import java .nio .charset .StandardCharsets
22
22
import java .nio .file .Files
23
23
import java .util
24
- import java .util .Properties
24
+ import java .util .{ Optional , Properties }
25
25
import kafka .server .KafkaConfig
26
26
import kafka .utils .TestUtils
27
27
import net .sourceforge .argparse4j .inf .ArgumentParserException
28
+ import org .apache .kafka .common .metadata .UserScramCredentialRecord
28
29
import org .apache .kafka .common .utils .Utils
29
30
import org .apache .kafka .server .common .Features
31
+ import org .apache .kafka .metadata .bootstrap .BootstrapDirectory
30
32
import org .apache .kafka .metadata .properties .{MetaPropertiesEnsemble , PropertiesUtils }
31
33
import org .apache .kafka .metadata .storage .FormatterException
32
34
import org .apache .kafka .raft .QuorumConfig
@@ -37,6 +39,7 @@ import org.junit.jupiter.params.ParameterizedTest
37
39
import org .junit .jupiter .params .provider .ValueSource
38
40
39
41
import scala .collection .mutable .ListBuffer
42
+ import scala .jdk .CollectionConverters .IterableHasAsScala
40
43
41
44
@ Timeout (value = 40 )
42
45
class StorageToolTest {
@@ -433,5 +436,49 @@ Found problem:
433
436
contains(" Formatting dynamic metadata voter directory %s" .format(availableDirs.head)),
434
437
" Failed to find content in output: " + stream.toString())
435
438
}
436
- }
437
439
440
+ @ Test
441
+ def testBootstrapScramRecords (): Unit = {
442
+ val availableDirs = Seq (TestUtils .tempDir())
443
+ val properties = new Properties ()
444
+ properties.putAll(defaultDynamicQuorumProperties)
445
+ properties.setProperty(" log.dirs" , availableDirs.mkString(" ," ))
446
+ val stream = new ByteArrayOutputStream ()
447
+ val arguments = ListBuffer [String ](
448
+ " --release-version" , " 3.9-IV0" ,
449
+ " --add-scram" , " SCRAM-SHA-512=[name=alice,password=changeit]" ,
450
+ " --add-scram" , " SCRAM-SHA-512=[name=bob,password=changeit]"
451
+ )
452
+
453
+ assertEquals(0 , runFormatCommand(stream, properties, arguments.toSeq))
454
+
455
+ // Not doing full SCRAM record validation since that's covered elsewhere.
456
+ // Just checking that we generate the correct number of records
457
+ val bootstrapMetadata = new BootstrapDirectory (availableDirs.head.toString, Optional .empty).read
458
+ val scramRecords = bootstrapMetadata.records().asScala
459
+ .filter(apiMessageAndVersion => apiMessageAndVersion.message().isInstanceOf [UserScramCredentialRecord ])
460
+ .map(apiMessageAndVersion => apiMessageAndVersion.message().asInstanceOf [UserScramCredentialRecord ])
461
+ .toList
462
+ assertEquals(2 , scramRecords.size)
463
+ assertEquals(" alice" , scramRecords.head.name())
464
+ assertEquals(" bob" , scramRecords.last.name())
465
+ }
466
+
467
+ @ Test
468
+ def testScramRecordsOldReleaseVersion (): Unit = {
469
+ val availableDirs = Seq (TestUtils .tempDir())
470
+ val properties = new Properties ()
471
+ properties.putAll(defaultDynamicQuorumProperties)
472
+ properties.setProperty(" log.dirs" , availableDirs.mkString(" ," ))
473
+ val stream = new ByteArrayOutputStream ()
474
+ val arguments = ListBuffer [String ](
475
+ " --release-version" , " 3.4" ,
476
+ " --add-scram" , " SCRAM-SHA-512=[name=alice,password=changeit]" ,
477
+ " --add-scram" , " SCRAM-SHA-512=[name=bob,password=changeit]"
478
+ )
479
+
480
+ assertEquals(
481
+ " SCRAM is only supported in metadata.version 3.5-IV2 or later." ,
482
+ assertThrows(classOf [FormatterException ], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage)
483
+ }
484
+ }
0 commit comments