Rename all instances of OneTable to xtable where possible
the-other-tim-brown committed Apr 24, 2024
1 parent 0b74362 commit d1b1737
Showing 51 changed files with 189 additions and 182 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -94,7 +94,7 @@ catalogOptions: # all other options are passed through in a map
```
5. run with `java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml [--hadoopConfig hdfs-site.xml] [--convertersConfig converters.yaml] [--icebergCatalogConfig catalog.yaml]`
The bundled jar includes hadoop dependencies for AWS, Azure, and GCP. Sample hadoop configurations for configuring the converters
can be found in the [onetable-hadoop-defaults.xml](https://github.com/apache/incubator-xtable/blob/main/utilities/src/main/resources/onetable-hadoop-defaults.xml) file.
can be found in the [xtable-hadoop-defaults.xml](https://github.com/apache/incubator-xtable/blob/main/utilities/src/main/resources/xtable-hadoop-defaults.xml) file.
The custom hadoop configurations can be passed in with the `--hadoopConfig [custom-hadoop-config-file]` option.
The config in the custom hadoop config file will override the default hadoop configurations. For an example
of a custom hadoop config file, see [hadoop.xml](https://xtable.apache.org/docs/fabric#step-2-translate-source-table-to-delta-lake-format-using-apache-xtable-incubating).
@@ -107,7 +107,7 @@ For setting up the repo on IntelliJ, open the project and change the java versio
You have found a bug, or have a cool idea that you want to contribute to the project? Please file a GitHub issue [here](https://github.com/apache/incubator-xtable/issues)

## Adding a new target format
Adding a new target format requires a developer to implement [ConversionTarget](./api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java). Once you have implemented that interface, you can integrate it into the [OneTableClient](./core/src/main/java/org/apache/xtable/client/OneTableClient.java). If you think others may find that target useful, please raise a Pull Request to add it to the project.
Adding a new target format requires a developer to implement [ConversionTarget](./api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java). Once you have implemented that interface, you can integrate it into the [ConversionController](./core/src/main/java/org/apache/xtable/conversion/ConversionController.java). If you think others may find that target useful, please raise a Pull Request to add it to the project.

## Overview of the sync process
![img.png](assets/images/sync_flow.jpg)
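The "Adding a new target format" paragraph above names ConversionTarget as the extension point. As a rough illustration only — the class name, package, and method bodies below are invented, and the real interface declares more methods than the two that appear in this commit — a new target could start from a skeleton like this:

```java
package org.apache.xtable.example;

import java.util.Optional;

import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.spi.sync.ConversionTarget;

/**
 * Hypothetical starting point for a new target format. Declared abstract so that
 * only the ConversionTarget methods visible in this commit need to appear here;
 * a real integration must implement the full interface and then be wired into
 * ConversionController.
 */
public abstract class MyFormatConversionTarget implements ConversionTarget {

  @Override
  public void completeSync() {
    // Commit any state accumulated during the sync and release resources held by this target.
  }

  @Override
  public Optional<TableSyncMetadata> getTableMetadata() {
    // Return the xtable metadata previously persisted in the target table, if any.
    return Optional.empty();
  }
}
```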
@@ -41,7 +41,7 @@ public class TableSyncMetadata {
* Property name for the lastInstantSynced field from SyncResult, used for persisting
* lastInstantSynced in the table metadata/properties
*/
public static final String ONETABLE_LAST_INSTANT_SYNCED_PROP = "ONETABLE_LAST_INSTANT_SYNCED";
public static final String XTABLE_LAST_INSTANT_SYNCED_PROP = "XTABLE_LAST_INSTANT_SYNCED";
/**
* Property name for the list of instants to consider during the next sync. This list may include
* out-of-order instants that could be missed without explicit tracking.
@@ -54,7 +54,7 @@ public class TableSyncMetadata

public Map<String, String> asMap() {
Map<String, String> map = new HashMap<>();
map.put(ONETABLE_LAST_INSTANT_SYNCED_PROP, lastInstantSynced.toString());
map.put(XTABLE_LAST_INSTANT_SYNCED_PROP, lastInstantSynced.toString());
map.put(
INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP,
convertInstantsToConsiderForNextSyncToString());
@@ -65,8 +65,8 @@ public static Optional<TableSyncMetadata> fromMap(Map<String, String> properties
if (properties != null) {
Instant lastInstantSynced = null;
List<Instant> instantsToConsiderForNextSync = null;
if (properties.containsKey(ONETABLE_LAST_INSTANT_SYNCED_PROP)) {
lastInstantSynced = Instant.parse(properties.get(ONETABLE_LAST_INSTANT_SYNCED_PROP));
if (properties.containsKey(XTABLE_LAST_INSTANT_SYNCED_PROP)) {
lastInstantSynced = Instant.parse(properties.get(XTABLE_LAST_INSTANT_SYNCED_PROP));
}
if (properties.containsKey(INFLIGHT_COMMITS_TO_CONSIDER_FOR_NEXT_SYNC_PROP)) {
instantsToConsiderForNextSync =
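As a quick, illustrative sketch of how the renamed property is read back (the timestamp value is a placeholder, and the assumption that `fromMap` returns an empty `Optional` when the key is absent is based only on the snippet above):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.xtable.model.TableSyncMetadata;

public class TableSyncMetadataRoundTrip {
  public static void main(String[] args) {
    // Properties as a target table might expose them after a previous sync.
    Map<String, String> targetTableProperties = new HashMap<>();
    targetTableProperties.put(
        TableSyncMetadata.XTABLE_LAST_INSTANT_SYNCED_PROP, "2024-04-24T12:00:00Z");

    // fromMap parses XTABLE_LAST_INSTANT_SYNCED back into an Instant.
    Optional<TableSyncMetadata> metadata = TableSyncMetadata.fromMap(targetTableProperties);
    System.out.println("Sync metadata recovered: " + metadata.isPresent());
  }
}
```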
@@ -25,11 +25,12 @@ public enum ErrorCode {
INVALID_CONFIGURATION(10001),
INVALID_PARTITION_SPEC(10002),
INVALID_PARTITION_VALUE(10003),
IO_EXCEPTION(10004),
INVALID_SCHEMA(10005),
UNSUPPORTED_SCHEMA_TYPE(10006),
UNSUPPORTED_FEATURE(10007),
PARSE_EXCEPTION(10008);
READ_EXCEPTION(10004),
UPDATE_EXCEPTION(10005),
INVALID_SCHEMA(10006),
UNSUPPORTED_SCHEMA_TYPE(10007),
UNSUPPORTED_FEATURE(10008),
PARSE_EXCEPTION(10009);

private final int errorCode;

@@ -82,7 +82,7 @@ public interface ConversionTarget {
/** Completes the sync and performs any cleanup required. */
void completeSync();

/** Returns the onetable metadata persisted in the target */
/** Returns the xtable metadata persisted in the target */
Optional<TableSyncMetadata> getTableMetadata();

/** Returns the TableFormat name the client syncs to */
@@ -38,7 +38,7 @@

import org.apache.hadoop.conf.Configuration;

import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.IncrementalTableChanges;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
@@ -139,7 +139,7 @@ public <COMMIT> Map<String, SyncResult> sync(
}
return syncResultsMerged;
} catch (IOException ioException) {
throw new OneIOException("Failed to close source converter", ioException);
throw new ReadException("Failed to close source converter", ioException);
}
}

@@ -89,7 +89,7 @@ public InternalDataFile convertRemoveActionToInternalDataFile(
.build();
}

public FileFormat convertToOneTableFileFormat(String provider) {
public FileFormat convertToFileFormat(String provider) {
if (provider.equals("parquet")) {
return FileFormat.APACHE_PARQUET;
} else if (provider.equals("orc")) {
@@ -44,7 +44,7 @@

import io.delta.tables.DeltaTable;

import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
@@ -112,8 +112,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
List<Action> actionsForVersion = getChangesState().getActionsForVersion(versionNumber);
Snapshot snapshotAtVersion = deltaLog.getSnapshotAt(versionNumber, Option.empty());
FileFormat fileFormat =
actionsConverter.convertToOneTableFileFormat(
snapshotAtVersion.metadata().format().provider());
actionsConverter.convertToFileFormat(snapshotAtVersion.metadata().format().provider());
Set<InternalDataFile> addedFiles = new HashSet<>();
Set<InternalDataFile> removedFiles = new HashSet<>();
for (Action action : actionsForVersion) {
@@ -193,7 +192,7 @@ private List<PartitionFileGroup> getInternalDataFiles(Snapshot snapshot, Interna
fileIterator.forEachRemaining(dataFiles::add);
return PartitionFileGroup.fromFiles(dataFiles);
} catch (Exception e) {
throw new OneIOException("Failed to iterate through Delta data files", e);
throw new ReadException("Failed to iterate through Delta data files", e);
}
}

@@ -261,7 +261,7 @@ private void commitTransaction() {
transaction.updateMetadata(metadata, false);
transaction.commit(
actions,
new DeltaOperations.Update(Option.apply(Literal.fromObject("onetable-delta-sync"))));
new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))));
}

private Map<String, String> getConfigurationsForDeltaSync() {
@@ -63,7 +63,7 @@ public class DeltaDataFileIterator implements DataFileIterator {
private DeltaDataFileIterator(
Snapshot snapshot, InternalSchema schema, boolean includeColumnStats) {
this.fileFormat =
actionsConverter.convertToOneTableFileFormat(snapshot.metadata().format().provider());
actionsConverter.convertToFileFormat(snapshot.metadata().format().provider());
this.fields = schema.getFields();
this.partitionFields =
partitionExtractor.convertFromDeltaPartitionFormat(
@@ -77,7 +77,7 @@ public class DeltaPartitionExtractor {
private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
// For timestamp partition fields, actual partition column names in delta format will be of type
// generated and with a name like `delta_partition_col_{transform_type}_{source_field_name}`.
private static final String DELTA_PARTITION_COL_NAME_FORMAT = "onetable_partition_col_%s_%s";
private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s";
static final String DELTA_GENERATION_EXPRESSION = "delta.generationExpression";
private static final List<ParsedGeneratedExpr.GeneratedExprType> GRANULARITIES =
Arrays.asList(
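For illustration, the renamed format string above produces generated partition column names like the following (the transform type `YEAR` and the source field name `created_at` are placeholder values, not taken from this commit):

```java
public class PartitionColumnNameExample {
  // Copied from DeltaPartitionExtractor after the rename in this commit.
  private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s";

  public static void main(String[] args) {
    // Prints: xtable_partition_col_YEAR_created_at
    System.out.println(String.format(DELTA_PARTITION_COL_NAME_FORMAT, "YEAR", "created_at"));
  }
}
```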
@@ -49,7 +49,7 @@
*
* <ul>
* <li>Delta schemas are represented as Spark StructTypes which do not have enums so the enum
* types are lost when converting from Onetable to Delta Lake representations
* types are lost when converting from XTable to Delta Lake representations
* <li>Delta does not have a fixed length byte array option so {@link InternalType#FIXED} is
* simply translated to a {@link org.apache.spark.sql.types.BinaryType}
* <li>Similarly, {@link InternalType#TIMESTAMP_NTZ} is translated to a long in Delta Lake
@@ -42,7 +42,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.model.exception.ParseException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
@@ -201,7 +201,7 @@ public List<ColumnStat> getColumnStatsForFile(AddFile addFile, List<InternalFiel
})
.collect(CustomCollectors.toList(fields.size()));
} catch (IOException ex) {
throw new OneIOException("Unable to parse stats json", ex);
throw new ParseException("Unable to parse stats json", ex);
}
}

@@ -33,7 +33,6 @@
import java.util.concurrent.TimeUnit;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
@@ -75,7 +74,8 @@ public static Object convertFromDeltaColumnStatValue(Object value, InternalSchem
try {
instant = dateFormat.parse(value.toString()).toInstant();
} catch (ParseException ex) {
throw new OneIOException("Unable to parse time from column stats", ex);
throw new org.apache.xtable.model.exception.ParseException(
"Unable to parse time from column stats", ex);
}
}
InternalSchema.MetadataValue timestampPrecision =
@@ -186,7 +186,8 @@ public static Object convertFromDeltaPartitionValue(
DateFormat formatter = getDateFormat(dateFormat);
return formatter.parse(value).toInstant().toEpochMilli();
} catch (ParseException ex) {
throw new OneIOException("Unable to parse partition value", ex);
throw new org.apache.xtable.model.exception.ParseException(
"Unable to parse partition value", ex);
}
}
}
@@ -16,20 +16,22 @@
* limitations under the License.
*/

package org.apache.xtable.constants;
package org.apache.xtable.exception;

import lombok.Builder;
import lombok.Value;
import org.apache.xtable.model.exception.ErrorCode;
import org.apache.xtable.model.exception.InternalException;

@Builder
@Value
public class OneTableConstants {
/**
* Maximum number of syncs that are persisted in the archive file, after that least recent sync is
* evicted.
*/
public static final Integer NUM_ARCHIVED_SYNCS_RESULTS = 10;
/**
* Exception thrown when there is an error reading existing state from a {@link
* org.apache.xtable.spi.extractor.ConversionSource} or {@link
* org.apache.xtable.spi.sync.ConversionTarget}.
*/
public class ReadException extends InternalException {
public ReadException(String message, Throwable e) {
super(ErrorCode.READ_EXCEPTION, message, e);
}

/** InternalTable meta directory inside table base path to store sync info. */
public static final String ONETABLE_META_DIR = ".onetable";
public ReadException(String message) {
super(ErrorCode.READ_EXCEPTION, message);
}
}
@@ -21,12 +21,16 @@
import org.apache.xtable.model.exception.ErrorCode;
import org.apache.xtable.model.exception.InternalException;

public class OneIOException extends InternalException {
public OneIOException(String message, Throwable e) {
super(ErrorCode.IO_EXCEPTION, message, e);
/**
* Exception thrown when there is an error updating a {@link
* org.apache.xtable.spi.sync.ConversionTarget}.
*/
public class UpdateException extends InternalException {
public UpdateException(String message, Throwable e) {
super(ErrorCode.UPDATE_EXCEPTION, message, e);
}

public OneIOException(String message) {
super(ErrorCode.IO_EXCEPTION, message);
public UpdateException(String message) {
super(ErrorCode.UPDATE_EXCEPTION, message);
}
}
@@ -18,7 +18,7 @@

package org.apache.xtable.hudi;

import static org.apache.xtable.hudi.HudiSchemaExtractor.convertFromOneTablePath;
import static org.apache.xtable.hudi.HudiSchemaExtractor.convertFromXTablePath;

import java.util.ArrayList;
import java.util.Collections;
@@ -244,7 +244,7 @@ private Map<String, HoodieColumnRangeMetadata<Comparable>> convertColStats(
columnStat ->
HoodieColumnRangeMetadata.<Comparable>create(
fileName,
convertFromOneTablePath(columnStat.getField().getPath()),
convertFromXTablePath(columnStat.getField().getPath()),
(Comparable) columnStat.getRange().getMinValue(),
(Comparable) columnStat.getRange().getMaxValue(),
columnStat.getNumNulls(),
@@ -43,7 +43,7 @@
import com.google.common.collect.PeekingIterator;

import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
@@ -91,7 +91,7 @@ public InternalSnapshot getCurrentSnapshot() {
completedTimeline
.lastInstant()
.orElseThrow(
() -> new OneIOException("Unable to read latest commit from Hudi source table"));
() -> new ReadException("Unable to read latest commit from Hudi source table"));
List<HoodieInstant> pendingInstants =
activeTimeline
.filterInflightsAndRequested()
@@ -81,7 +81,8 @@
import org.apache.xtable.avro.AvroSchemaConverter;
import org.apache.xtable.conversion.PerTableConfig;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.OneIOException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.exception.UpdateException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
@@ -304,7 +305,7 @@ public Optional<TableSyncMetadata> getTableMetadata() {
.getExtraMetadata();
}
} catch (IOException ex) {
throw new OneIOException("Unable to read Hudi commit metadata", ex);
throw new ReadException("Unable to read Hudi commit metadata", ex);
}
})
.flatMap(TableSyncMetadata::fromMap));
@@ -359,7 +360,7 @@ public void commit() {
// reuse existing table schema if no schema is provided as part of this commit
schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
} catch (Exception ex) {
throw new OneIOException("Unable to read Hudi table schema", ex);
throw new ReadException("Unable to read Hudi table schema", ex);
}
}
HoodieWriteConfig writeConfig =
@@ -426,7 +427,7 @@ private void markInstantsAsCleaned(
try {
partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
} catch (IOException ex) {
throw new OneIOException("Unable to get partitions to clean", ex);
throw new ReadException("Unable to get partitions to clean", ex);
}
if (partitionsToClean.isEmpty()) {
return;
@@ -532,7 +533,7 @@ private void markInstantsAsCleaned(
activeTimeline.transitionCleanInflightToComplete(
inflightClean, TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
} catch (Exception ex) {
throw new OneIOException("Unable to clean Hudi timeline", ex);
throw new UpdateException("Unable to clean Hudi timeline", ex);
}
}

@@ -543,7 +544,7 @@ private void runArchiver(
HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, table);
archiver.archiveIfRequired(engineContext, true);
} catch (IOException ex) {
throw new OneIOException("Unable to archive Hudi timeline", ex);
throw new UpdateException("Unable to archive Hudi timeline", ex);
}
}
