diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java index a3d6830..4b71d31 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sContext.java @@ -10,6 +10,8 @@ import java.util.Optional; import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.kubernetes.client.apimachinery.GroupVersion; import io.kubernetes.client.common.KubernetesListObject; @@ -25,15 +27,18 @@ public class K8sContext { + private final static Logger LOG = LoggerFactory.getLogger(K8sContext.class); + public static final String DEFAULT_NAMESPACE = "default"; private static final String ENV_OVERRIDE_BASEPATH = "KUBECONFIG_BASEPATH"; private static K8sContext currentContext = null; private final String name; private final String namespace; - private final ApiClient apiClient; + private ApiClient apiClient; private final SharedInformerFactory informerFactory; public K8sContext(String name, String namespace, ApiClient apiClient) { + LOG.info("K8sContext created for namespace: {}", namespace); this.name = name; this.namespace = namespace; this.apiClient = apiClient; @@ -44,6 +49,12 @@ public ApiClient apiClient() { return apiClient; } + // Assigning a new api client should only happen once right after context creation. + // Re-assigning a new api client can have unexpected consequences. + public void apiClient(ApiClient apiClient) { + this.apiClient = apiClient; + } + public String name() { return name; } @@ -57,8 +68,8 @@ public SharedInformerFactory informerFactory() { } public void registerInformer( - K8sApiEndpoint endpoint, Duration resyncPeriod) { - informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis()); + K8sApiEndpoint endpoint, Duration resyncPeriod, String watchNamespace) { + informerFactory.sharedIndexInformerFor(generic(endpoint), endpoint.elementType(), resyncPeriod.toMillis(), watchNamespace); } public DynamicKubernetesApi dynamic(String apiVersion, String plural) { @@ -113,19 +124,13 @@ static K8sContext defaultContext() throws IOException { KubeConfig kubeConfig = KubeConfig.loadKubeConfig(r); kubeConfig.setFile(file); ApiClient apiClient = addEnvOverrides(kubeConfig).build(); - String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse("default"); + String namespace = Optional.ofNullable(kubeConfig.getNamespace()).orElse(DEFAULT_NAMESPACE); return new K8sContext(kubeConfig.getCurrentContext(), namespace, apiClient); } } else { ApiClient apiClient = Config.defaultClient(); - String filePath = System.getenv("POD_NAMESPACE_FILEPATH"); - String namespace; - if (filePath == null) { - namespace = "default"; - } else { - namespace = new String(Files.readAllBytes(Paths.get(filePath))); - } - return new K8sContext("default", namespace, apiClient); + String namespace = getNamespace(); + return new K8sContext(namespace, namespace, apiClient); } } @@ -139,4 +144,16 @@ private static ClientBuilder addEnvOverrides(KubeConfig kubeConfig) throws IOExc return builder; } + + private static String getNamespace() throws IOException { + String filePath = System.getenv("POD_NAMESPACE_FILEPATH"); + if (filePath != null) { + return new String(Files.readAllBytes(Paths.get(filePath))); + } + String namespace = System.getProperty("SELF_POD_NAMESPACE"); + if (namespace != null) { + return namespace; + } + return DEFAULT_NAMESPACE; + } } diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java index d3793ac..6fbcd1d 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/HoptimatorOperatorApp.java @@ -1,5 +1,6 @@ package com.linkedin.hoptimator.operator; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -23,6 +24,7 @@ import io.kubernetes.client.util.Config; import com.linkedin.hoptimator.catalog.Resource; +import com.linkedin.hoptimator.k8s.K8sApiEndpoints; import com.linkedin.hoptimator.k8s.K8sContext; import com.linkedin.hoptimator.models.V1alpha1Subscription; import com.linkedin.hoptimator.models.V1alpha1SubscriptionList; @@ -35,17 +37,17 @@ public class HoptimatorOperatorApp { private static final Logger log = LoggerFactory.getLogger(HoptimatorOperatorApp.class); final String url; - final String namespace; + final String watchNamespace; final ApiClient apiClient; final Predicate subscriptionFilter; final Properties properties; final Resource.Environment environment; /** This constructor is likely to evolve and break. */ - public HoptimatorOperatorApp(String url, String namespace, ApiClient apiClient, + public HoptimatorOperatorApp(String url, String watchNamespace, ApiClient apiClient, Predicate subscriptionFilter, Properties properties) { this.url = url; - this.namespace = namespace; + this.watchNamespace = watchNamespace; this.apiClient = apiClient; this.subscriptionFilter = subscriptionFilter; this.properties = properties; @@ -59,9 +61,10 @@ public static void main(String[] args) throws Exception { Options options = new Options(); - Option namespace = new Option("n", "namespace", true, "specified namespace"); - namespace.setRequired(false); - options.addOption(namespace); + Option watchNamespace = new Option("w", "watch", true, + "namespace to watch for resource operations, empty string indicates all namespaces"); + watchNamespace.setRequired(false); + options.addOption(watchNamespace); CommandLineParser parser = new DefaultParser(); HelpFormatter formatter = new HelpFormatter(); @@ -78,9 +81,10 @@ public static void main(String[] args) throws Exception { } String urlInput = cmd.getArgs()[0]; - String namespaceInput = cmd.getOptionValue("namespace", "default"); + String watchNamespaceInput = cmd.getOptionValue("watch", ""); - new HoptimatorOperatorApp(urlInput, namespaceInput, Config.defaultClient(), null, new Properties()).run(); + new HoptimatorOperatorApp(urlInput, watchNamespaceInput, + Config.defaultClient(), null, new Properties()).run(); } public void run() throws Exception { @@ -91,8 +95,9 @@ public void run() throws Exception { apiClient.setHttpClient(apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.SECONDS).build()); SharedInformerFactory informerFactory = new SharedInformerFactory(apiClient); - Operator operator = new Operator(namespace, apiClient, informerFactory, properties); + Operator operator = new Operator(watchNamespace, apiClient, informerFactory, properties); K8sContext context = K8sContext.currentContext(); + context.apiClient(apiClient); operator.registerApi("Subscription", "subscription", "subscriptions", "hoptimator.linkedin.com", "v1alpha1", V1alpha1Subscription.class, V1alpha1SubscriptionList.class); @@ -100,6 +105,8 @@ public void run() throws Exception { List controllers = new ArrayList<>(); controllers.addAll(ControllerService.controllers(operator)); controllers.add(SubscriptionReconciler.controller(operator, plannerFactory, environment, subscriptionFilter)); + + context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace); controllers.add(PipelineReconciler.controller(context)); ControllerManager controllerManager = diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java index 5297c4c..a510bc1 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java @@ -4,6 +4,13 @@ import java.util.ArrayList; import java.util.List; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,15 +25,44 @@ public class PipelineOperatorApp { private static final Logger log = LoggerFactory.getLogger(PipelineOperatorApp.class); + final String watchNamespace; + + public PipelineOperatorApp(String watchNamespace) { + this.watchNamespace = watchNamespace; + } + public static void main(String[] args) throws Exception { - new PipelineOperatorApp().run(); + Options options = new Options(); + + Option watchNamespace = new Option("w", "watch", true, + "namespace to watch for resource operations, empty string indicates all namespaces"); + watchNamespace.setRequired(false); + options.addOption(watchNamespace); + + CommandLineParser parser = new DefaultParser(); + HelpFormatter formatter = new HelpFormatter(); + CommandLine cmd; + + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + System.out.println(e.getMessage()); + formatter.printHelp("pipeline-operator", options); + + System.exit(1); + return; + } + + String watchNamespaceInput = cmd.getOptionValue("watch", ""); + + new PipelineOperatorApp(watchNamespaceInput).run(); } public void run() throws Exception { K8sContext context = K8sContext.currentContext(); // register informers - context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5)); + context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5), watchNamespace); List controllers = new ArrayList<>(); // TODO: add additional controllers from ControllerProvider SPI diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java index 5122179..1b12b36 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/pipeline/PipelineReconciler.java @@ -97,10 +97,6 @@ protected Duration pendingRetryDuration() { } public static Controller controller(K8sContext context) { - // Duplicate call, only needed while still using HoptimatorOperatorApp, - // when removed in favor of PipelineOperatorApp this call is redundant - context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5)); - Reconciler reconciler = new PipelineReconciler(context); return ControllerBuilder.defaultBuilder(context.informerFactory()) .withReconciler(reconciler) diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcSchema.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcSchema.java index 191b4b7..113c776 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcSchema.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcSchema.java @@ -7,17 +7,14 @@ import org.apache.calcite.adapter.jdbc.JdbcTable; import org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.schema.Schema; -import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.SchemaVersion; +import org.apache.calcite.schema.Schemas; import org.apache.calcite.schema.Table; import org.apache.calcite.sql.SqlDialect; -import org.apache.calcite.sql.SqlDialectFactory; -import org.apache.calcite.sql.SqlDialectFactoryImpl; import com.linkedin.hoptimator.Database; import com.linkedin.hoptimator.Engine; -import com.linkedin.hoptimator.util.planner.HoptimatorJdbcConvention; public class HoptimatorJdbcSchema extends JdbcSchema implements Database { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java index d3b1af8..68bf025 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTable.java @@ -3,12 +3,8 @@ import java.util.Collection; import java.util.List; -import com.linkedin.hoptimator.util.DataTypeUtils; - import org.apache.calcite.adapter.java.AbstractQueryableTable; -import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.adapter.jdbc.JdbcTable; -import org.apache.calcite.adapter.jdbc.JdbcTableScan; import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.linq4j.Queryable; import org.apache.calcite.plan.RelOptCluster; @@ -20,10 +16,10 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.ModifiableTable; -import org.apache.calcite.schema.Table; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.schema.TranslatableTable; -import org.apache.calcite.schema.impl.AbstractTableQueryable; + +import com.linkedin.hoptimator.util.DataTypeUtils; public class HoptimatorJdbcTable extends AbstractQueryableTable implements TranslatableTable, diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableScan.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableScan.java index ab3f07d..90647e5 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableScan.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/HoptimatorJdbcTableScan.java @@ -1,21 +1,12 @@ package com.linkedin.hoptimator.util.planner; -import org.apache.calcite.adapter.jdbc.JdbcTable; +import java.util.List; + import org.apache.calcite.adapter.jdbc.JdbcTableScan; -import org.apache.calcite.plan.Convention; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptTable; -import org.apache.calcite.plan.RelTraitSet; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.rel.hint.RelHint; -import com.google.common.collect.ImmutableList; - -import java.util.List; - -import static org.apache.calcite.linq4j.Nullness.castNonNull; - public class HoptimatorJdbcTableScan extends JdbcTableScan { public final HoptimatorJdbcTable jdbcTable; diff --git a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java index ef059ea..7d8f5a4 100644 --- a/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java +++ b/hoptimator-venice/src/main/java/com/linkedin/hoptimator/venice/VeniceStore.java @@ -33,7 +33,6 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { RelDataType key = rel(keySchema, typeFactory); RelDataType value = rel(valueSchema, typeFactory); RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory); - builder.addAll(value.getFieldList()); if (key.isStruct()) { for (RelDataTypeField field: key.getFieldList()) { builder.add(KEY_PREFIX + field.getName(), field.getType()); @@ -41,6 +40,7 @@ public RelDataType getRowType(RelDataTypeFactory typeFactory) { } else { builder.add("KEY", key); } + builder.addAll(value.getFieldList()); RelDataType combinedSchema = builder.build(); return DataTypeUtils.flatten(combinedSchema, typeFactory); } diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id index 5af869e..e1ca7d4 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-all.id @@ -13,10 +13,10 @@ spec: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id index bb89c80..ada0b4c 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -13,10 +13,10 @@ spec: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`intField`, `KEY_id`) SELECT CAST(`stringField` AS SIGNED) AS `intField`, `KEY_id` FROM `VENICE-CLUSTER0`.`test-store` + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE-CLUSTER0`.`test-store-1` (`KEY_id`, `intField`) SELECT `KEY_id`, CAST(`stringField` AS SIGNED) AS `intField` FROM `VENICE-CLUSTER0`.`test-store` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless diff --git a/hoptimator-venice/src/test/resources/venice-ddl-select.id b/hoptimator-venice/src/test/resources/venice-ddl-select.id index eb0e38b..7c0b538 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-select.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-select.id @@ -13,10 +13,10 @@ spec: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - CREATE DATABASE IF NOT EXISTS `VENICE-CLUSTER0` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') + - CREATE TABLE IF NOT EXISTS `VENICE-CLUSTER0`.`test-store-1` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store-1', 'value.fields-include'='EXCEPT_KEY') - CREATE DATABASE IF NOT EXISTS `PIPELINE` WITH () - - CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`intField` INTEGER, `stringField` VARCHAR, `KEY_id` INTEGER) WITH () - - INSERT INTO `PIPELINE`.`SINK` (`intField`, `stringField`, `KEY_id`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1` + - CREATE TABLE IF NOT EXISTS `PIPELINE`.`SINK` (`KEY_id` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH () + - INSERT INTO `PIPELINE`.`SINK` (`KEY_id`, `intField`, `stringField`) SELECT * FROM `VENICE-CLUSTER0`.`test-store-1` jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless