diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/CsvScheme.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/CsvScheme.java
new file mode 100644
index 00000000000..03aabf29cf7
--- /dev/null
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/CsvScheme.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.sql.kafka;
+
+import org.apache.storm.spout.Scheme;
+import org.apache.storm.sql.runtime.FieldInfo;
+import org.apache.storm.tuple.Fields;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+public class CsvScheme implements Scheme {
+  private final List<FieldInfo> fieldsInfo;
+  private final List<String> fields;
+
+  public CsvScheme(List<FieldInfo> fieldsInfo) {
+    this.fieldsInfo = fieldsInfo;
+    this.fields = new ArrayList<>();
+    for (FieldInfo info : fieldsInfo) {
+      fields.add(info.name());
+    }
+  }
+
+  @Override
+  public List<Object> deserialize(ByteBuffer ser) {
+    // Decode the buffer contents; ByteBuffer.toString() only describes the buffer.
+    String str = StandardCharsets.UTF_8.decode(ser).toString();
+    // Naive split: quoted fields containing commas are not supported.
+    String[] parts = str.split(",");
+    if (parts.length != fields.size()) {
+      throw new RuntimeException("Kafka CSV message does not match the declared schema");
+    }
+    List<Object> ret = new ArrayList<>(fields.size());
+    for (int i = 0; i < fieldsInfo.size(); i++) {
+      FieldInfo fieldInfo = fieldsInfo.get(i);
+      String part = parts[i];
+      Class<?> type = fieldInfo.type();
+      if (type == String.class) {
+        ret.add(part);
+      } else {
+        // Convert via the type's String constructor, e.g. Integer(String).
+        try {
+          Constructor<?> cons = type.getDeclaredConstructor(String.class);
+          ret.add(cons.newInstance(part));
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(fields);
+  }
+}
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
index 0236948d8b1..97d196320bc 100644
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -171,12 +171,23 @@ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass,
       }
     }
     Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
-    conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
     ObjectMapper mapper = new ObjectMapper();
     Properties producerProp = new Properties();
     try {
       @SuppressWarnings("unchecked")
       HashMap map = mapper.readValue(properties, HashMap.class);
+      if (!map.containsKey("scheme")) {
+        conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+      } else {
+        String scheme = (String) map.get("scheme");
+        if ("JSON".equalsIgnoreCase(scheme)) {
+          conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
+        } else if ("CSV".equalsIgnoreCase(scheme)) {
+          conf.scheme = new SchemeAsMultiScheme(new CsvScheme(fields));
+        } else {
+          throw new UnsupportedOperationException(String.format("%s: Unsupported scheme", scheme));
+        }
+      }
       @SuppressWarnings("unchecked")
       HashMap producerConfig = (HashMap) map.get("producer");
       Preconditions.checkNotNull(producerConfig, "Kafka Table must contain producer config");
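
For reference, a minimal usage sketch of the new scheme (not part of the patch); the FieldInfo(String name, Class<?> type, boolean isPrimary) constructor from storm-sql-runtime is assumed. Note that the scheme is selected via the optional "scheme" key of the same table-properties JSON that constructTrident() already parses for the mandatory "producer" config, and defaults to JSON when the key is absent.

    // Schema: ID INT PRIMARY KEY, WORD VARCHAR
    List<FieldInfo> fieldsInfo = new ArrayList<>();
    fieldsInfo.add(new FieldInfo("ID", Integer.class, true));
    fieldsInfo.add(new FieldInfo("WORD", String.class, false));
    CsvScheme scheme = new CsvScheme(fieldsInfo);

    // "1,hello" deserializes to [1, "hello"]: ID is converted through the
    // reflective Integer(String) constructor, WORD passes through as-is.
    List<Object> values = scheme.deserialize(
        ByteBuffer.wrap("1,hello".getBytes(StandardCharsets.UTF_8)));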