Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GH-15810] Allow the user to adjust parquet import timezone [nocheck] #16304

Merged
merged 8 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public static void gbm_example_flow() {
null,
(byte)'\\',
false,
false,
null).execute().body();
System.out.println("parseSetupBody: " + parseSetupBody);

Expand Down Expand Up @@ -140,6 +141,7 @@ public static void gbm_example_flow() {
null,
null,
parseSetupBody.escapechar,
false,
null).execute().body();
System.out.println("parseBody: " + parseBody);

Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/FramesHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public FramesV3 export(int version, FramesV3 s) {
if (s.parallel) {
Log.warn("Parallel export to a single file is not supported for parquet format! Export will continue with a parquet-specific setup.");
}
s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum));
s.job = new JobV3(Frame.exportParquet(fr, s.path, s.force, s.compression, s.write_checksum, s.tz_adjust_from_local));
} else {
Frame.CSVStreamParams csvParms = new Frame.CSVStreamParams()
.setSeparator(s.separator)
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/api/ParseHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ParseV3 parse(int version, ParseV3 parse) {
new ParseWriter.ParseErr[0], parse.chunk_size,
parse.decrypt_tool != null ? parse.decrypt_tool.key() : null, parse.skipped_columns,
parse.custom_non_data_line_markers != null ? parse.custom_non_data_line_markers.getBytes(): null,
parse.escapechar, parse.force_col_types);
parse.escapechar, parse.force_col_types, parse.tz_adjust_to_local);

if (parse.source_frames == null)
throw new H2OIllegalArgumentException("Data for Frame '" + parse.destination_frame.name + "' is not available. Please check that the path is valid (for all H2O nodes).'");
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/FramesV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class FramesV3 extends RequestSchemaV3<Frames, FramesV3> {
@API(help="Specifies if checksum should be written next to data files on export (if supported by export format).")
public boolean write_checksum = true;

@API(help="Specifies if the timezone should be adjusted from local to UTC timezone (parquet only).")
public boolean tz_adjust_from_local = false;

@API(help="Field separator (default ',')")
public byte separator = Frame.CSVStreamParams.DEFAULT_SEPARATOR;

Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseSetupV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public class ParseSetupV3 extends RequestSchemaV3<ParseSetup, ParseSetupV3> {
" will happen without setting this parameter. Defaults to false.", direction=API.Direction.INPUT)
public boolean force_col_types;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
public boolean tz_adjust_to_local;

@Override
public ParseSetup fillImpl(ParseSetup impl) {
ParseSetup parseSetup = fillImpl(impl, new String[] {"parse_type"});
Expand Down
3 changes: 3 additions & 0 deletions h2o-core/src/main/java/water/api/schemas3/ParseV3.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,7 @@ public class ParseV3 extends RequestSchemaV3<Iced, ParseV3> {

@API(help="One ASCII character used to escape other characters.", direction=API.Direction.INOUT)
public byte escapechar = ParseSetup.DEFAULT_ESCAPE_CHAR;

@API(help="Adjust the imported time from GMT timezone to cluster timezone.", direction=API.Direction.INPUT)
public boolean tz_adjust_to_local;
}
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/fvec/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -1614,7 +1614,7 @@ public static Job export(Frame fr, String path, String frameName, boolean overwr
return job.start(t, fr.anyVec().nChunks());
}

public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum) {
public static Job exportParquet(Frame fr, String path, boolean overwrite, String compression, boolean writeChecksum, boolean tzAdjustFromLocal) {
// Validate input
if (H2O.getPM().isFileAccessDenied(path)) {
throw new H2OFileAccessDeniedException("File " + path + " access denied");
Expand All @@ -1638,7 +1638,7 @@ public static Job exportParquet(Frame fr, String path, boolean overwrite, String
}
Job job = new Job<>(fr._key, "water.fvec.Frame", "Export dataset");

H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum);
H2O.H2OCountedCompleter t = parquetExporter.export(fr, path, overwrite, compression, writeChecksum, tzAdjustFromLocal);
return job.start(t, fr.anyVec().nChunks());
}

Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ARFFParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ static ParseSetup guessSetup(ByteVec bv, byte[] bits, byte sep, boolean singleQu
naStrings = addDefaultNAs(naStrings, ncols);

// Return the final setup
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data, nonDataLineMarkers, escapechar);
return new ParseSetup(ARFF_INFO, sep, singleQuotes, ParseSetup.NO_HEADER, ncols, labels, ctypes, domains, naStrings, data, nonDataLineMarkers, escapechar, false);
}

private static String[][] addDefaultNAs(String[][] naStrings, int nCols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

public interface BinaryFormatExporter {

H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum);
H2O.H2OCountedCompleter export(Frame frame, String path, boolean force, String compression, boolean writeChecksum, boolean tzAdjustFromLocal);

boolean supports(ExportFileFormat format);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public abstract class BinaryParserProvider extends ParserProvider {
@Deprecated
public final ParseSetup guessSetup(ByteVec v, byte[] bits, byte sep, int ncols, boolean singleQuotes, int checkHeader, String[] columnNames, byte[] columnTypes, String[][] domains, String[][] naStrings) {
ParseSetup ps = new ParseSetup(null, sep, singleQuotes, checkHeader,
ncols, columnNames, columnTypes, domains, naStrings, null);
ncols, columnNames, columnTypes, domains, naStrings, null, false);
return guessSetup(v, bits, ps);
}

Expand Down
4 changes: 2 additions & 2 deletions h2o-core/src/main/java/water/parser/CsvParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ else if (ParseUUID.isUUID(str))
}
//FIXME should set warning message and let fall through
return new ParseSetup(CSV_INFO, GUESS_SEP, singleQuotes, checkHeader, 1, null, ctypes, domains, naStrings, data, new ParseWriter.ParseErr[0],FileVec.DFLT_CHUNK_SIZE,
nonDataLineMarkers, escapechar);
nonDataLineMarkers, escapechar, false);
}
}
data[0] = determineTokens(lines[0], sep, singleQuotes, escapechar);
Expand Down Expand Up @@ -791,7 +791,7 @@ else if (ParseUUID.isUUID(str))

