Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

astraea-720 Add embedded connector worker for test #737

Merged
merged 5 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions it/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ dependencies {
implementation "mysql:mysql-connector-java:${versions["mysql"]}"
implementation "org.apache.zookeeper:zookeeper:${versions["zookeeper"]}"
implementation "org.apache.kafka:kafka_2.12:${versions["kafka"]}"
implementation "org.apache.kafka:connect-runtime:${versions["kafka"]}"
implementation "org.apache.kafka:connect-json:${versions["kafka"]}"
}

java {
Expand Down
4 changes: 2 additions & 2 deletions it/src/main/java/org/astraea/it/RequireBrokerCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.junit.jupiter.api.AfterAll;

/**
* This class offers a way to have single node embedded kafka cluster. It is useful to test code
* which is depended on true cluster.
* This class offers a way to have 3 node embedded kafka cluster. It is useful to test code which is
* depended on true cluster.
*/
public abstract class RequireBrokerCluster extends RequireJmxServer {
private static final int NUMBER_OF_BROKERS = 3;
Expand Down
48 changes: 48 additions & 0 deletions it/src/main/java/org/astraea/it/RequireSingleWorkerCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.it;

import org.junit.jupiter.api.AfterAll;

/**
* This class offers a way to have single node embedded kafka worker. It is useful to test code
* which is depended on true cluster.
*/
public abstract class RequireSingleWorkerCluster extends RequireJmxServer {
private static final int NUMBER_OF_BROKERS = 3;
private static final ZookeeperCluster ZOOKEEPER_CLUSTER = Services.zookeeperCluster();
private static final BrokerCluster BROKER_CLUSTER =
Services.brokerCluster(ZOOKEEPER_CLUSTER, NUMBER_OF_BROKERS);

private static final WorkerCluster WORKER_CLUSTER =
Services.workerCluster(BROKER_CLUSTER, new int[] {0});

protected static String bootstrapServers() {
return BROKER_CLUSTER.bootstrapServers();
}

protected static String workerUrl() {
return WORKER_CLUSTER.workerUrls().get(0);
}

@AfterAll
static void shutdownClusters() throws Exception {
WORKER_CLUSTER.close();
BROKER_CLUSTER.close();
ZOOKEEPER_CLUSTER.close();
}
}
49 changes: 49 additions & 0 deletions it/src/main/java/org/astraea/it/RequireWorkerCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.it;

import java.util.List;
import org.junit.jupiter.api.AfterAll;

/**
* This class offers a way to have 3 node embedded kafka worker. It is useful to test code which is
* depended on true cluster.
*/
public abstract class RequireWorkerCluster extends RequireJmxServer {
private static final int NUMBER_OF_BROKERS = 3;
private static final ZookeeperCluster ZOOKEEPER_CLUSTER = Services.zookeeperCluster();
private static final BrokerCluster BROKER_CLUSTER =
Services.brokerCluster(ZOOKEEPER_CLUSTER, NUMBER_OF_BROKERS);

private static final WorkerCluster WORKER_CLUSTER =
Services.workerCluster(BROKER_CLUSTER, new int[] {0, 0, 0});

protected static String bootstrapServers() {
return BROKER_CLUSTER.bootstrapServers();
}

protected static List<String> workerUrls() {
return WORKER_CLUSTER.workerUrls();
}

@AfterAll
static void shutdownClusters() throws Exception {
WORKER_CLUSTER.close();
BROKER_CLUSTER.close();
ZOOKEEPER_CLUSTER.close();
}
}
75 changes: 75 additions & 0 deletions it/src/main/java/org/astraea/it/Services.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand All @@ -28,15 +32,86 @@
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.cli.ConnectDistributed;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;

public final class Services {

private Services() {}

static WorkerCluster workerCluster(BrokerCluster bk, int[] ports) {
List<Connect> connects =
Arrays.stream(ports)
.mapToObj(
port -> {
var realPort = Utils.resolvePort(port);
Map<String, String> config = new HashMap<>();
// reduce the number from partitions and replicas to speedup the mini cluster
// for setting storage. the partition from setting topic is always 1 so we
// needn't to set it to 1 here.
config.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-config");
config.put(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
// for offset storage
config.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
config.put(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG, "1");
config.put(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
// for status storage
config.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "connect-status");
config.put(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG, "1");
config.put(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, "1");
// set the brokers info
config.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bk.bootstrapServers());
config.put(ConsumerConfig.GROUP_ID_CONFIG, "connect");
// set the normal converter
config.put(
ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(
ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.converters.ByteArrayConverter");
config.put(
WorkerConfig.LISTENERS_CONFIG,
// the worker hostname is a part of information used by restful apis.
// the 0.0.0.0 make all connector say that they are executed by 0.0.0.0
// and it does make sense in production. With a view to testing the
// related codes in other modules, we have to define the "really" hostname
// in starting worker cluster.
"http://" + Utils.hostname() + ":" + realPort);
config.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(500));
// enable us to override the connector configs
config.put(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
return new ConnectDistributed().startConnect(config);
})
.collect(Collectors.toUnmodifiableList());

return new WorkerCluster() {
@Override
public List<String> workerUrls() {
return connects.stream()
.map(Connect::restUrl)
.map(URI::toString)
.collect(Collectors.toList());
}

@Override
public void close() {
connects.forEach(
connect -> {
connect.stop();
connect.awaitStop();
});
}
};
}

static BrokerCluster brokerCluster(ZookeeperCluster zk, int numberOfBrokers) {
var tempFolders =
IntStream.range(0, numberOfBrokers)
Expand Down
25 changes: 25 additions & 0 deletions it/src/main/java/org/astraea/it/WorkerCluster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.it;

import java.util.List;

public interface WorkerCluster extends AutoCloseable {

/** @return worker information. the form is "http://host_a:port_a" */
List<String> workerUrls();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.it;

import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URL;
import org.junit.jupiter.api.Test;

class RequireSingleWorkerClusterTest extends RequireSingleWorkerCluster {

@Test
void testProperties() {
assertNotNull(workerUrl());
assertNotNull(bootstrapServers());
}

@Test
void testConnection() throws Exception {
var jsonTree = new ObjectMapper().readTree(new URL(workerUrl()));
assertNotNull(jsonTree.get("version"));
assertNotNull(jsonTree.get("kafka_cluster_id"));
}
}
42 changes: 42 additions & 0 deletions it/src/test/java/org/astraea/it/RequireWorkerClusterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.it;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URL;
import org.junit.jupiter.api.Test;

class RequireWorkerClusterTest extends RequireWorkerCluster {

@Test
void testProperties() {
assertTrue(workerUrls().size() > 0);
assertNotNull(bootstrapServers());
}

@Test
void testConnection() throws Exception {
for (String x : workerUrls()) {
var jsonTree = new ObjectMapper().readTree(new URL(x));
assertNotNull(jsonTree.get("version"));
assertNotNull(jsonTree.get("kafka_cluster_id"));
}
}
}