Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Scala procedure syntax #4964

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ object Kafka extends Logging {
private def registerLoggingSignalHandler(): Unit = {
val jvmSignalHandlers = new ConcurrentHashMap[String, SignalHandler]().asScala
val handler = new SignalHandler() {
override def handle(signal: Signal) {
override def handle(signal: Signal): Unit = {
info(s"Terminating process due to signal $signal")
jvmSignalHandlers.get(signal.getName).foreach(_.handle(signal))
}
}
def registerHandler(signalName: String) {
def registerHandler(signalName: String): Unit = {
val oldHandler = Signal.handle(new Signal(signalName), handler)
if (oldHandler != null)
jvmSignalHandlers.put(signalName, oldHandler)
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/AclCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object AclCommand extends Logging {
DelegationToken -> Set(Describe, All)
)

def main(args: Array[String]) {
def main(args: Array[String]): Unit = {

val opts = new AclCommandOptions(args)

Expand All @@ -62,7 +62,7 @@ object AclCommand extends Logging {
}
}

def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit) {
def withAuthorizer(opts: AclCommandOptions)(f: Authorizer => Unit): Unit = {
val defaultProps = Map(KafkaConfig.ZkEnableSecureAclsProp -> JaasUtils.isZkSecurityEnabled)
val authorizerProperties =
if (opts.options.has(opts.authorizerPropertiesOpt)) {
Expand All @@ -81,7 +81,7 @@ object AclCommand extends Logging {
finally CoreUtils.swallow(authZ.close(), this)
}

private def addAcl(opts: AclCommandOptions) {
private def addAcl(opts: AclCommandOptions): Unit = {
withAuthorizer(opts) { authorizer =>
val resourceToAcl = getResourceToAcls(opts)

Expand All @@ -97,7 +97,7 @@ object AclCommand extends Logging {
}
}

private def removeAcl(opts: AclCommandOptions) {
private def removeAcl(opts: AclCommandOptions): Unit = {
withAuthorizer(opts) { authorizer =>
val resourceToAcl = getResourceToAcls(opts)

Expand All @@ -115,7 +115,7 @@ object AclCommand extends Logging {
}
}

private def listAcl(opts: AclCommandOptions) {
private def listAcl(opts: AclCommandOptions): Unit = {
withAuthorizer(opts) { authorizer =>
val resources = getResource(opts, dieIfNoResourceFound = false)

Expand Down Expand Up @@ -366,7 +366,7 @@ object AclCommand extends Logging {

val options = parser.parse(args: _*)

def checkArgs() {
def checkArgs(): Unit = {
CommandLineUtils.checkRequiredArgs(parser, options, authorizerPropertiesOpt)

val actions = Seq(addOpt, removeOpt, listOpt).count(options.has)
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class AdminClient(val time: Time,
val pendingFutures = new ConcurrentLinkedQueue[RequestFuture[ClientResponse]]()

val networkThread = new KafkaThread("admin-client-network-thread", new Runnable {
override def run() {
override def run(): Unit = {
try {
while (running)
client.poll(Long.MaxValue)
Expand Down Expand Up @@ -149,7 +149,7 @@ class AdminClient(val time: Time,
/**
* Wait until there is a non-empty list of brokers in the cluster.
*/
def awaitBrokers() {
def awaitBrokers(): Unit = {
var nodes = List[Node]()
do {
nodes = findAllBrokers()
Expand Down Expand Up @@ -260,14 +260,14 @@ class AdminClient(val time: Time,
val future = client.send(node, new DeleteRecordsRequest.Builder(requestTimeoutMs, convertedMap))
pendingFutures.add(future)
future.compose(new RequestFutureAdapter[ClientResponse, Map[TopicPartition, DeleteRecordsResult]]() {
override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
override def onSuccess(response: ClientResponse, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]): Unit = {
val deleteRecordsResponse = response.responseBody().asInstanceOf[DeleteRecordsResponse]
val result = deleteRecordsResponse.responses().asScala.mapValues(v => DeleteRecordsResult(v.lowWatermark, v.error.exception())).toMap
future.complete(result)
pendingFutures.remove(future)
}

override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]) {
override def onFailure(e: RuntimeException, future: RequestFuture[Map[TopicPartition, DeleteRecordsResult]]): Unit = {
val result = partitionAndOffsets.mapValues(_ => DeleteRecordsResult(DeleteRecordsResponse.INVALID_LOW_WATERMARK, e))
future.complete(result)
pendingFutures.remove(future)
Expand Down Expand Up @@ -384,7 +384,7 @@ class AdminClient(val time: Time,
errors
}

def close() {
def close(): Unit = {
running = false
try {
client.close()
Expand Down
26 changes: 13 additions & 13 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import org.apache.kafka.common.internals.Topic

@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
trait AdminUtilities {
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties)
def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties)
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties): Unit
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties): Unit
def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties): Unit

def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = {

Expand Down Expand Up @@ -362,7 +362,7 @@ object AdminUtils extends Logging with AdminUtilities {
}

@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def deleteTopic(zkUtils: ZkUtils, topic: String) {
def deleteTopic(zkUtils: ZkUtils, topic: String): Unit = {
if (topicExists(zkUtils, topic)) {
try {
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
Expand Down Expand Up @@ -463,7 +463,7 @@ object AdminUtils extends Logging with AdminUtilities {
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = {
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
Expand Down Expand Up @@ -513,7 +513,7 @@ object AdminUtils extends Logging with AdminUtilities {
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
update: Boolean = false): Unit = {
validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)

// Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
Expand All @@ -526,7 +526,7 @@ object AdminUtils extends Logging with AdminUtilities {
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}

private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean): Unit = {
try {
val zkPath = getTopicPath(topic)
val jsonPartitionData = zkUtils.replicaAssignmentZkData(replicaAssignment.map(e => e._1.toString -> e._2))
Expand Down Expand Up @@ -557,7 +557,7 @@ object AdminUtils extends Logging with AdminUtilities {
*
*/
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties): Unit = {
DynamicConfig.Client.validate(configs)
changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
}
Expand All @@ -574,7 +574,7 @@ object AdminUtils extends Logging with AdminUtilities {
*
*/
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties): Unit = {
if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
DynamicConfig.Client.validate(configs)
else
Expand All @@ -600,7 +600,7 @@ object AdminUtils extends Logging with AdminUtilities {
*
*/
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = {
validateTopicConfig(zkUtils, topic, configs)
changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
}
Expand All @@ -621,7 +621,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
}

private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties): Unit = {
val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName)
// write the new config--may not exist if there were previously no overrides
Expand All @@ -640,7 +640,7 @@ object AdminUtils extends Logging with AdminUtilities {
/**
* Write out the entity config to zk, if there is any
*/
private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) {
private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties): Unit = {
val map = Map("version" -> 1, "config" -> config.asScala)
zkUtils.updatePersistentPath(entityPath, Json.legacyEncodeAsString(map))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object BrokerApiVersionsCommand {
val options = parser.parse(args : _*)
checkArgs()

def checkArgs() {
def checkArgs(): Unit = {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
}
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ object ConfigCommand extends Config {
}
}

private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
val configsToBeAdded = parseConfigsToBeAdded(opts)
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entity = parseEntity(opts)
Expand Down Expand Up @@ -135,7 +135,7 @@ object ConfigCommand extends Config {
println(s"Completed Updating config for entity: $entity.")
}

private def preProcessScramCredentials(configsToBeAdded: Properties) {
private def preProcessScramCredentials(configsToBeAdded: Properties): Unit = {
def scramCredential(mechanism: ScramMechanism, credentialStr: String): String = {
val pattern = "(?:iterations=([0-9]*),)?password=(.*)".r
val (iterations, password) = credentialStr match {
Expand All @@ -156,7 +156,7 @@ object ConfigCommand extends Config {
}
}

private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient): Unit = {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
val entities = configEntity.getAllEntities(zkClient)
Expand Down Expand Up @@ -230,7 +230,7 @@ object ConfigCommand extends Config {
}
}

private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String): Unit = {
val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)

Expand Down Expand Up @@ -259,7 +259,7 @@ object ConfigCommand extends Config {
println(s"Completed updating default config for brokers in the cluster,")
}

private def describeBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) {
private def describeBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String): Unit = {
val configs = brokerConfig(adminClient, entityName, includeSynonyms = true)
if (entityName.nonEmpty)
println(s"Configs for broker $entityName are:")
Expand Down Expand Up @@ -440,7 +440,7 @@ object ConfigCommand extends Config {

val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt)

def checkArgs() {
def checkArgs(): Unit = {
// should have exactly one action
val actions = Seq(alterOpt, describeOpt).count(options.has _)
if(actions != 1)
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import scala.collection.{Seq, Set, mutable}

object ConsumerGroupCommand extends Logging {

def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
val opts = new ConsumerGroupCommandOptions(args)

if (args.length == 0)
Expand Down Expand Up @@ -350,7 +350,7 @@ object ConsumerGroupCommand extends Logging {
ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled)
}

def close() {
def close(): Unit = {
zkUtils.close()
}

Expand Down Expand Up @@ -635,7 +635,7 @@ object ConsumerGroupCommand extends Logging {
successfulLogTimestampOffsets ++ getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet.toSeq)
}

def close() {
def close(): Unit = {
adminClient.close()
if (consumer != null) consumer.close()
}
Expand Down Expand Up @@ -1003,7 +1003,7 @@ object ConsumerGroupCommand extends Logging {
val allResetOffsetScenarioOpts: Set[OptionSpec[_]] = Set(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)

def checkArgs() {
def checkArgs(): Unit = {
// check required args
if (options.has(timeoutMsOpt) && (!describeOptPresent || useOldConsumer))
debug(s"Option $timeoutMsOpt is applicable only when both $bootstrapServerOpt and $describeOpt are used.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ object DelegationTokenCommand extends Logging {

val options = parser.parse(args : _*)

def checkArgs() {
def checkArgs(): Unit = {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
}

def writePreferredReplicaElectionData(zkClient: KafkaZkClient,
partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]) {
partitionsUndergoingPreferredReplicaElection: Set[TopicPartition]): Unit = {
try {
zkClient.createPreferredReplicaElection(partitionsUndergoingPreferredReplicaElection.toSet)
println("Created preferred replica election path with %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object ReassignPartitionsCommand extends Logging {
}
}

def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
def verifyAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions): Unit = {
val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val jsonString = Utils.readFileAsString(jsonFile)
verifyAssignment(zkClient, adminClientOpt, jsonString)
Expand Down Expand Up @@ -154,7 +154,7 @@ object ReassignPartitionsCommand extends Logging {
}
}

def generateAssignment(zkClient: KafkaZkClient, opts: ReassignPartitionsCommandOptions) {
def generateAssignment(zkClient: KafkaZkClient, opts: ReassignPartitionsCommandOptions): Unit = {
val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign)
Expand Down Expand Up @@ -190,7 +190,7 @@ object ReassignPartitionsCommand extends Logging {
(partitionsToBeReassigned, currentAssignment)
}

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions) {
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], opts: ReassignPartitionsCommandOptions): Unit = {
val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt)
val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
val interBrokerThrottle = opts.options.valueOf(opts.interBrokerThrottleOpt)
Expand All @@ -199,7 +199,7 @@ object ReassignPartitionsCommand extends Logging {
executeAssignment(zkClient, adminClientOpt, reassignmentJsonString, Throttle(interBrokerThrottle, replicaAlterLogDirsThrottle), timeoutMs)
}

def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L) {
def executeAssignment(zkClient: KafkaZkClient, adminClientOpt: Option[JAdminClient], reassignmentJsonString: String, throttle: Throttle, timeoutMs: Long = 10000L): Unit = {
val (partitionAssignment, replicaAssignment) = parseAndValidate(zkClient, reassignmentJsonString)
val adminZkClient = new AdminZkClient(zkClient)
val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, adminClientOpt, partitionAssignment.toMap, replicaAssignment, adminZkClient)
Expand Down Expand Up @@ -494,7 +494,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
* Limit the throttle on currently moving replicas. Note that this command can use used to alter the throttle, but
* it may not alter all limits originally set, if some of the brokers have completed their rebalance.
*/
def maybeLimit(throttle: Throttle) {
def maybeLimit(throttle: Throttle): Unit = {
if (throttle.interBrokerLimit >= 0 || throttle.replicaAlterLogDirsLimit >= 0) {
val existingBrokers = existingAssignment().values.flatten.toSeq
val proposedBrokers = proposedPartitionAssignment.values.flatten.toSeq ++ proposedReplicaAssignment.keys.toSeq.map(_.brokerId())
Expand Down
Loading