// Assemble the setup understood so far
ParseSetup resSetup = new ParseSetup(CSV_INFO, sep, singleQuotes, checkHeader, ncols, labels, null, null /*domains*/, naStrings, data,
nonDataLineMarkers, escapechar);
nonDataLineMarkers, escapechar, false);

// now guess the types
if (columnTypes == null || ncols != columnTypes.length) {
Expand Down
52 changes: 33 additions & 19 deletions h2o-core/src/main/java/water/parser/ParseSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ParseSetup extends Iced {
int[] _parse_columns_indices; // store column indices to be parsed into the final file
byte[] _nonDataLineMarkers;
boolean _force_col_types = false; // at end of parsing, change column type to users specified ones
boolean _tz_adjust_to_local = false;
String[] _orig_column_types; // copy over the original column type setup before translating to byte[]

String[] _synthetic_column_names; // Columns with constant values to be added to parsed Frame
Expand Down Expand Up @@ -73,35 +74,35 @@ public ParseSetup(ParseSetup ps) {
ps._separator, ps._single_quotes, ps._check_header, ps._number_columns,
ps._column_names, ps._column_types, ps._domains, ps._na_strings, ps._data,
new ParseWriter.ParseErr[0], ps._chunk_size, ps._decrypt_tool, ps._skipped_columns,
ps._nonDataLineMarkers, ps._escapechar);
ps._nonDataLineMarkers, ps._escapechar, ps._tz_adjust_to_local);
}

public static ParseSetup makeSVMLightSetup(){
return new ParseSetup(SVMLight_INFO, ParseSetup.GUESS_SEP,
false,ParseSetup.NO_HEADER,1,null,new byte[]{Vec.T_NUM},null,null,null, new ParseWriter.ParseErr[0],
null);
null, false);
}

