#198 - support flags and default properties files
bluemonk3y committed Oct 3, 2023
1 parent d45e432 commit 74a27ad
Showing 12 changed files with 204 additions and 34 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -135,3 +135,4 @@ channels:
# Developer Notes

1. Install the IntelliJ Checkstyle plugin and load the config from config/checkstyle.xml
1. Build using: `./gradlew spotlessApply build`
74 changes: 57 additions & 17 deletions cli/README.md
@@ -9,6 +9,27 @@ This page also contains a simple docker guide for local testing.

This command provisions Kafka resources from an AsyncAPI spec (aka an app, or data product) and publishes them to the configured cluster and Schema Registry environment. It can be run manually, as part of a GitOps workflow, and/or during build promotion of the spec into different environments, where cluster and Schema Registry endpoints are configured as environment variables.

### Common config
`provision` looks for a `provision.properties` file in the Docker `/app/` folder (i.e. `/app/provision.properties`).
A sample file can supply default configuration values.


The file `provision.properties` (loaded automatically from `/app/provision.properties` inside the container):
```properties
spec=/app/simple_schema_demo-api.yaml
acl.enabled=true
sr.enabled=true
dry.run=true
bootstrap.server=broker1:9092
username=admin
secret=nothing
schema.registry=http://schema-registry:8081
schema.path=/app/1
sr.api.key=admin
sr.api.secret=nothing

```
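The defaults in this file feed the CLI's option resolution. A minimal stdlib-only sketch of the precedence involved (illustrative, not the actual SpecMesh code): a value passed on the command line wins over the default loaded from the file.

```java
import java.util.Properties;

// Illustrative sketch (not the actual SpecMesh code): command-line values
// take precedence over defaults loaded from provision.properties.
public final class DefaultsSketch {

    // Returns the CLI value when given, otherwise falls back to the file default.
    static String resolve(final String cliValue, final Properties fileDefaults, final String key) {
        return cliValue != null ? cliValue : fileDefaults.getProperty(key);
    }

    public static void main(final String[] args) {
        final Properties fileDefaults = new Properties();
        fileDefaults.setProperty("bootstrap.server", "broker1:9092");

        // No flag on the command line: the file default applies.
        System.out.println(resolve(null, fileDefaults, "bootstrap.server"));
        // An explicit -bs flag overrides the file default.
        System.out.println(resolve("kafka:9092", fileDefaults, "bootstrap.server"));
    }
}
```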

### Usage

> % docker run --rm --network kafka_network -v "$(pwd)/resources:/app" ghcr.io/specmesh/specmesh-build-cli provision -bs kafka:9092 -sr http://schema-registry:8081 -spec /app/simple_schema_demo-api.yaml -schemaPath /app
@@ -18,33 +39,52 @@
<summary>Long form</summary>

```
Usage: provision [-aclEnabled] [-clean] [-dry] [-srEnabled] [-bs=<brokerUrl>]
                 [-s=<secret>] [-schemaPath=<schemaPath>] [-spec=<spec>]
                 [-sr=<schemaRegistryUrl>] [-srKey=<srApiKey>]
                 [-srSecret=<srApiSecret>] [-u=<username>]
                 [-D=<String=String>]...
Apply a specification.yaml to provision kafka resources on a cluster.
Use 'provision.properties' for common arguments.
Explicit properties file location: /app/provision.properties
      -aclEnabled, --acl-enabled
                             True (default) will provision/publish/validate
                               ACLs. False will ignore ACL related operations
      -bs, --bootstrap-server=<brokerUrl>
                             Kafka bootstrap server url
      -clean, --clean-unspecified
                             Compares the cluster resources against the spec,
                               outputting the proposed set of resources that
                               are unexpected (not specified). Use with
                               '--dry-run' for non-destructive checks. This
                               operation will not create resources; it will
                               only remove unspecified resources
      -D, --property=<String=String>
                             Specify Java runtime properties for Apache Kafka.
      -dry, --dry-run        Compares the cluster resources against the spec,
                               outputting proposed changes if compatible. If
                               the spec is incompatible with the cluster, fails
                               with a descriptive error message. Return value:
                               '0' = no changes needed; '1' = changes needed;
                               '-1' = not compatible
      -s, --secret=<secret>  secret credential for the cluster connection
      -schemaPath, --schema-path=<schemaPath>
                             schemaPath where the set of referenced schemas
                               will be loaded
      -spec, --spec=<spec>   specmesh specification file
      -sr, --schema-registry=<schemaRegistryUrl>
                             schemaRegistryUrl
      -srEnabled, --sr-enabled
                             True (default) will provision/publish/validate
                               schemas. False will ignore schema related
                               operations
      -srKey, --sr-api-key=<srApiKey>
                             srApiKey for schema registry
      -srSecret, --sr-api-secret=<srApiSecret>
                             srApiSecret for schema secret
      -u, --username=<username>
                             username or api key for the cluster connection
```
</details>
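Note that keys in `provision.properties` use dots (`acl.enabled`) while long option names use dashes (`--acl-enabled`); the commit bridges this by also registering a dashed alias for every dotted key. A minimal sketch of that mapping, iterating a snapshot of the key set so the `Properties` object is not modified while its own entry set is being traversed:

```java
import java.util.Properties;

// Minimal sketch of the key normalisation in Provision.main: each dotted
// property key (acl.enabled) also gets a dashed alias (acl-enabled).
public final class KeyAliasSketch {

    // stringPropertyNames() returns a snapshot, so adding entries while
    // looping over it is safe.
    static void addDashedAliases(final Properties props) {
        for (final String name : props.stringPropertyNames()) {
            props.put(name.replace(".", "-"), props.getProperty(name));
        }
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty("acl.enabled", "true");
        props.setProperty("schema.registry", "http://schema-registry:8081");
        addDashedAliases(props);
        System.out.println(props.getProperty("acl-enabled"));
        System.out.println(props.getProperty("schema-registry"));
    }
}
```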
1 change: 1 addition & 0 deletions cli/include/bin/wrapper.sh
@@ -5,6 +5,7 @@ function provision() {
exec java \
-Xms64m -Xmx64m \
-Dlog4j.configurationFile=/log/log4j2.xml \
-Dprovision.properties=/app/provision.properties \
-cp "/opt/specmesh/service/lib/*" \
io.specmesh.cli.Provision "$@"
}
14 changes: 14 additions & 0 deletions cli/provision.properties
@@ -0,0 +1,14 @@
spec=/app/simple_schema_demo-api.yaml
acl.enabled=true
sr.enabled=true
dry.run=true
bootstrap.server=broker1:9092
username=admin
secret=nothing
schema.registry=http://schema-registry:8081
schema.path=/app/1
sr.api.key=admin
sr.api.secret=nothing



8 changes: 8 additions & 0 deletions cli/resources/provision.properties
@@ -0,0 +1,8 @@
acl.enabled=true
sr.enabled=true
dry.run=true
bootstrap.server=broker1:9092
schema.registry=http://schema-registry:8081
spec=/app/simple_schema_demo-api.yaml
schema.path=/app/1
username=admin
77 changes: 64 additions & 13 deletions cli/src/main/java/io/specmesh/cli/Provision.java
@@ -23,7 +23,11 @@
import io.specmesh.kafka.KafkaApiSpec;
import io.specmesh.kafka.provision.Provisioner;
import io.specmesh.kafka.provision.Status;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import picocli.CommandLine;
import picocli.CommandLine.Option;
@@ -32,8 +36,9 @@
@Command(
name = "provision",
description =
"Apply the provided specification to provision kafka resources and permissions on"
+ " the cluster")
"Apply a specification.yaml to provision kafka resources on a cluster.\n"
+ "Use 'provision.properties' for common arguments\n"
+ " Explicit properties file location /app/provision.properties\n\n")
public class Provision implements Callable<Integer> {

private Status state;
@@ -44,14 +49,59 @@ public class Provision implements Callable<Integer> {
* @param args args
*/
public static void main(final String[] args) {
System.exit(new CommandLine(new Provision()).execute(args));

final var properties = new Properties();
final var propertyFilename =
System.getProperty("provision.properties", "provision.properties");
try (FileInputStream fis = new FileInputStream(propertyFilename)) {
System.out.println(
"Loading `"
+ propertyFilename
+ "` from:"
+ new File(propertyFilename).getAbsolutePath());
properties.load(fis);
            // iterate a snapshot of the key set: mutating the Properties while
            // iterating its own entry set risks ConcurrentModificationException
            for (final String name : properties.stringPropertyNames()) {
                properties.put(name.replace(".", "-"), properties.getProperty(name));
            }
System.out.println(
"Loaded `properties` from cwd:" + new File(propertyFilename).getAbsolutePath());
} catch (IOException e) {
System.out.println(
"Missing `"
+ propertyFilename
+ " ` FROM:"
+ new File(propertyFilename).getAbsolutePath()
+ "\nERROR:"
+ e);
e.printStackTrace();
}

final var provider = new CommandLine.PropertiesDefaultProvider(properties);
System.exit(
new CommandLine(new Provision()).setDefaultValueProvider(provider).execute(args));
}

@Option(
names = {"-bs", "--bootstrap-server"},
description = "Kafka bootstrap server url")
private String brokerUrl = "";

@Option(
names = {"-srEnabled", "--sr-enabled"},
fallbackValue = "true",
description =
"True (default) will provision/publish/validate schemas. False will ignore"
+ " schema related operations")
private boolean srEnabled;

@Option(
names = {"-aclEnabled", "--acl-enabled"},
fallbackValue = "true",
description =
"True (default) will provision/publish/validate ACls. False will ignore ACL"
+ " related operations")
private boolean aclEnabled;

@Option(
names = {"-sr", "--schema-registry"},
description = "schemaRegistryUrl")
@@ -89,16 +139,18 @@ public static void main(final String[] args) {

@Option(
names = {"-dry", "--dry-run"},
fallbackValue = "false",
description =
"Compares the cluster resources against the spec, outputting proposed changes"
+ " if compatible. If the spec incompatible with the cluster (not sure how"
+ " it could be) then will fail with a descriptive error message. A return"
+ " value of '0' = indicates no changes needed; '1' = changes needed; '-1'"
+ " not compatible")
+ " if compatible. If the spec incompatible with the cluster then will"
+ " fail with a descriptive error message. A return value of '0' ="
+ " indicates no changes needed; '1' = changes needed; '-1' not"
+ " compatible")
private boolean dryRun;

@Option(
names = {"-clean", "--clean-unspecified"},
fallbackValue = "false",
description =
"Compares the cluster resources against the spec, outputting proposed set of"
+ " resources that are unexpected (not specified). Use with '-dry-run' for"
@@ -107,12 +159,9 @@ public static void main(final String[] args) {
private boolean cleanUnspecified;

@Option(
names = "-D",
names = {"-D", "--property"},
mapFallbackValue = "",
description =
"Specify Java runtime system properties for Apache Kafka. Note: bulk properties"
+ " can be set via '-Dconfig.properties=somefile.properties"
+ " ") // allow -Dkey
description = "Specify Java runtime properties for Apache Kafka.") // allow -Dkey
void setProperty(final Map<String, String> props) {
props.forEach((k, v) -> System.setProperty(k, v));
}
@@ -121,12 +170,14 @@ void setProperty(final Map<String, String> props) {
public Integer call() throws Exception {
final var status =
Provisioner.provision(
aclEnabled,
dryRun,
cleanUnspecified,
specMeshSpec(),
schemaPath,
Clients.adminClient(brokerUrl, username, secret),
Clients.schemaRegistryClient(schemaRegistryUrl, srApiKey, srApiSecret));
Clients.schemaRegistryClient(
srEnabled, schemaRegistryUrl, srApiKey, srApiSecret));

System.out.println(status.toString());
this.state = status;
@@ -75,6 +75,7 @@ class StorageConsumptionFunctionalTest {
void shouldGetStorageAndConsumptionMetrics() throws Exception {

Provisioner.provision(
true,
false,
false,
API_SPEC,
@@ -107,6 +107,7 @@ public static void provision() {
final SchemaRegistryClient schemaRegistryClient =
new CachedSchemaRegistryClient(KAFKA_ENV.schemeRegistryServer(), 5);
Provisioner.provision(
true,
false,
false,
API_SPEC,
47 changes: 46 additions & 1 deletion kafka/README.md
@@ -10,7 +10,7 @@ Extends SpecMesh with the concept of public, protected and private topics.

Topic names in the spec come under the `channel` section. Their names either:

* start with `_public`, `_protected` or `_private` for topics _owned_ by the domain, or
* start with `_public`, `_protected` or `_private` for topics _owned_ by the domain (access can also be granted explicitly to other domains using `grant-access:`), or
* are the fully qualified topic name of some other (domain's) topic.

For topics owned by the domain, their full topic name is that used in the spec, prefixed with the spec's domain id,
Expand Down Expand Up @@ -59,3 +59,48 @@ library creates repartition and changelog topics for stores automatically using
As services are free to make additional private topics, provisioning does _not_ remove existing private topics not in the spec.
This places the responsibility of cleaning up private topics on engineering teams. However, as these are private
topics, it is easy to determine if such topics are or are not actively in use by the domain's services.

### Looking inside ACLs - what's really going on

Too many ACLs can affect cluster performance. Look at the set of ACLs below. The domain owner `simple.provision_demo` has access to everything (CREATE, READ, WRITE, DESCRIBE) below its designated topic prefix, which is also its domain name. Notice the READ and DESCRIBE ACLs covering all `_public` topics. `_protected` topics require a set of ACLs per topic using the `LITERAL` pattern type: for each `grant-access:` to another domain owner (`User:some.other.domain.acme-A`) a set of ACLs is created.


The set of ACLs created from the `provisioner-functional-test-api.yaml`
```text
[(pattern=ResourcePattern(resourceType=GROUP, name=simple.provision_demo, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=READ, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo._private, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=CREATE, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID, name=simple.provision_demo, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=WRITE, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TRANSACTIONAL_ID, name=simple.provision_demo, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=DESCRIBE, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=WRITE, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=DESCRIBE, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=READ, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo._public, patternType=PREFIXED)
=(principal=User:*, host=*, operation=READ, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo._public, patternType=PREFIXED)
=(principal=User:*, host=*, operation=DESCRIBE, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo._protected.user_info, patternType=LITERAL)
=(principal=User:some.other.domain.acme-A, host=*, operation=READ, permissionType=ALLOW)),
(pattern=ResourcePattern(resourceType=TOPIC, name=simple.provision_demo._protected.user_info, patternType=LITERAL)
=(principal=User:some.other.domain.acme-A, host=*, operation=DESCRIBE, permissionType=ALLOW))
(pattern=ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=PREFIXED)
=(principal=User:simple.provision_demo, host=*, operation=IDEMPOTENT_WRITE, permissionType=ALLOW)),
]
```
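Conceptually, the two pattern types differ in how they match topic names: `PREFIXED` ACLs cover every resource under a name prefix, while `LITERAL` ACLs (setting aside Kafka's special `*` literal) match one exact name. An illustrative sketch of that distinction, not the Kafka API itself:

```java
// Illustrative model (not the Kafka API) of how PREFIXED vs LITERAL
// resource patterns decide whether an ACL applies to a topic name.
public final class PatternSketch {

    enum PatternType { PREFIXED, LITERAL }

    // PREFIXED matches any topic under the prefix; LITERAL matches exactly.
    static boolean matches(final PatternType type, final String pattern, final String topic) {
        return type == PatternType.PREFIXED
                ? topic.startsWith(pattern)
                : topic.equals(pattern);
    }

    public static void main(final String[] args) {
        // One PREFIXED _public ACL covers every public topic in the domain...
        System.out.println(matches(PatternType.PREFIXED,
                "simple.provision_demo._public",
                "simple.provision_demo._public.user_signed_up"));
        // ...while each _protected grant is a LITERAL match on one exact topic.
        System.out.println(matches(PatternType.LITERAL,
                "simple.provision_demo._protected.user_info",
                "simple.provision_demo._protected.user_info"));
    }
}
```

This is why protected-topic grants multiply ACL counts: every (topic, granted principal) pair needs its own LITERAL bindings, whereas the domain owner's own access is a handful of PREFIXED bindings.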
7 changes: 5 additions & 2 deletions kafka/src/main/java/io/specmesh/kafka/Clients.java
@@ -143,8 +143,11 @@ private static String buildJaasConfig(final String userName, final String password) {
}

public static Optional<SchemaRegistryClient> schemaRegistryClient(
final String schemaRegistryUrl, final String srApiKey, final String srApiSecret) {
if (schemaRegistryUrl != null) {
final boolean srEnabled,
final String schemaRegistryUrl,
final String srApiKey,
final String srApiSecret) {
if (srEnabled && schemaRegistryUrl != null) {
final Map<String, Object> properties = new HashMap<>();
if (srApiKey != null) {
properties.put(
@@ -31,6 +31,7 @@ private Provisioner() {}
/**
* Provision Topics, ACLS and schemas
*
* @param aclEnabled when false, ACL-related operations are skipped
* @param dryRun test or execute
* @param cleanUnspecified cleanup
* @param apiSpec given spec
@@ -41,6 +42,7 @@
* @throws ProvisioningException when cant provision resources
*/
public static Status provision(
final boolean aclEnabled,
final boolean dryRun,
final boolean cleanUnspecified,
final KafkaApiSpec apiSpec,
@@ -64,7 +66,9 @@ public static Status provision(
apiSpec,
schemaResources,
registryClient)));
status.acls(AclProvisioner.provision(dryRun, cleanUnspecified, apiSpec, adminClient));
if (aclEnabled) {
status.acls(AclProvisioner.provision(dryRun, cleanUnspecified, apiSpec, adminClient));
}
return status.build();
}

@@ -78,6 +78,7 @@ void shouldRecordStats() throws ExecutionException, InterruptedException, Timeou
try (Admin adminClient = KAFKA_ENV.adminClient()) {
final var client = SmAdminClient.create(adminClient);
Provisioner.provision(
true,
false,
false,
API_SPEC,