Skip to content

Commit 054b4c0

Browse files
author
Megan Foss
committed
WIP converting to EVF v2. Pushing to repo for troubleshooting purposes.
1 parent 56d8f6e commit 054b4c0

File tree

6 files changed

+99
-11
lines changed

6 files changed

+99
-11
lines changed

contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthBatchReader.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,11 @@ public class FixedwidthBatchReader implements ManagedReader<FileSchemaNegotiator
5656
private CustomErrorContext errorContext;
5757
private InputStream fsStream;
5858
private ResultSetLoader loader;
59-
private RowSetLoader writer;
6059
private BufferedReader reader;
6160
private int lineNum;
6261

6362
public FixedwidthBatchReader(FixedwidthFormatConfig config, int maxRecords) {
64-
this.config = config;
63+
this.config = config; // plugin format configuration for the fixed-width file being read
6564
this.maxRecords = maxRecords;
6665
}
6766

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package org.apache.drill.exec.store.fixedwidth;
2+
3+
import org.apache.drill.common.AutoCloseables;
4+
import org.apache.drill.common.exceptions.CustomErrorContext;
5+
import org.apache.drill.common.exceptions.UserException;
6+
import org.apache.drill.exec.ops.OperatorContext;
7+
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
8+
import org.apache.drill.exec.physical.impl.scan.v3.SchemaNegotiator;
9+
import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycle;
10+
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
11+
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.ReaderLifecycle;
12+
import org.apache.drill.exec.physical.impl.scan.v3.lifecycle.SchemaNegotiatorImpl;
13+
import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
14+
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
15+
import org.apache.drill.exec.record.metadata.TupleMetadata;
16+
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
17+
import org.apache.hadoop.mapred.FileSplit;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.io.BufferedReader;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
import java.io.InputStreamReader;
25+
26+
public class FixedwidthBatchReaderImpl implements ManagedReader {
27+
28+
private final int maxRecords;
29+
private final FixedwidthFormatConfig config;
30+
private InputStream fsStream;
31+
private ResultSetLoader loader;
32+
private FileSplit split;
33+
private CustomErrorContext errorContext;
34+
private static final Logger logger = LoggerFactory.getLogger(FixedwidthBatchReader.class);
35+
36+
public FixedwidthBatchReaderImpl (SchemaNegotiator negotiator, FixedwidthFormatConfig config, int maxRecords) {
37+
this.loader = open(negotiator);
38+
this.config = config;
39+
this.maxRecords = maxRecords;
40+
}
41+
42+
@Override
43+
public boolean next() {
44+
45+
}
46+
47+
@Override
48+
public void close() {
49+
if (fsStream != null){
50+
AutoCloseables.closeSilently(fsStream);
51+
fsStream = null;
52+
}
53+
}
54+
55+
private ResultSetLoader open(SchemaNegotiator negotiator) {
56+
split = (FileSplit) negotiator.split();
57+
errorContext = negotiator.parentErrorContext();
58+
openFile(negotiator);
59+
60+
try {
61+
negotiator.tableSchema(buildSchema(), true);
62+
loader = negotiator.build();
63+
} catch (Exception e) {
64+
throw UserException
65+
.dataReadError(e)
66+
.message("Failed to open input file: {}", split.getPath().toString())
67+
.addContext(errorContext)
68+
.addContext(e.getMessage())
69+
.build(logger);
70+
}
71+
reader = new BufferedReader(new InputStreamReader(fsStream, Charsets.UTF_8));
72+
return loader;
73+
}
74+
75+
private void openFile(FileSchemaNegotiator negotiator) {
76+
try {
77+
fsStream = negotiator.file().fileSystem().openPossiblyCompressedStream(split.getPath());
78+
sasFileReader = new SasFileReaderImpl(fsStream);
79+
firstRow = sasFileReader.readNext();
80+
} catch (IOException e) {
81+
throw UserException
82+
.dataReadError(e)
83+
.message("Unable to open Fixed Width File %s", split.getPath())
84+
.addContext(e.getMessage())
85+
.addContext(errorContext)
86+
.build(logger);
87+
}
88+
}
89+
}

contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatConfig.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ public void setFieldTypes(int i) {
162162
}
163163
}
164164

165-
@JsonIgnore
166165
public void validateFieldInput(){
167166
Set<String> uniqueNames = new HashSet<>();
168167
List<Integer> fieldIndices = this.getFieldIndices();
@@ -192,7 +191,7 @@ public void validateFieldInput(){
192191
if (!Pattern.matches("[a-zA-Z]\\w*", name)) {
193192
throw UserException
194193
.validationError()
195-
.message("Invalid input: " + name)
194+
.message("Column Name '" + name + "' is not valid. Must contain letters, numbers, and underscores only.")
196195
.addContext("Plugin", FixedwidthFormatPlugin.DEFAULT_NAME)
197196
.build(logger);
198197
}

contrib/format-fixedwidth/src/main/java/org/apache/drill/exec/store/fixedwidth/FixedwidthFormatPlugin.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,4 @@ protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan sc
8989
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
9090
return builder;
9191
}
92-
}
92+
}

contrib/format-fixedwidth/src/test/java/org/apache/drill/exec/store/fixedwidth/TestFixedwidthRecordReader.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ public static void setup() throws Exception {
5151
FixedwidthFormatConfig formatConfig = new FixedwidthFormatConfig(Lists.newArrayList("fwf"),
5252
Lists.newArrayList(
5353
new FixedwidthFieldConfig("Number", 1, 5, TypeProtos.MinorType.VARDECIMAL),
54-
new FixedwidthFieldConfig("Address",12, 3,TypeProtos.MinorType.INT, ""),
55-
new FixedwidthFieldConfig("Letter", 7,4, TypeProtos.MinorType.VARCHAR, ""),
56-
new FixedwidthFieldConfig("Date",16, 10,TypeProtos.MinorType.DATE, "MM-dd-yyyy"),
57-
new FixedwidthFieldConfig( "Time", 27, 8,TypeProtos.MinorType.TIME,"HH:mm:ss" ),
58-
new FixedwidthFieldConfig("DateTime", 36, 23,TypeProtos.MinorType.TIMESTAMP, "MM-dd-yyyy'T'HH:mm:ss.SSX" )
54+
new FixedwidthFieldConfig("Address", 12, 3, TypeProtos.MinorType.INT),
55+
new FixedwidthFieldConfig("Letter", 7, 4, TypeProtos.MinorType.VARCHAR),
56+
new FixedwidthFieldConfig("Date", 16, 10, TypeProtos.MinorType.DATE, "MM-dd-yyyy"),
57+
new FixedwidthFieldConfig("Time", 27, 8, TypeProtos.MinorType.TIME,"HH:mm:ss"),
58+
new FixedwidthFieldConfig("DateTime", 36, 23, TypeProtos.MinorType.TIMESTAMP, "MM-dd-yyyy'T'HH:mm:ss.SSX")
5959
));
6060
cluster.defineFormat("dfs", "fwf", formatConfig);
6161
cluster.defineFormat("cp", "fwf", formatConfig);
@@ -218,4 +218,3 @@ private RowSet setupTestData(){
218218
}
219219

220220
}
221-

exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/SchemaNegotiator.java

+2
Original file line numberDiff line numberDiff line change
@@ -224,4 +224,6 @@ public interface SchemaNegotiator {
224224
* schema order
225225
*/
226226
ResultSetLoader build();
227+
228+
Object split();
227229
}

0 commit comments

Comments
 (0)