Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.sql.kafka;

import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.tuple.Fields;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CsvScheme implements Scheme {
List<FieldInfo> fieldsInfo = null;
List<String> fields = null;

public CsvScheme(List<FieldInfo> fieldsInfo) {
this.fieldsInfo = fieldsInfo;
fields = new ArrayList<>();
for (FieldInfo info : fieldsInfo) {
fields.add(info.name());
}
}

@Override
public List<Object> deserialize(ByteBuffer ser) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I think we can have some test cases on this.
  2. StringScheme#deserializeString doesn't use ByteBuffer#toString, and javadoc says it provides information about instance state, not content: https://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#toString()

String str = ser.toString();
String[] parts = str.split(",");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not standard CSV format, something like a,"b,c",d is also valid. We can use TSV format to address.

if (parts.length != this.fields.size()) {
throw new RuntimeException("Kafka CSV message does not match the designed schema");
}
List<Object> ret = new ArrayList<>(this.fields.size());
for (int i = 0; i < this.fieldsInfo.size(); i++) {
FieldInfo fieldInfo = this.fieldsInfo.get(i);
String part = parts[i];
Class<?> type = fieldInfo.type();
if (type == String.class) {
ret.add(part);
} else {
try {
Constructor cons = type.getDeclaredConstructor(String.class);
ret.add(cons.newInstance(part));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
return ret;
}

@Override
public Fields getOutputFields() {
return new Fields(this.fields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,23 @@ public ISqlTridentDataSource constructTrident(URI uri, String inputFormatClass,
}
}
Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
ObjectMapper mapper = new ObjectMapper();
Properties producerProp = new Properties();
try {
@SuppressWarnings("unchecked")
HashMap<String, Object> map = mapper.readValue(properties, HashMap.class);
if(!map.containsKey("scheme")){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix spaces between L179 and L187.

conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
}else{
String scheme = (String)map.get("scheme");
if(scheme.toUpperCase().equals("JSON")){
conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
}else if(scheme.toUpperCase().equals("CSV")){
conf.scheme = new SchemeAsMultiScheme(new CsvScheme(fields));
}else{
throw new UnsupportedOperationException(String.format("%s: Unsupport scheme",scheme));
}
}
@SuppressWarnings("unchecked")
HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
Preconditions.checkNotNull(producerConfig, "Kafka Table must contain producer config");
Expand Down