From c064a16a54814616ed9bc3ff5e9b70ba5e669472 Mon Sep 17 00:00:00 2001
From: Ryanne Dolan <rdolan@linkedin.com>
Date: Fri, 30 Jun 2023 11:42:52 -0500
Subject: [PATCH 1/3] Report subscription status (#38)

* Report subscription status

* Doc string improvement h/t @hshukla

* Wordsmithing h/t @ehoner
---
 deploy/dev/rbac.yaml                          |   6 +-
 deploy/hoptimator-operator-deployment.yaml    |   1 +
 deploy/rbac.yaml                              |   9 +-
 deploy/subscriptions.crd.yaml                 |  12 ++
 .../hoptimator/models/V1alpha1KafkaTopic.java |   2 +-
 .../models/V1alpha1KafkaTopicList.java        |   2 +-
 .../models/V1alpha1KafkaTopicSpec.java        |   2 +-
 .../V1alpha1KafkaTopicSpecClientConfigs.java  |   2 +-
 .../V1alpha1KafkaTopicSpecConfigMapRef.java   |   2 +-
 .../models/V1alpha1KafkaTopicStatus.java      |   2 +-
 .../models/V1alpha1Subscription.java          |   2 +-
 .../models/V1alpha1SubscriptionList.java      |   2 +-
 .../models/V1alpha1SubscriptionSpec.java      |   2 +-
 .../models/V1alpha1SubscriptionStatus.java    |  74 +++++++-
 .../hoptimator/operator/Operator.java         |  47 +----
 .../subscription/SubscriptionReconciler.java  | 172 ++++++++++++++++--
 16 files changed, 263 insertions(+), 76 deletions(-)

diff --git a/deploy/dev/rbac.yaml b/deploy/dev/rbac.yaml
index 37a5ee87..9595182f 100644
--- a/deploy/dev/rbac.yaml
+++ b/deploy/dev/rbac.yaml
@@ -1,13 +1,13 @@
 apiVersion: rbac.authorization.k8s.io/v1
 kind: RoleBinding
 metadata:
-  name: hoptimator-controller
+  name: hoptimator-operator
   namespace: default
 subjects:
 - kind: ServiceAccount
-  name: default
+  name: hoptimator-operator
   namespace: default
 roleRef:
   kind: Role
-  name: hoptimator-controller
+  name: hoptimator-operator
   apiGroup: rbac.authorization.k8s.io
diff --git a/deploy/hoptimator-operator-deployment.yaml b/deploy/hoptimator-operator-deployment.yaml
index 83e786c8..679bf582 100644
--- a/deploy/hoptimator-operator-deployment.yaml
+++ b/deploy/hoptimator-operator-deployment.yaml
@@ -14,6 +14,7 @@ spec:
       labels:
         app: hoptimator-operator
     spec:
+      serviceAccountName: hoptimator-operator
       containers:
       - name: hoptimator-operator
         image: docker.io/library/hoptimator
diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml
index c3d26141..9d097a13 100644
--- a/deploy/rbac.yaml
+++ b/deploy/rbac.yaml
@@ -2,12 +2,15 @@ apiVersion: rbac.authorization.k8s.io/v1
 kind: Role
 metadata:
   namespace: default
-  name: hoptimator-controller
+  name: hoptimator-operator
 rules:
 - apiGroups: ["hoptimator.linkedin.com"]
   resources: ["kafkatopics", "subscriptions"]
-  verbs: ["get", "watch", "list", "update", "create"]
+  verbs: ["get", "watch", "list", "create"]
+- apiGroups: ["hoptimator.linkedin.com"]
+  resources: ["kafkatopics/status", "subscriptions/status"]
+  verbs: ["get", "patch"]
 - apiGroups: ["flink.apache.org"]
   resources: ["flinkdeployments"]
-  verbs: ["update", "create"]
+  verbs: ["get", "update", "create"]
 
diff --git a/deploy/subscriptions.crd.yaml b/deploy/subscriptions.crd.yaml
index b3bae546..87fcbaf6 100644
--- a/deploy/subscriptions.crd.yaml
+++ b/deploy/subscriptions.crd.yaml
@@ -51,9 +51,21 @@ spec:
                 message:
                   description: Error or success message, for information only.
                   type: string
+                sql:
+                  description: The SQL being implemented by this pipeline.
+                  type: string
+                resources:
+                  description: The YAML generated to implement this pipeline.
+                  type: array
+                  items:
+                    type: string
       subresources:
         status: {}
       additionalPrinterColumns:
+      - name: STATUS
+        type: string
+        description: Status message from the operator.
+        jsonPath: .status.message
       - name: DB
         type: string
         description: The database where the subscription is materialized.
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
index b6bdddb1..51f95165 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
@@ -31,7 +31,7 @@
  * Kafka Topic
  */
 @ApiModel(description = "Kafka Topic")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
index ac89f643..cac3ee80 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
@@ -32,7 +32,7 @@
  * KafkaTopicList is a list of KafkaTopic
  */
 @ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
index b278f9ad..57c3eef2 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
@@ -33,7 +33,7 @@
  * Desired Kafka topic configuration.
  */
 @ApiModel(description = "Desired Kafka topic configuration.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1KafkaTopicSpec {
   public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
   @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
index ff1a7073..1e9ba004 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
@@ -28,7 +28,7 @@
 /**
  * V1alpha1KafkaTopicSpecClientConfigs
  */
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1KafkaTopicSpecClientConfigs {
   public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
   @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
index ecf8fab2..03fdb916 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
@@ -28,7 +28,7 @@
  * Reference to a ConfigMap to use for AdminClient configuration.
  */
 @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1KafkaTopicSpecConfigMapRef {
   public static final String SERIALIZED_NAME_NAME = "name";
   @SerializedName(SERIALIZED_NAME_NAME)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
index 5b0ab4c5..3113c19e 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
@@ -28,7 +28,7 @@
  * Current state of the topic.
  */
 @ApiModel(description = "Current state of the topic.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1KafkaTopicStatus {
   public static final String SERIALIZED_NAME_MESSAGE = "message";
   @SerializedName(SERIALIZED_NAME_MESSAGE)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
index 552a3190..47ef54c6 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
@@ -31,7 +31,7 @@
  * Hoptimator Subscription
  */
 @ApiModel(description = "Hoptimator Subscription")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
index 7f5185db..d81833f2 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
@@ -32,7 +32,7 @@
  * SubscriptionList is a list of Subscription
  */
 @ApiModel(description = "SubscriptionList is a list of Subscription")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
   public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
   @SerializedName(SERIALIZED_NAME_API_VERSION)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
index 10f189a7..265b0408 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
@@ -28,7 +28,7 @@
  * Subscription spec
  */
 @ApiModel(description = "Subscription spec")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1SubscriptionSpec {
   public static final String SERIALIZED_NAME_DATABASE = "database";
   @SerializedName(SERIALIZED_NAME_DATABASE)
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
index 8f41d08c..b13acec5 100644
--- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
@@ -23,12 +23,14 @@
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Filled in by the operator.
  */
 @ApiModel(description = "Filled in by the operator.")
-@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
 public class V1alpha1SubscriptionStatus {
   public static final String SERIALIZED_NAME_MESSAGE = "message";
   @SerializedName(SERIALIZED_NAME_MESSAGE)
@@ -38,6 +40,14 @@ public class V1alpha1SubscriptionStatus {
   @SerializedName(SERIALIZED_NAME_READY)
   private Boolean ready;
 
+  public static final String SERIALIZED_NAME_RESOURCES = "resources";
+  @SerializedName(SERIALIZED_NAME_RESOURCES)
+  private List<String> resources = null;
+
+  public static final String SERIALIZED_NAME_SQL = "sql";
+  @SerializedName(SERIALIZED_NAME_SQL)
+  private String sql;
+
 
   public V1alpha1SubscriptionStatus message(String message) {
     
@@ -85,6 +95,60 @@ public void setReady(Boolean ready) {
   }
 
 
+  public V1alpha1SubscriptionStatus resources(List<String> resources) {
+    
+    this.resources = resources;
+    return this;
+  }
+
+  public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) {
+    if (this.resources == null) {
+      this.resources = new ArrayList<>();
+    }
+    this.resources.add(resourcesItem);
+    return this;
+  }
+
+   /**
+   * The YAML generated to implement this pipeline.
+   * @return resources
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The YAML generated to implement this pipeline.")
+
+  public List<String> getResources() {
+    return resources;
+  }
+
+
+  public void setResources(List<String> resources) {
+    this.resources = resources;
+  }
+
+
+  public V1alpha1SubscriptionStatus sql(String sql) {
+    
+    this.sql = sql;
+    return this;
+  }
+
+   /**
+   * The SQL being implemented by this pipeline.
+   * @return sql
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The SQL being implemented by this pipeline.")
+
+  public String getSql() {
+    return sql;
+  }
+
+
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -95,12 +159,14 @@ public boolean equals(Object o) {
     }
     V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
     return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
-        Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready);
+        Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
+        Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
+        Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(message, ready);
+    return Objects.hash(message, ready, resources, sql);
   }
 
 
@@ -110,6 +176,8 @@ public String toString() {
     sb.append("class V1alpha1SubscriptionStatus {\n");
     sb.append("    message: ").append(toIndentedString(message)).append("\n");
     sb.append("    ready: ").append(toIndentedString(ready)).append("\n");
+    sb.append("    resources: ").append(toIndentedString(resources)).append("\n");
+    sb.append("    sql: ").append(toIndentedString(sql)).append("\n");
     sb.append("}");
     return sb.toString();
   }
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java
index 0005bd70..27fbd96a 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java
@@ -122,54 +122,17 @@ public DynamicKubernetesApi apiFor(DynamicKubernetesObject obj) {
     return (GenericKubernetesApi<T, L>) apiInfo(groupVersionKind).generic(apiClient);
   }
 
-  public boolean applyResource(Resource resource, V1OwnerReference ownerReference,
-      Resource.TemplateFactory templateFactory) {
-    String yaml = resource.render(templateFactory);
-    DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
-    String namespace = obj.getMetadata().getNamespace();
-    String name = obj.getMetadata().getName();
-    KubernetesApiResponse<DynamicKubernetesObject> existing = apiFor(obj).get(namespace, name);
-    if (existing.isSuccess()) {
-      String resourceVersion = existing.getObject().getMetadata().getResourceVersion();
-      log.info("Updating existing downstream resource {}/{} {} as \n{}",
-        namespace, name, resourceVersion, yaml);
-      List<V1OwnerReference> owners = existing.getObject().getMetadata().getOwnerReferences();
-      if (owners == null) {
-        owners = new ArrayList<>();
-      }
-      if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) {
-        log.info("Existing downstream resource {}/{} is already owned by {}/{}.",
-          namespace, name, ownerReference.getKind(), ownerReference.getName());
-      } else {
-        log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.",
-          namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size());
-        owners.add(ownerReference);
-      }
-      obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion));
-      KubernetesApiResponse<DynamicKubernetesObject> response = apiFor(obj).update(obj);
-      if (!response.isSuccess()) {
-        log.error("Error updating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage());
-        return false;
-      }
-    } else {
-      log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml);
-      obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference));
-      KubernetesApiResponse<DynamicKubernetesObject> response = apiFor(obj).create(obj);
-      if (!response.isSuccess()) {
-        log.error("Error creating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage());
-        return false;
-      }
-    }
-    return true;
-  }
-
   public Duration failureRetryDuration() {
     return Duration.ofMinutes(5);
   }
 
-  public Duration resyncPeriod() {
+  public Duration pendingRetryDuration() {
     return Duration.ofMinutes(1);
   }
+  
+  public Duration resyncPeriod() {
+    return Duration.ofMinutes(10);
+  }
 
   public static class ApiInfo<T extends KubernetesObject, L extends KubernetesListObject> {
     private final String kind;
diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
index f09ca7dd..918ca5c3 100644
--- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
+++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java
@@ -3,8 +3,8 @@
 import com.linkedin.hoptimator.catalog.Resource;
 import com.linkedin.hoptimator.catalog.HopTable;
 import com.linkedin.hoptimator.models.V1alpha1Subscription;
-import com.linkedin.hoptimator.models.V1alpha1SubscriptionStatus;
 import com.linkedin.hoptimator.models.V1alpha1SubscriptionSpec;
+import com.linkedin.hoptimator.models.V1alpha1SubscriptionStatus;
 import com.linkedin.hoptimator.operator.Operator;
 import com.linkedin.hoptimator.operator.ConfigAssembler;
 import com.linkedin.hoptimator.operator.RequestEnvironment;
@@ -27,6 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
 
 public class SubscriptionReconciler implements Reconciler {
   private final static Logger log = LoggerFactory.getLogger(SubscriptionReconciler.class);
@@ -56,38 +58,80 @@ public Result reconcile(Request request) {
     RequestEnvironment env = new RequestEnvironment(request);
     Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(env);
 
+    Result result = new Result(true, operator.pendingRetryDuration());
     try {
       V1alpha1Subscription object = operator.<V1alpha1Subscription>fetch(SUBSCRIPTION, namespace,
         name);
-  
+ 
       if (object ==  null) {
         log.info("Object {}/{} deleted, skipping.", namespace, name);
         return new Result(false);
       }
-
-      V1OwnerReference ownerReference = new V1OwnerReference();
-      ownerReference.kind(object.getKind());
-      ownerReference.name(object.getMetadata().getName());
-      ownerReference.apiVersion(object.getApiVersion());
-      ownerReference.uid(object.getMetadata().getUid());
-   
-      Pipeline pipeline = pipeline(object);
-      boolean ready = pipeline.resources().stream()
-        .map(x -> operator.applyResource(x, ownerReference, templateFactory)).allMatch(x -> x);
+      
+      String kind = object.getKind();
 
       V1alpha1SubscriptionStatus status = object.getStatus();
       if (status == null) {
         status = new V1alpha1SubscriptionStatus();
+        object.setStatus(status);
       }
-      status.setReady(ready);
 
-      operator.apiFor(SUBSCRIPTION).updateStatus(object, x -> object.getStatus());
+      // We deploy in three phases:
+      // 1. Plan a pipeline, and write the plan to Status.
+      // 2. Deploy the pipeline per plan.
+      // 3. Verify readiness of the entire pipeline.
+      // Each phase should be a separate reconcilation loop to avoid races.
+      // TODO: We should disown orphaned resources when the pipeline changes.
+      if (status.getSql() == null || !status.getSql().equals(object.getSpec().getSql())) {
+        // Phase 1
+        log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql());
+
+        Pipeline pipeline = pipeline(object);
+        status.setResources(pipeline.resources().stream()
+          .map(x -> x.render(templateFactory))
+          .collect(Collectors.toList()));
+
+        status.setSql(object.getSpec().getSql());
+        status.setReady(null);  // null indicates that pipeline needs to be deployed
+        status.setMessage("Planned.");
+      } else if (status.getReady() == null && status.getResources() != null) {
+        // Phase 2
+        log.info("Deploying pipeline for {}/{}...", kind, name);
+
+        boolean deployed = status.getResources().stream()
+          .allMatch(x -> apply(x, object));
+
+        if (deployed) {
+          status.setReady(false);
+          status.setMessage("Deployed.");
+        } else {
+          return new Result(true, operator.failureRetryDuration());
+        }
+      } else {
+        log.info("Checking status of pipeline for {}/{}...", kind, name);
+
+        boolean ready = status.getResources().stream()
+          .allMatch(x -> checkStatus(x));
+
+        if (ready) {
+          status.setReady(true);
+          status.setMessage("Ready.");
+          log.info("{}/{} is ready.", kind, name);
+          result = new Result(false);
+        } else {
+          status.setReady(false);
+          status.setMessage("Deployed.");
+          log.info("Pipeline for {}/{} is NOT ready.", kind, name);
+        }
+      }
+
+      operator.apiFor(SUBSCRIPTION).updateStatus(object, x -> object.getStatus())
+        .onFailure((x, y) -> log.error("Failed to update status of {}/{}: {}.", kind, name, y.getMessage()));
     } catch (Exception e) {
       log.error("Encountered exception while reconciling Subscription {}/{}", namespace, name, e);
       return new Result(true, operator.failureRetryDuration());
     }
-    log.info("Done reconciling {}/{}", namespace, name);
-    return new Result(false);
+    return result;
   }
 
   Pipeline pipeline(V1alpha1Subscription object) throws Exception {
@@ -106,6 +150,102 @@ Pipeline pipeline(V1alpha1Subscription object) throws Exception {
     return impl.pipeline(sink);
   }
 
+  private boolean apply(String yaml, V1alpha1Subscription owner) {
+    V1OwnerReference ownerReference = new V1OwnerReference();
+    ownerReference.kind(owner.getKind());
+    ownerReference.name(owner.getMetadata().getName());
+    ownerReference.apiVersion(owner.getApiVersion());
+    ownerReference.uid(owner.getMetadata().getUid());
+
+    DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
+    String namespace = obj.getMetadata().getNamespace();
+    String name = obj.getMetadata().getName();
+    KubernetesApiResponse<DynamicKubernetesObject> existing = operator.apiFor(obj).get(namespace, name);
+    if (existing.isSuccess()) {
+      String resourceVersion = existing.getObject().getMetadata().getResourceVersion();
+      log.info("Updating existing downstream resource {}/{} {} as \n{}",
+        namespace, name, resourceVersion, yaml);
+      List<V1OwnerReference> owners = existing.getObject().getMetadata().getOwnerReferences();
+      if (owners == null) {
+        owners = new ArrayList<>();
+      }
+      if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) {
+        log.info("Existing downstream resource {}/{} is already owned by {}/{}.",
+          namespace, name, ownerReference.getKind(), ownerReference.getName());
+      } else {
+        log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.",
+          namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size());
+        owners.add(ownerReference);
+      }
+      obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion));
+      KubernetesApiResponse<DynamicKubernetesObject> response = operator.apiFor(obj).update(obj);
+      if (!response.isSuccess()) {
+        log.error("Error updating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage());
+        return false;
+      }
+    } else {
+      log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml);
+      obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference));
+      KubernetesApiResponse<DynamicKubernetesObject> response = operator.apiFor(obj).create(obj);
+      if (!response.isSuccess()) {
+        log.error("Error creating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean checkStatus(String yaml) {
+    DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
+    String namespace = obj.getMetadata().getNamespace();
+    String name = obj.getMetadata().getName();
+    String kind = obj.getKind();
+    try {
+      KubernetesApiResponse<DynamicKubernetesObject> existing = operator.apiFor(obj).get(namespace, name);
+      existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage()));
+      if (!existing.isSuccess()) {
+        return false;
+      }
+      if (isReady(existing.getObject())) {
+        log.info("{}/{} is ready.", kind, name);
+        return true;
+      } else {
+        log.info("{}/{} is NOT ready.", kind, name);
+        return false;
+      }
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  private static boolean isReady(DynamicKubernetesObject obj) {
+    // We make a best effort to guess the status of the dynamic object. By default, it's ready.
+    if (obj == null || obj.getRaw() == null) {
+      return false;
+    }
+    try {
+      return obj.getRaw().get("status").getAsJsonObject().get("ready").getAsBoolean();
+    } catch (Exception e) {
+      log.debug("Exception looking for .status.ready. Swallowing.", e);
+    }
+    try {
+      return obj.getRaw().get("status").getAsJsonObject().get("state").getAsString()
+        .matches("(?i)READY|RUNNING|FINISHED");
+    } catch (Exception e) {
+      log.debug("Exception looking for .status.state. Swallowing.", e);
+    }
+    try {
+      return obj.getRaw().get("status").getAsJsonObject().get("jobStatus").getAsJsonObject()
+        .get("state").getAsString().matches("(?i)READY|RUNNING|FINISHED");
+    } catch (Exception e) {
+      log.debug("Exception looking for .status.jobStatus.state. Swallowing.", e);
+    }
+    // TODO: Look for common Conditions
+    log.warn("Resource {}/{}/{} considered ready by default.", obj.getMetadata().getNamespace(),
+      obj.getKind(), obj.getMetadata().getName());
+    return true;
+  }
+
   public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory) {
     Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory);
     return ControllerBuilder.defaultBuilder(operator.informerFactory())

From df6d48c2af4a9ff6327700552fc8cc8a7566b859 Mon Sep 17 00:00:00 2001
From: Ryanne Dolan <rdolan@linkedin.com>
Date: Wed, 19 Jul 2023 11:48:31 -0500
Subject: [PATCH 2/3] License select files under Apache 2

---
 deploy/dev/kafka.yaml | 18 ++++++++++++++++++
 deploy/dev/mysql.yaml | 18 ++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml
index e5058edf..cc40b5ec 100644
--- a/deploy/dev/kafka.yaml
+++ b/deploy/dev/kafka.yaml
@@ -1,3 +1,21 @@
+//  Copyright (c) 2023, LinkedIn
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//
+//  Based on examples at:
+//  https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka
+
+
 apiVersion: kafka.strimzi.io/v1beta2
 kind: Kafka
 metadata:
diff --git a/deploy/dev/mysql.yaml b/deploy/dev/mysql.yaml
index 1df9e426..e19b6f67 100644
--- a/deploy/dev/mysql.yaml
+++ b/deploy/dev/mysql.yaml
@@ -1,3 +1,21 @@
+//  Copyright (c) 2023, LinkedIn
+//
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//
+//  Based on examples at:
+//  https://debezium.io/documentation/reference/stable/operations/kubernetes.html
+
+
 apiVersion: v1
 kind: Service
 metadata:

From a5dd664bb45395ae74ab704e618026e85bf669c4 Mon Sep 17 00:00:00 2001
From: Ryanne Dolan <rdolan@linkedin.com>
Date: Wed, 19 Jul 2023 13:27:19 -0500
Subject: [PATCH 3/3] Whoops.

---
 deploy/dev/kafka.yaml                         |  32 +-
 deploy/dev/mysql.yaml                         |  32 +-
 .../hoptimator/models/V1alpha1Acl.java        | 219 ++++++++++++++
 .../hoptimator/models/V1alpha1AclList.java    | 195 +++++++++++++
 .../hoptimator/models/V1alpha1AclSpec.java    | 239 +++++++++++++++
 .../models/V1alpha1AclSpecResource.java       | 129 +++++++++
 .../hoptimator/models/V1alpha1AclStatus.java  | 129 +++++++++
 .../hoptimator/models/V1alpha1KafkaTopic.java | 219 ++++++++++++++
 .../models/V1alpha1KafkaTopicList.java        | 195 +++++++++++++
 .../models/V1alpha1KafkaTopicSpec.java        | 273 ++++++++++++++++++
 .../V1alpha1KafkaTopicSpecClientConfigs.java  | 100 +++++++
 .../V1alpha1KafkaTopicSpecConfigMapRef.java   |  99 +++++++
 .../models/V1alpha1KafkaTopicStatus.java      | 158 ++++++++++
 .../models/V1alpha1Subscription.java          | 219 ++++++++++++++
 .../models/V1alpha1SubscriptionList.java      | 195 +++++++++++++
 .../models/V1alpha1SubscriptionSpec.java      | 127 ++++++++
 .../models/V1alpha1SubscriptionStatus.java    | 197 +++++++++++++
 17 files changed, 2725 insertions(+), 32 deletions(-)
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
 create mode 100644 hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java

diff --git a/deploy/dev/kafka.yaml b/deploy/dev/kafka.yaml
index cc40b5ec..aad99078 100644
--- a/deploy/dev/kafka.yaml
+++ b/deploy/dev/kafka.yaml
@@ -1,19 +1,19 @@
-//  Copyright (c) 2023, LinkedIn
-//
-//  Licensed under the Apache License, Version 2.0 (the "License");
-//  you may not use this file except in compliance with the License.
-//  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-//  Unless required by applicable law or agreed to in writing, software
-//  distributed under the License is distributed on an "AS IS" BASIS,
-//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  See the License for the specific language governing permissions and
-//  limitations under the License.
-//
-//  Based on examples at:
-//  https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka
+# Copyright (c) 2023, LinkedIn
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Based on examples at:
+# https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/kafka
 
 
 apiVersion: kafka.strimzi.io/v1beta2
diff --git a/deploy/dev/mysql.yaml b/deploy/dev/mysql.yaml
index e19b6f67..06b7f160 100644
--- a/deploy/dev/mysql.yaml
+++ b/deploy/dev/mysql.yaml
@@ -1,19 +1,19 @@
-//  Copyright (c) 2023, LinkedIn
-//
-//  Licensed under the Apache License, Version 2.0 (the "License");
-//  you may not use this file except in compliance with the License.
-//  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-//  Unless required by applicable law or agreed to in writing, software
-//  distributed under the License is distributed on an "AS IS" BASIS,
-//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//  See the License for the specific language governing permissions and
-//  limitations under the License.
-//
-//  Based on examples at:
-//  https://debezium.io/documentation/reference/stable/operations/kubernetes.html
+# Copyright (c) 2023, LinkedIn
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Based on examples at:
+# https://debezium.io/documentation/reference/stable/operations/kubernetes.html
 
 
 apiVersion: v1
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java
new file mode 100644
index 00000000..78c2c256
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Acl.java
@@ -0,0 +1,219 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1AclSpec;
+import com.linkedin.hoptimator.models.V1alpha1AclStatus;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Access control rule (colloquially, an Acl)
+ */
+@ApiModel(description = "Access control rule (colloquially, an Acl)")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ObjectMeta metadata = null;
+
+  public static final String SERIALIZED_NAME_SPEC = "spec";
+  @SerializedName(SERIALIZED_NAME_SPEC)
+  private V1alpha1AclSpec spec;
+
+  public static final String SERIALIZED_NAME_STATUS = "status";
+  @SerializedName(SERIALIZED_NAME_STATUS)
+  private V1alpha1AclStatus status;
+
+
+  public V1alpha1Acl apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1Acl kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1Acl metadata(V1ObjectMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ObjectMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ObjectMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  public V1alpha1Acl spec(V1alpha1AclSpec spec) {
+    
+    this.spec = spec;
+    return this;
+  }
+
+   /**
+   * Get spec
+   * @return spec
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1AclSpec getSpec() {
+    return spec;
+  }
+
+
+  public void setSpec(V1alpha1AclSpec spec) {
+    this.spec = spec;
+  }
+
+
+  public V1alpha1Acl status(V1alpha1AclStatus status) {
+    
+    this.status = status;
+    return this;
+  }
+
+   /**
+   * Get status
+   * @return status
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1AclStatus getStatus() {
+    return status;
+  }
+
+
+  public void setStatus(V1alpha1AclStatus status) {
+    this.status = status;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1Acl v1alpha1Acl = (V1alpha1Acl) o;
+    return Objects.equals(this.apiVersion, v1alpha1Acl.apiVersion) &&
+        Objects.equals(this.kind, v1alpha1Acl.kind) &&
+        Objects.equals(this.metadata, v1alpha1Acl.metadata) &&
+        Objects.equals(this.spec, v1alpha1Acl.spec) &&
+        Objects.equals(this.status, v1alpha1Acl.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, kind, metadata, spec, status);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1Acl {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("    spec: ").append(toIndentedString(spec)).append("\n");
+    sb.append("    status: ").append(toIndentedString(status)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java
new file mode 100644
index 00000000..24178fcb
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclList.java
@@ -0,0 +1,195 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1Acl;
+import io.kubernetes.client.openapi.models.V1ListMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AclList is a list of Acl
+ */
+@ApiModel(description = "AclList is a list of Acl")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_ITEMS = "items";
+  @SerializedName(SERIALIZED_NAME_ITEMS)
+  private List<V1alpha1Acl> items = new ArrayList<>();
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ListMeta metadata = null;
+
+
+  public V1alpha1AclList apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1AclList items(List<V1alpha1Acl> items) {
+    
+    this.items = items;
+    return this;
+  }
+
+  public V1alpha1AclList addItemsItem(V1alpha1Acl itemsItem) {
+    this.items.add(itemsItem);
+    return this;
+  }
+
+   /**
+   * List of acls. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md
+   * @return items
+  **/
+  @ApiModelProperty(required = true, value = "List of acls. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md")
+
+  public List<V1alpha1Acl> getItems() {
+    return items;
+  }
+
+
+  public void setItems(List<V1alpha1Acl> items) {
+    this.items = items;
+  }
+
+
+  public V1alpha1AclList kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1AclList metadata(V1ListMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ListMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ListMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclList v1alpha1AclList = (V1alpha1AclList) o;
+    return Objects.equals(this.apiVersion, v1alpha1AclList.apiVersion) &&
+        Objects.equals(this.items, v1alpha1AclList.items) &&
+        Objects.equals(this.kind, v1alpha1AclList.kind) &&
+        Objects.equals(this.metadata, v1alpha1AclList.metadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, items, kind, metadata);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclList {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    items: ").append(toIndentedString(items)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java
new file mode 100644
index 00000000..b873b096
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpec.java
@@ -0,0 +1,239 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1AclSpecResource;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * A set of related ACL rules.
+ */
+@ApiModel(description = "A set of related ACL rules.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclSpec {
+  /**
+   * The resource access method.
+   */
+  @JsonAdapter(MethodEnum.Adapter.class)
+  public enum MethodEnum {
+    ALTER("Alter"),
+    
+    CREATE("Create"),
+    
+    DELETE("Delete"),
+    
+    DESCRIBE("Describe"),
+    
+    READ("Read"),
+    
+    WRITE("Write"),
+    
+    POST("Post"),
+    
+    PUT("Put"),
+    
+    GET("Get"),
+    
+    HEAD("Head"),
+    
+    PATCH("Patch"),
+    
+    TRACE("Trace"),
+    
+    OPTIONS("Options"),
+    
+    GETALL("GetAll"),
+    
+    BATCHGET("BatchGet"),
+    
+    BATCHCREATE("BatchCreate"),
+    
+    BATCHUPDATE("BatchUpdate"),
+    
+    PARTIALUPDATE("PartialUpdate"),
+    
+    BATCHDELETE("BatchDelete"),
+    
+    BATCHPARTIALDELETE("BatchPartialDelete");
+
+    private String value;
+
+    MethodEnum(String value) {
+      this.value = value;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return String.valueOf(value);
+    }
+
+    public static MethodEnum fromValue(String value) {
+      for (MethodEnum b : MethodEnum.values()) {
+        if (b.value.equals(value)) {
+          return b;
+        }
+      }
+      throw new IllegalArgumentException("Unexpected value '" + value + "'");
+    }
+
+    public static class Adapter extends TypeAdapter<MethodEnum> {
+      @Override
+      public void write(final JsonWriter jsonWriter, final MethodEnum enumeration) throws IOException {
+        jsonWriter.value(enumeration.getValue());
+      }
+
+      @Override
+      public MethodEnum read(final JsonReader jsonReader) throws IOException {
+        String value =  jsonReader.nextString();
+        return MethodEnum.fromValue(value);
+      }
+    }
+  }
+
+  public static final String SERIALIZED_NAME_METHOD = "method";
+  @SerializedName(SERIALIZED_NAME_METHOD)
+  private MethodEnum method;
+
+  public static final String SERIALIZED_NAME_PRINCIPAL = "principal";
+  @SerializedName(SERIALIZED_NAME_PRINCIPAL)
+  private String principal;
+
+  public static final String SERIALIZED_NAME_RESOURCE = "resource";
+  @SerializedName(SERIALIZED_NAME_RESOURCE)
+  private V1alpha1AclSpecResource resource;
+
+
+  public V1alpha1AclSpec method(MethodEnum method) {
+    
+    this.method = method;
+    return this;
+  }
+
+   /**
+   * The resource access method.
+   * @return method
+  **/
+  @ApiModelProperty(required = true, value = "The resource access method.")
+
+  public MethodEnum getMethod() {
+    return method;
+  }
+
+
+  public void setMethod(MethodEnum method) {
+    this.method = method;
+  }
+
+
+  public V1alpha1AclSpec principal(String principal) {
+    
+    this.principal = principal;
+    return this;
+  }
+
+   /**
+   * The principal being allowed access. Format depends on principal type.
+   * @return principal
+  **/
+  @ApiModelProperty(required = true, value = "The principal being allowed access. Format depends on principal type.")
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+
+  public void setPrincipal(String principal) {
+    this.principal = principal;
+  }
+
+
+  public V1alpha1AclSpec resource(V1alpha1AclSpecResource resource) {
+    
+    this.resource = resource;
+    return this;
+  }
+
+   /**
+   * Get resource
+   * @return resource
+  **/
+  @ApiModelProperty(required = true, value = "")
+
+  public V1alpha1AclSpecResource getResource() {
+    return resource;
+  }
+
+
+  public void setResource(V1alpha1AclSpecResource resource) {
+    this.resource = resource;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclSpec v1alpha1AclSpec = (V1alpha1AclSpec) o;
+    return Objects.equals(this.method, v1alpha1AclSpec.method) &&
+        Objects.equals(this.principal, v1alpha1AclSpec.principal) &&
+        Objects.equals(this.resource, v1alpha1AclSpec.resource);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(method, principal, resource);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclSpec {\n");
+    sb.append("    method: ").append(toIndentedString(method)).append("\n");
+    sb.append("    principal: ").append(toIndentedString(principal)).append("\n");
+    sb.append("    resource: ").append(toIndentedString(resource)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java
new file mode 100644
index 00000000..430e70af
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclSpecResource.java
@@ -0,0 +1,129 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * The resource being controlled.
+ */
+@ApiModel(description = "The resource being controlled.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclSpecResource {
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_NAME = "name";
+  @SerializedName(SERIALIZED_NAME_NAME)
+  private String name;
+
+
+  public V1alpha1AclSpecResource kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * The kind of resource being controlled.
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The kind of resource being controlled.")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1AclSpecResource name(String name) {
+    
+    this.name = name;
+    return this;
+  }
+
+   /**
+   * The name of the resource being controlled.
+   * @return name
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The name of the resource being controlled.")
+
+  public String getName() {
+    return name;
+  }
+
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclSpecResource v1alpha1AclSpecResource = (V1alpha1AclSpecResource) o;
+    return Objects.equals(this.kind, v1alpha1AclSpecResource.kind) &&
+        Objects.equals(this.name, v1alpha1AclSpecResource.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(kind, name);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclSpecResource {\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    name: ").append(toIndentedString(name)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java
new file mode 100644
index 00000000..9b38615f
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1AclStatus.java
@@ -0,0 +1,129 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Status, as set by the operator.
+ */
+@ApiModel(description = "Status, as set by the operator.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1AclStatus {
+  public static final String SERIALIZED_NAME_MESSAGE = "message";
+  @SerializedName(SERIALIZED_NAME_MESSAGE)
+  private String message;
+
+  public static final String SERIALIZED_NAME_READY = "ready";
+  @SerializedName(SERIALIZED_NAME_READY)
+  private Boolean ready;
+
+
+  public V1alpha1AclStatus message(String message) {
+    
+    this.message = message;
+    return this;
+  }
+
+   /**
+   * Human-readable status message.
+   * @return message
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Human-readable status message.")
+
+  public String getMessage() {
+    return message;
+  }
+
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+
+  public V1alpha1AclStatus ready(Boolean ready) {
+    
+    this.ready = ready;
+    return this;
+  }
+
+   /**
+   * Whether the ACL rule has been applied.
+   * @return ready
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Whether the ACL rule has been applied.")
+
+  public Boolean getReady() {
+    return ready;
+  }
+
+
+  public void setReady(Boolean ready) {
+    this.ready = ready;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1AclStatus v1alpha1AclStatus = (V1alpha1AclStatus) o;
+    return Objects.equals(this.message, v1alpha1AclStatus.message) &&
+        Objects.equals(this.ready, v1alpha1AclStatus.ready);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(message, ready);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1AclStatus {\n");
+    sb.append("    message: ").append(toIndentedString(message)).append("\n");
+    sb.append("    ready: ").append(toIndentedString(ready)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
new file mode 100644
index 00000000..68380481
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java
@@ -0,0 +1,219 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1KafkaTopicSpec;
+import com.linkedin.hoptimator.models.V1alpha1KafkaTopicStatus;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Kafka Topic
+ */
+@ApiModel(description = "Kafka Topic")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ObjectMeta metadata = null;
+
+  public static final String SERIALIZED_NAME_SPEC = "spec";
+  @SerializedName(SERIALIZED_NAME_SPEC)
+  private V1alpha1KafkaTopicSpec spec;
+
+  public static final String SERIALIZED_NAME_STATUS = "status";
+  @SerializedName(SERIALIZED_NAME_STATUS)
+  private V1alpha1KafkaTopicStatus status;
+
+
+  public V1alpha1KafkaTopic apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1KafkaTopic kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1KafkaTopic metadata(V1ObjectMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ObjectMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ObjectMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  public V1alpha1KafkaTopic spec(V1alpha1KafkaTopicSpec spec) {
+    
+    this.spec = spec;
+    return this;
+  }
+
+   /**
+   * Get spec
+   * @return spec
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1KafkaTopicSpec getSpec() {
+    return spec;
+  }
+
+
+  public void setSpec(V1alpha1KafkaTopicSpec spec) {
+    this.spec = spec;
+  }
+
+
+  public V1alpha1KafkaTopic status(V1alpha1KafkaTopicStatus status) {
+    
+    this.status = status;
+    return this;
+  }
+
+   /**
+   * Get status
+   * @return status
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1KafkaTopicStatus getStatus() {
+    return status;
+  }
+
+
+  public void setStatus(V1alpha1KafkaTopicStatus status) {
+    this.status = status;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1KafkaTopic v1alpha1KafkaTopic = (V1alpha1KafkaTopic) o;
+    return Objects.equals(this.apiVersion, v1alpha1KafkaTopic.apiVersion) &&
+        Objects.equals(this.kind, v1alpha1KafkaTopic.kind) &&
+        Objects.equals(this.metadata, v1alpha1KafkaTopic.metadata) &&
+        Objects.equals(this.spec, v1alpha1KafkaTopic.spec) &&
+        Objects.equals(this.status, v1alpha1KafkaTopic.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, kind, metadata, spec, status);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1KafkaTopic {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("    spec: ").append(toIndentedString(spec)).append("\n");
+    sb.append("    status: ").append(toIndentedString(status)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
new file mode 100644
index 00000000..f4429ac4
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java
@@ -0,0 +1,195 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
+import io.kubernetes.client.openapi.models.V1ListMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * KafkaTopicList is a list of KafkaTopic
+ */
+@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_ITEMS = "items";
+  @SerializedName(SERIALIZED_NAME_ITEMS)
+  private List<V1alpha1KafkaTopic> items = new ArrayList<>();
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ListMeta metadata = null;
+
+
+  public V1alpha1KafkaTopicList apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1KafkaTopicList items(List<V1alpha1KafkaTopic> items) {
+    
+    this.items = items;
+    return this;
+  }
+
+  public V1alpha1KafkaTopicList addItemsItem(V1alpha1KafkaTopic itemsItem) {
+    this.items.add(itemsItem);
+    return this;
+  }
+
+   /**
+   * List of kafkatopics. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md
+   * @return items
+  **/
+  @ApiModelProperty(required = true, value = "List of kafkatopics. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md")
+
+  public List<V1alpha1KafkaTopic> getItems() {
+    return items;
+  }
+
+
+  public void setItems(List<V1alpha1KafkaTopic> items) {
+    this.items = items;
+  }
+
+
+  public V1alpha1KafkaTopicList kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1KafkaTopicList metadata(V1ListMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ListMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ListMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1KafkaTopicList v1alpha1KafkaTopicList = (V1alpha1KafkaTopicList) o;
+    return Objects.equals(this.apiVersion, v1alpha1KafkaTopicList.apiVersion) &&
+        Objects.equals(this.items, v1alpha1KafkaTopicList.items) &&
+        Objects.equals(this.kind, v1alpha1KafkaTopicList.kind) &&
+        Objects.equals(this.metadata, v1alpha1KafkaTopicList.metadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, items, kind, metadata);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1KafkaTopicList {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    items: ").append(toIndentedString(items)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
new file mode 100644
index 00000000..8a3588ef
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java
@@ -0,0 +1,273 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1KafkaTopicSpecClientConfigs;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Desired Kafka topic configuration.
+ */
+@ApiModel(description = "Desired Kafka topic configuration.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1KafkaTopicSpec {
+  public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
+  @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
+  private List<V1alpha1KafkaTopicSpecClientConfigs> clientConfigs = null;
+
+  public static final String SERIALIZED_NAME_CLIENT_OVERRIDES = "clientOverrides";
+  @SerializedName(SERIALIZED_NAME_CLIENT_OVERRIDES)
+  private Map<String, String> clientOverrides = null;
+
+  public static final String SERIALIZED_NAME_CONFIGS = "configs";
+  @SerializedName(SERIALIZED_NAME_CONFIGS)
+  private Map<String, String> configs = null;
+
+  public static final String SERIALIZED_NAME_NUM_PARTITIONS = "numPartitions";
+  @SerializedName(SERIALIZED_NAME_NUM_PARTITIONS)
+  private Integer numPartitions;
+
+  public static final String SERIALIZED_NAME_REPLICATION_FACTOR = "replicationFactor";
+  @SerializedName(SERIALIZED_NAME_REPLICATION_FACTOR)
+  private Integer replicationFactor;
+
+  public static final String SERIALIZED_NAME_TOPIC_NAME = "topicName";
+  @SerializedName(SERIALIZED_NAME_TOPIC_NAME)
+  private String topicName;
+
+
+  public V1alpha1KafkaTopicSpec clientConfigs(List<V1alpha1KafkaTopicSpecClientConfigs> clientConfigs) {
+    
+    this.clientConfigs = clientConfigs;
+    return this;
+  }
+
+  public V1alpha1KafkaTopicSpec addClientConfigsItem(V1alpha1KafkaTopicSpecClientConfigs clientConfigsItem) {
+    if (this.clientConfigs == null) {
+      this.clientConfigs = new ArrayList<>();
+    }
+    this.clientConfigs.add(clientConfigsItem);
+    return this;
+  }
+
+   /**
+   * ConfigMaps for AdminClient configuration.
+   * @return clientConfigs
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "ConfigMaps for AdminClient configuration.")
+
+  public List<V1alpha1KafkaTopicSpecClientConfigs> getClientConfigs() {
+    return clientConfigs;
+  }
+
+
+  public void setClientConfigs(List<V1alpha1KafkaTopicSpecClientConfigs> clientConfigs) {
+    this.clientConfigs = clientConfigs;
+  }
+
+
+  public V1alpha1KafkaTopicSpec clientOverrides(Map<String, String> clientOverrides) {
+    
+    this.clientOverrides = clientOverrides;
+    return this;
+  }
+
+  public V1alpha1KafkaTopicSpec putClientOverridesItem(String key, String clientOverridesItem) {
+    if (this.clientOverrides == null) {
+      this.clientOverrides = new HashMap<>();
+    }
+    this.clientOverrides.put(key, clientOverridesItem);
+    return this;
+  }
+
+   /**
+   * AdminClient overrides.
+   * @return clientOverrides
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "AdminClient overrides.")
+
+  public Map<String, String> getClientOverrides() {
+    return clientOverrides;
+  }
+
+
+  public void setClientOverrides(Map<String, String> clientOverrides) {
+    this.clientOverrides = clientOverrides;
+  }
+
+
+  public V1alpha1KafkaTopicSpec configs(Map<String, String> configs) {
+    
+    this.configs = configs;
+    return this;
+  }
+
+  public V1alpha1KafkaTopicSpec putConfigsItem(String key, String configsItem) {
+    if (this.configs == null) {
+      this.configs = new HashMap<>();
+    }
+    this.configs.put(key, configsItem);
+    return this;
+  }
+
+   /**
+   * Topic configurations.
+   * @return configs
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Topic configurations.")
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+
+  public void setConfigs(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+
+  public V1alpha1KafkaTopicSpec numPartitions(Integer numPartitions) {
+    
+    this.numPartitions = numPartitions;
+    return this;
+  }
+
+   /**
+   * Number of partitions the topic should have. By default, the cluster decides.
+   * @return numPartitions
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Number of partitions the topic should have. By default, the cluster decides.")
+
+  public Integer getNumPartitions() {
+    return numPartitions;
+  }
+
+
+  public void setNumPartitions(Integer numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+
+  public V1alpha1KafkaTopicSpec replicationFactor(Integer replicationFactor) {
+    
+    this.replicationFactor = replicationFactor;
+    return this;
+  }
+
+   /**
+   * The replication factor the topic should have. By default, the cluster decides.
+   * @return replicationFactor
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The replication factor the topic should have. By default, the cluster decides.")
+
+  public Integer getReplicationFactor() {
+    return replicationFactor;
+  }
+
+
+  public void setReplicationFactor(Integer replicationFactor) {
+    this.replicationFactor = replicationFactor;
+  }
+
+
+  public V1alpha1KafkaTopicSpec topicName(String topicName) {
+    
+    this.topicName = topicName;
+    return this;
+  }
+
+   /**
+   * The topic name.
+   * @return topicName
+  **/
+  @ApiModelProperty(required = true, value = "The topic name.")
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+
+  public void setTopicName(String topicName) {
+    this.topicName = topicName;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1KafkaTopicSpec v1alpha1KafkaTopicSpec = (V1alpha1KafkaTopicSpec) o;
+    return Objects.equals(this.clientConfigs, v1alpha1KafkaTopicSpec.clientConfigs) &&
+        Objects.equals(this.clientOverrides, v1alpha1KafkaTopicSpec.clientOverrides) &&
+        Objects.equals(this.configs, v1alpha1KafkaTopicSpec.configs) &&
+        Objects.equals(this.numPartitions, v1alpha1KafkaTopicSpec.numPartitions) &&
+        Objects.equals(this.replicationFactor, v1alpha1KafkaTopicSpec.replicationFactor) &&
+        Objects.equals(this.topicName, v1alpha1KafkaTopicSpec.topicName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(clientConfigs, clientOverrides, configs, numPartitions, replicationFactor, topicName);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1KafkaTopicSpec {\n");
+    sb.append("    clientConfigs: ").append(toIndentedString(clientConfigs)).append("\n");
+    sb.append("    clientOverrides: ").append(toIndentedString(clientOverrides)).append("\n");
+    sb.append("    configs: ").append(toIndentedString(configs)).append("\n");
+    sb.append("    numPartitions: ").append(toIndentedString(numPartitions)).append("\n");
+    sb.append("    replicationFactor: ").append(toIndentedString(replicationFactor)).append("\n");
+    sb.append("    topicName: ").append(toIndentedString(topicName)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
new file mode 100644
index 00000000..27536ba1
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java
@@ -0,0 +1,100 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1KafkaTopicSpecConfigMapRef;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * V1alpha1KafkaTopicSpecClientConfigs
+ */
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1KafkaTopicSpecClientConfigs {
+  public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
+  @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
+  private V1alpha1KafkaTopicSpecConfigMapRef configMapRef;
+
+
+  public V1alpha1KafkaTopicSpecClientConfigs configMapRef(V1alpha1KafkaTopicSpecConfigMapRef configMapRef) {
+    
+    this.configMapRef = configMapRef;
+    return this;
+  }
+
+   /**
+   * Get configMapRef
+   * @return configMapRef
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1KafkaTopicSpecConfigMapRef getConfigMapRef() {
+    return configMapRef;
+  }
+
+
+  public void setConfigMapRef(V1alpha1KafkaTopicSpecConfigMapRef configMapRef) {
+    this.configMapRef = configMapRef;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1KafkaTopicSpecClientConfigs v1alpha1KafkaTopicSpecClientConfigs = (V1alpha1KafkaTopicSpecClientConfigs) o;
+    return Objects.equals(this.configMapRef, v1alpha1KafkaTopicSpecClientConfigs.configMapRef);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(configMapRef);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1KafkaTopicSpecClientConfigs {\n");
+    sb.append("    configMapRef: ").append(toIndentedString(configMapRef)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
new file mode 100644
index 00000000..5e999aa7
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java
@@ -0,0 +1,99 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Reference to a ConfigMap to use for AdminClient configuration.
+ */
+@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1KafkaTopicSpecConfigMapRef {
+  public static final String SERIALIZED_NAME_NAME = "name";
+  @SerializedName(SERIALIZED_NAME_NAME)
+  private String name;
+
+
+  public V1alpha1KafkaTopicSpecConfigMapRef name(String name) {
+    
+    this.name = name;
+    return this;
+  }
+
+   /**
+   * Name of ConfigMap to use for AdminClient configuration.
+   * @return name
+  **/
+  @ApiModelProperty(required = true, value = "Name of ConfigMap to use for AdminClient configuration.")
+
+  public String getName() {
+    return name;
+  }
+
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1KafkaTopicSpecConfigMapRef v1alpha1KafkaTopicSpecConfigMapRef = (V1alpha1KafkaTopicSpecConfigMapRef) o;
+    return Objects.equals(this.name, v1alpha1KafkaTopicSpecConfigMapRef.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1KafkaTopicSpecConfigMapRef {\n");
+    sb.append("    name: ").append(toIndentedString(name)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
new file mode 100644
index 00000000..4cc046d9
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java
@@ -0,0 +1,158 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Current state of the topic.
+ */
+@ApiModel(description = "Current state of the topic.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1KafkaTopicStatus {
+  public static final String SERIALIZED_NAME_MESSAGE = "message";
+  @SerializedName(SERIALIZED_NAME_MESSAGE)
+  private String message;
+
+  public static final String SERIALIZED_NAME_NUM_PARTITIONS = "numPartitions";
+  @SerializedName(SERIALIZED_NAME_NUM_PARTITIONS)
+  private Integer numPartitions;
+
+  public static final String SERIALIZED_NAME_READY = "ready";
+  @SerializedName(SERIALIZED_NAME_READY)
+  private Boolean ready;
+
+
+  public V1alpha1KafkaTopicStatus message(String message) {
+    
+    this.message = message;
+    return this;
+  }
+
+   /**
+   * Error or success message, for information only.
+   * @return message
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Error or success message, for information only.")
+
+  public String getMessage() {
+    return message;
+  }
+
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+
+  public V1alpha1KafkaTopicStatus numPartitions(Integer numPartitions) {
+    
+    this.numPartitions = numPartitions;
+    return this;
+  }
+
+   /**
+   * Actual number of partitions the topic has when last checked.
+   * @return numPartitions
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Actual number of partitions the topic has when last checked.")
+
+  public Integer getNumPartitions() {
+    return numPartitions;
+  }
+
+
+  public void setNumPartitions(Integer numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+
+  public V1alpha1KafkaTopicStatus ready(Boolean ready) {
+    
+    this.ready = ready;
+    return this;
+  }
+
+   /**
+   * Whether the requested topic has been created.
+   * @return ready
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Whether the requested topic has been created.")
+
+  public Boolean getReady() {
+    return ready;
+  }
+
+
+  public void setReady(Boolean ready) {
+    this.ready = ready;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1KafkaTopicStatus v1alpha1KafkaTopicStatus = (V1alpha1KafkaTopicStatus) o;
+    return Objects.equals(this.message, v1alpha1KafkaTopicStatus.message) &&
+        Objects.equals(this.numPartitions, v1alpha1KafkaTopicStatus.numPartitions) &&
+        Objects.equals(this.ready, v1alpha1KafkaTopicStatus.ready);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(message, numPartitions, ready);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1KafkaTopicStatus {\n");
+    sb.append("    message: ").append(toIndentedString(message)).append("\n");
+    sb.append("    numPartitions: ").append(toIndentedString(numPartitions)).append("\n");
+    sb.append("    ready: ").append(toIndentedString(ready)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
new file mode 100644
index 00000000..013d6f6a
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java
@@ -0,0 +1,219 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1SubscriptionSpec;
+import com.linkedin.hoptimator.models.V1alpha1SubscriptionStatus;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Hoptimator Subscription
+ */
+@ApiModel(description = "Hoptimator Subscription")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ObjectMeta metadata = null;
+
+  public static final String SERIALIZED_NAME_SPEC = "spec";
+  @SerializedName(SERIALIZED_NAME_SPEC)
+  private V1alpha1SubscriptionSpec spec;
+
+  public static final String SERIALIZED_NAME_STATUS = "status";
+  @SerializedName(SERIALIZED_NAME_STATUS)
+  private V1alpha1SubscriptionStatus status;
+
+
+  public V1alpha1Subscription apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1Subscription kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1Subscription metadata(V1ObjectMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ObjectMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ObjectMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  public V1alpha1Subscription spec(V1alpha1SubscriptionSpec spec) {
+    
+    this.spec = spec;
+    return this;
+  }
+
+   /**
+   * Get spec
+   * @return spec
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1SubscriptionSpec getSpec() {
+    return spec;
+  }
+
+
+  public void setSpec(V1alpha1SubscriptionSpec spec) {
+    this.spec = spec;
+  }
+
+
+  public V1alpha1Subscription status(V1alpha1SubscriptionStatus status) {
+    
+    this.status = status;
+    return this;
+  }
+
+   /**
+   * Get status
+   * @return status
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1alpha1SubscriptionStatus getStatus() {
+    return status;
+  }
+
+
+  public void setStatus(V1alpha1SubscriptionStatus status) {
+    this.status = status;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1Subscription v1alpha1Subscription = (V1alpha1Subscription) o;
+    return Objects.equals(this.apiVersion, v1alpha1Subscription.apiVersion) &&
+        Objects.equals(this.kind, v1alpha1Subscription.kind) &&
+        Objects.equals(this.metadata, v1alpha1Subscription.metadata) &&
+        Objects.equals(this.spec, v1alpha1Subscription.spec) &&
+        Objects.equals(this.status, v1alpha1Subscription.status);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, kind, metadata, spec, status);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1Subscription {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("    spec: ").append(toIndentedString(spec)).append("\n");
+    sb.append("    status: ").append(toIndentedString(status)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
new file mode 100644
index 00000000..85ca00b3
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java
@@ -0,0 +1,195 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.linkedin.hoptimator.models.V1alpha1Subscription;
+import io.kubernetes.client.openapi.models.V1ListMeta;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SubscriptionList is a list of Subscription
+ */
+@ApiModel(description = "SubscriptionList is a list of Subscription")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
+  public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
+  @SerializedName(SERIALIZED_NAME_API_VERSION)
+  private String apiVersion;
+
+  public static final String SERIALIZED_NAME_ITEMS = "items";
+  @SerializedName(SERIALIZED_NAME_ITEMS)
+  private List<V1alpha1Subscription> items = new ArrayList<>();
+
+  public static final String SERIALIZED_NAME_KIND = "kind";
+  @SerializedName(SERIALIZED_NAME_KIND)
+  private String kind;
+
+  public static final String SERIALIZED_NAME_METADATA = "metadata";
+  @SerializedName(SERIALIZED_NAME_METADATA)
+  private V1ListMeta metadata = null;
+
+
+  public V1alpha1SubscriptionList apiVersion(String apiVersion) {
+    
+    this.apiVersion = apiVersion;
+    return this;
+  }
+
+   /**
+   * APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
+   * @return apiVersion
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources")
+
+  public String getApiVersion() {
+    return apiVersion;
+  }
+
+
+  public void setApiVersion(String apiVersion) {
+    this.apiVersion = apiVersion;
+  }
+
+
+  public V1alpha1SubscriptionList items(List<V1alpha1Subscription> items) {
+    
+    this.items = items;
+    return this;
+  }
+
+  public V1alpha1SubscriptionList addItemsItem(V1alpha1Subscription itemsItem) {
+    this.items.add(itemsItem);
+    return this;
+  }
+
+   /**
+   * List of subscriptions. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md
+   * @return items
+  **/
+  @ApiModelProperty(required = true, value = "List of subscriptions. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md")
+
+  public List<V1alpha1Subscription> getItems() {
+    return items;
+  }
+
+
+  public void setItems(List<V1alpha1Subscription> items) {
+    this.items = items;
+  }
+
+
+  public V1alpha1SubscriptionList kind(String kind) {
+    
+    this.kind = kind;
+    return this;
+  }
+
+   /**
+   * Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
+   * @return kind
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds")
+
+  public String getKind() {
+    return kind;
+  }
+
+
+  public void setKind(String kind) {
+    this.kind = kind;
+  }
+
+
+  public V1alpha1SubscriptionList metadata(V1ListMeta metadata) {
+    
+    this.metadata = metadata;
+    return this;
+  }
+
+   /**
+   * Get metadata
+   * @return metadata
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "")
+
+  public V1ListMeta getMetadata() {
+    return metadata;
+  }
+
+
+  public void setMetadata(V1ListMeta metadata) {
+    this.metadata = metadata;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1SubscriptionList v1alpha1SubscriptionList = (V1alpha1SubscriptionList) o;
+    return Objects.equals(this.apiVersion, v1alpha1SubscriptionList.apiVersion) &&
+        Objects.equals(this.items, v1alpha1SubscriptionList.items) &&
+        Objects.equals(this.kind, v1alpha1SubscriptionList.kind) &&
+        Objects.equals(this.metadata, v1alpha1SubscriptionList.metadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(apiVersion, items, kind, metadata);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1SubscriptionList {\n");
+    sb.append("    apiVersion: ").append(toIndentedString(apiVersion)).append("\n");
+    sb.append("    items: ").append(toIndentedString(items)).append("\n");
+    sb.append("    kind: ").append(toIndentedString(kind)).append("\n");
+    sb.append("    metadata: ").append(toIndentedString(metadata)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
new file mode 100644
index 00000000..807b842d
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java
@@ -0,0 +1,127 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+
+/**
+ * Subscription spec
+ */
+@ApiModel(description = "Subscription spec")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1SubscriptionSpec {
+  public static final String SERIALIZED_NAME_DATABASE = "database";
+  @SerializedName(SERIALIZED_NAME_DATABASE)
+  private String database;
+
+  public static final String SERIALIZED_NAME_SQL = "sql";
+  @SerializedName(SERIALIZED_NAME_SQL)
+  private String sql;
+
+
+  public V1alpha1SubscriptionSpec database(String database) {
+    
+    this.database = database;
+    return this;
+  }
+
+   /**
+   * The database in which to create the output/sink table.
+   * @return database
+  **/
+  @ApiModelProperty(required = true, value = "The database in which to create the output/sink table.")
+
+  public String getDatabase() {
+    return database;
+  }
+
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+
+  public V1alpha1SubscriptionSpec sql(String sql) {
+    
+    this.sql = sql;
+    return this;
+  }
+
+   /**
+   * A single SQL query.
+   * @return sql
+  **/
+  @ApiModelProperty(required = true, value = "A single SQL query.")
+
+  public String getSql() {
+    return sql;
+  }
+
+
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1SubscriptionSpec v1alpha1SubscriptionSpec = (V1alpha1SubscriptionSpec) o;
+    return Objects.equals(this.database, v1alpha1SubscriptionSpec.database) &&
+        Objects.equals(this.sql, v1alpha1SubscriptionSpec.sql);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(database, sql);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1SubscriptionSpec {\n");
+    sb.append("    database: ").append(toIndentedString(database)).append("\n");
+    sb.append("    sql: ").append(toIndentedString(sql)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+
diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
new file mode 100644
index 00000000..0d2355f7
--- /dev/null
+++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java
@@ -0,0 +1,197 @@
+/*
+ * Kubernetes
+ * No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
+ *
+ * The version of the OpenAPI document: v1.21.1
+ * 
+ *
+ * NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).
+ * https://openapi-generator.tech
+ * Do not edit the class manually.
+ */
+
+
+package com.linkedin.hoptimator.models;
+
+import java.util.Objects;
+import java.util.Arrays;
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.annotations.SerializedName;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Filled in by the operator.
+ */
+@ApiModel(description = "Filled in by the operator.")
+@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-07-19T17:12:58.614Z[Etc/UTC]")
+public class V1alpha1SubscriptionStatus {
+  public static final String SERIALIZED_NAME_MESSAGE = "message";
+  @SerializedName(SERIALIZED_NAME_MESSAGE)
+  private String message;
+
+  public static final String SERIALIZED_NAME_READY = "ready";
+  @SerializedName(SERIALIZED_NAME_READY)
+  private Boolean ready;
+
+  public static final String SERIALIZED_NAME_RESOURCES = "resources";
+  @SerializedName(SERIALIZED_NAME_RESOURCES)
+  private List<String> resources = null;
+
+  public static final String SERIALIZED_NAME_SQL = "sql";
+  @SerializedName(SERIALIZED_NAME_SQL)
+  private String sql;
+
+
+  public V1alpha1SubscriptionStatus message(String message) {
+    
+    this.message = message;
+    return this;
+  }
+
+   /**
+   * Error or success message, for information only.
+   * @return message
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Error or success message, for information only.")
+
+  public String getMessage() {
+    return message;
+  }
+
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+
+  public V1alpha1SubscriptionStatus ready(Boolean ready) {
+    
+    this.ready = ready;
+    return this;
+  }
+
+   /**
+   * Whether the subscription is ready to be consumed.
+   * @return ready
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "Whether the subscription is ready to be consumed.")
+
+  public Boolean getReady() {
+    return ready;
+  }
+
+
+  public void setReady(Boolean ready) {
+    this.ready = ready;
+  }
+
+
+  public V1alpha1SubscriptionStatus resources(List<String> resources) {
+    
+    this.resources = resources;
+    return this;
+  }
+
+  public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) {
+    if (this.resources == null) {
+      this.resources = new ArrayList<>();
+    }
+    this.resources.add(resourcesItem);
+    return this;
+  }
+
+   /**
+   * The YAML generated to implement this pipeline.
+   * @return resources
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The YAML generated to implement this pipeline.")
+
+  public List<String> getResources() {
+    return resources;
+  }
+
+
+  public void setResources(List<String> resources) {
+    this.resources = resources;
+  }
+
+
+  public V1alpha1SubscriptionStatus sql(String sql) {
+    
+    this.sql = sql;
+    return this;
+  }
+
+   /**
+   * The SQL being implemented by this pipeline.
+   * @return sql
+  **/
+  @javax.annotation.Nullable
+  @ApiModelProperty(value = "The SQL being implemented by this pipeline.")
+
+  public String getSql() {
+    return sql;
+  }
+
+
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
+    return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
+        Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
+        Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
+        Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(message, ready, resources, sql);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class V1alpha1SubscriptionStatus {\n");
+    sb.append("    message: ").append(toIndentedString(message)).append("\n");
+    sb.append("    ready: ").append(toIndentedString(ready)).append("\n");
+    sb.append("    resources: ").append(toIndentedString(resources)).append("\n");
+    sb.append("    sql: ").append(toIndentedString(sql)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+