// This method was called during guess setup, lot of things are null, like ctypes.
// when it is called again, it either contains the guess column types or it will have user defined column types
public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, byte[] nonDataLineMarkers, byte escapeChar) {
int chunkSize, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
chunkSize, null, null, nonDataLineMarkers, escapeChar);
chunkSize, null, null, nonDataLineMarkers, escapeChar, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar) {
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parse_type, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes, domains, naStrings, data, errs,
chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false);
chunkSize, decrypt_tool, skipped_columns, nonDataLineMarkers, escapeChar, false, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[] columnNames,
byte[] ctypes, String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs,
int chunkSize, Key<DecryptionTool> decrypt_tool, int[] skipped_columns, byte[] nonDataLineMarkers,
byte escapeChar, boolean force_col_types) {
byte escapeChar, boolean force_col_types, boolean tz_adjust_to_local) {
_parse_type = parse_type;
_separator = sep;
_nonDataLineMarkers = nonDataLineMarkers;
Expand All @@ -119,6 +120,7 @@ public ParseSetup(ParserInfo parse_type, byte sep, boolean singleQuotes, int che
_skipped_columns = skipped_columns;
_escapechar = escapeChar;
_force_col_types = force_col_types;
_tz_adjust_to_local = tz_adjust_to_local;
setParseColumnIndices(ncols, _skipped_columns);
}

Expand Down Expand Up @@ -172,7 +174,7 @@ ps.column_names, strToColumnTypes(ps.column_types),
ps.chunk_size,
ps.decrypt_tool != null ? ps.decrypt_tool.key() : null, ps.skipped_columns,
ps.custom_non_data_line_markers != null ? ps.custom_non_data_line_markers.getBytes() : null,
ps.escapechar, ps.force_col_types);
ps.escapechar, ps.force_col_types, ps.tz_adjust_to_local);
this._force_col_types = ps.force_col_types;
this._orig_column_types = this._force_col_types ? (ps.column_types == null ? null : ps.column_types.clone()) : null;
}
Expand All @@ -185,9 +187,9 @@ ps.column_names, strToColumnTypes(ps.column_types),
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar) {
String[][] domains, String[][] naStrings, String[][] data, byte[] nonDataLineMarkers, byte escapeChar, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, escapeChar, tzAdjustToLocal);
}

/**
Expand All @@ -198,30 +200,30 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, byte escapeChar) {
String[][] domains, String[][] naStrings, String[][] data, byte escapeChar, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, escapeChar, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data) {
String[][] domains, String[][] naStrings, String[][] data, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, new ParseWriter.ParseErr[0], FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have a method like this:

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers) {
this(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, false);
}

Please do this for all the original parseSetup methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doing that for all the original parseSetup methods would add a lot of not needed code
as the java api is not public I decided to fix the tests by adding "false"
please tell me if that's ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@krasinski : This is a breaking change for customers and please add a method that does not include your new parameters for them.

int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers) {
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs, byte[] nonDataLineMarkers, boolean tzAdjustToLocal) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, nonDataLineMarkers, ParseSetup.DEFAULT_ESCAPE_CHAR, tzAdjustToLocal);
}

public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader,
int ncols, String[] columnNames, byte[] ctypes,
String[][] domains, String[][] naStrings, String[][] data, ParseWriter.ParseErr[] errs) {
this(parseType, sep, singleQuotes, checkHeader, ncols, columnNames, ctypes,
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
domains, naStrings, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, false);
}

/**
Expand All @@ -230,7 +232,7 @@ public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int chec
* Typically used by file type parsers for returning final invalid results
*/
public ParseSetup(ParserInfo parseType, byte sep, boolean singleQuotes, int checkHeader, int ncols, String[][] data, ParseWriter.ParseErr[] errs) {
this(parseType, sep, singleQuotes, checkHeader, ncols, null, null, null, null, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR);
this(parseType, sep, singleQuotes, checkHeader, ncols, null, null, null, null, data, errs, FileVec.DFLT_CHUNK_SIZE, null, ParseSetup.DEFAULT_ESCAPE_CHAR, false);
}

/**
Expand Down Expand Up @@ -258,6 +260,10 @@ public String[] getOrigColumnTypes() {
public boolean getForceColTypes() {
return _force_col_types;
}

public boolean gettzAdjustToLocal() {
return _tz_adjust_to_local;
}

public byte[] getColumnTypes() { return _column_types; }

Expand Down Expand Up @@ -558,6 +564,7 @@ public GuessSetupTsk(ParseSetup userSetup) {
}
if (_gblSetup==null)
throw new RuntimeException("This H2O node couldn't find the file(s) to parse. Please check files and/or working directories.");
_gblSetup.settzAdjustToLocal(_userSetup.gettzAdjustToLocal());
_gblSetup.setFileName(FileUtils.keyToFileName(key));
}

Expand Down Expand Up @@ -587,6 +594,7 @@ public void reduce(GuessSetupTsk other) {
else
_gblSetup._na_strings = _userSetup._na_strings;
}
_gblSetup._tz_adjust_to_local = _gblSetup._tz_adjust_to_local || _userSetup._tz_adjust_to_local;
// if(_gblSetup._errs != null)
for(ParseWriter.ParseErr err:_gblSetup._errs)
Log.warn("ParseSetup: " + err.toString());
Expand All @@ -600,6 +608,7 @@ private ParseSetup mergeSetups(ParseSetup setupA, ParseSetup setupB, String file
}
ParseSetup mergedSetup = setupA;

mergedSetup._tz_adjust_to_local = setupA._tz_adjust_to_local || setupB._tz_adjust_to_local;
mergedSetup._check_header = unifyCheckHeader(setupA._check_header, setupB._check_header);

mergedSetup._separator = unifyColumnSeparators(setupA._separator, setupB._separator);
Expand Down Expand Up @@ -707,7 +716,7 @@ public static ParseSetup guessSetup(ByteVec bv, byte [] bits, ParseSetup userSet
*/
private ParseSetup toInitialSetup() {
return new ParseSetup(_parse_type, _separator, _single_quotes, _check_header, GUESS_COL_CNT, _column_names,
_column_types, null, null, null, _nonDataLineMarkers, _escapechar);
_column_types, null, null, null, _nonDataLineMarkers, _escapechar, false);
}

/**
Expand Down Expand Up @@ -878,6 +887,11 @@ public ParseSetup setForceColTypes(boolean force_col_types) {
return this;
}

public ParseSetup settzAdjustToLocal(boolean tz_adjust_to_local) {
this._tz_adjust_to_local = tz_adjust_to_local;
return this;
}

public ParseSetup setDomains(String[][] domains) {
this._domains = domains;
return this;
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/ParserProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public final ParseSetup guessSetup(ByteVec v, byte[] bits, ParseSetup userSetup)
*/
protected ParseSetup guessSetup_impl(ByteVec v, byte[] bits, ParseSetup userSetup) {
ParseSetup ps = guessInitSetup(v, bits, userSetup);
return guessFinalSetup(v, bits, ps);
return guessFinalSetup(v, bits, ps).settzAdjustToLocal(userSetup._tz_adjust_to_local);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion h2o-core/src/main/java/water/parser/SVMLightParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public static ParseSetup guessSetup(byte [] bits) {
if(lastNewline > 0) bits = Arrays.copyOf(bits,lastNewline+1);
SVMLightParser p = new SVMLightParser(new ParseSetup(SVMLight_INFO,
ParseSetup.GUESS_SEP, false,ParseSetup.GUESS_HEADER,ParseSetup.GUESS_COL_CNT,
null,null,null,null,null), null);
null,null,null,null,null, false), null);
SVMLightInspectParseWriter dout = new SVMLightInspectParseWriter();
p.parseChunk(0,new ByteAryData(bits,0), dout);
if (dout._ncols > 0 && dout._nlines > 0 && dout._nlines > dout._invalidLines)
Expand Down
Loading
Loading