some-logging-improvements #262
Merged: 2 commits, Jun 6, 2024
Changes from 1 commit
@@ -106,8 +106,9 @@ public boolean isCommitTimedOut() {
       return false;
     }
 
-    if (System.currentTimeMillis() - startTime > config.commitTimeoutMs()) {
-      LOG.info("Commit timeout reached");
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - startTime > config.commitTimeoutMs()) {
+      LOG.info("Commit timeout reached. Now: {}, start: {}, timeout: {}", currentTime, startTime, config.commitTimeoutMs());
       return true;
     }
     return false;
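One detail worth noting in this hunk: the clock is now read once into `currentTime` and reused for both the comparison and the log message, so the logged value is exactly the one that tripped the timeout. A minimal sketch of that pattern (class and field names are assumptions for illustration, not the PR's actual class):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CommitTimeout {
  private static final Logger LOG = LoggerFactory.getLogger(CommitTimeout.class);

  private final long startTime = System.currentTimeMillis();
  private final long timeoutMs;

  CommitTimeout(long timeoutMs) {
    this.timeoutMs = timeoutMs;
  }

  boolean isTimedOut() {
    // Read the clock once: calling System.currentTimeMillis() a second time
    // for the log line could print a different value than the one compared.
    long currentTime = System.currentTimeMillis();
    if (currentTime - startTime > timeoutMs) {
      LOG.info("Commit timeout reached. Now: {}, start: {}, timeout: {}", currentTime, startTime, timeoutMs);
      return true;
    }
    return false;
  }
}
```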
@@ -125,14 +126,14 @@ public boolean isCommitReady(int expectedPartitionCount) {
             .sum();
 
     if (receivedPartitionCount >= expectedPartitionCount) {
-      LOG.debug(
+      LOG.info(
           "Commit {} ready, received responses for all {} partitions",
           currentCommitId,
           receivedPartitionCount);
       return true;
     }
 
-    LOG.debug(
+    LOG.info(
         "Commit {} not ready, received responses for {} of {} partitions, waiting for more",
         currentCommitId,
         receivedPartitionCount,
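The `.sum()` context above suggests the received count is aggregated from buffered per-worker responses, each covering some set of partitions. A rough sketch of that shape (the buffer and payload types here are assumptions, not the connector's real classes):

```java
import java.util.List;

// Hypothetical response payload: the partitions one worker has flushed.
record DataComplete(List<Integer> assignments) {}

class ReadinessCheck {
  static boolean isCommitReady(List<DataComplete> readyBuffer, int expectedPartitionCount) {
    // Sum partition counts across all buffered responses; the commit is
    // ready once every expected partition has reported in.
    long receivedPartitionCount =
        readyBuffer.stream().mapToLong(payload -> payload.assignments().size()).sum();
    return receivedPartitionCount >= expectedPartitionCount;
  }
}
```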
@@ -97,10 +97,10 @@ public void process() {
     if (commitState.isCommitIntervalReached()) {
       // send out begin commit
       commitState.startNewCommit();
+      LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString());
       Event event =
           new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId()));
       send(event);
-      LOG.debug("Started new commit with commit-id={}", commitState.currentCommitId().toString());
     }
 
     consumeAvailable(POLL_DURATION, this::receive);
@@ -127,6 +127,7 @@ private boolean receive(Envelope envelope) {
 
   private void commit(boolean partialCommit) {
     try {
+      LOG.info("Processing commit after responses for {}, isPartialCommit {}", commitState.currentCommitId(), partialCommit);
       doCommit(partialCommit);
     } catch (Exception e) {
       LOG.warn("Commit failed, will try again next cycle", e);
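Both coordinator changes follow the same principle: log intent before the operation that can fail (sending the StartCommit event, running doCommit), so an exception mid-operation still leaves a record of which commit was being attempted. A generic sketch of that pattern (not the PR's code, just the idea):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LogBeforeAct {
  private static final Logger LOG = LoggerFactory.getLogger(LogBeforeAct.class);

  void runStep(String commitId, Runnable step) {
    // Logging before the step means a thrown exception can be correlated
    // with the commit it interrupted; logging only on success would not.
    LOG.info("Starting step for commit-id={}", commitId);
    try {
      step.run();
    } catch (Exception e) {
      LOG.warn("Step for commit-id={} failed, will try again next cycle", commitId, e);
    }
  }
}
```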
@@ -134,7 +134,7 @@ private void routeRecordStatically(SinkRecord record) {
 
   private void routeRecordDynamically(SinkRecord record) {
     String routeField = config.tablesRouteField();
-    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+    Preconditions.checkNotNull(routeField, String.format("Route field cannot be null with dynamic routing at topic: %s, partition: %d, offset: %d", record.topic(), record.kafkaPartition(), record.kafkaOffset()));
 
     String routeValue = extractRouteValue(record.value(), routeField);
     if (routeValue != null) {
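One trade-off in the new precondition message: `String.format` runs on every record, even when `routeField` is non-null and the check passes. Guava's `Preconditions.checkNotNull` has an overload that takes a `%s`-style template and only formats it on failure; a sketch of that alternative (same error detail, lazy formatting; note Guava's template supports only `%s`):

```java
import com.google.common.base.Preconditions;
import org.apache.kafka.connect.sink.SinkRecord;

class RouteCheck {
  static String requireRouteField(String routeField, SinkRecord record) {
    // The message is only formatted if routeField is actually null,
    // avoiding per-record String.format overhead on the happy path.
    return Preconditions.checkNotNull(
        routeField,
        "Route field cannot be null with dynamic routing at topic: %s, partition: %s, offset: %s",
        record.topic(),
        record.kafkaPartition(),
        record.kafkaOffset());
  }
}
```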
@@ -47,7 +47,7 @@ public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
   }
 
   public RecordWriter createWriter(
-      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+    String tableName, SinkRecord sample, boolean ignoreMissingTable) {
     TableIdentifier identifier = TableIdentifier.parse(tableName);
     Table table;
     try {
@@ -56,7 +56,8 @@ public RecordWriter createWriter(
       if (config.autoCreateEnabled()) {
         table = autoCreateTable(tableName, sample);
       } else if (ignoreMissingTable) {
-        return new RecordWriter() {};
+        return new RecordWriter() {
+        };
       } else {
        throw nst;
      }
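Both of these hunks are formatting-only changes (a continuation-indent shift and the empty anonymous class split across two lines); behavior is unchanged. The `return new RecordWriter() {};` idiom compiles only if every method on `RecordWriter` has a default implementation. The interface itself is not shown in this diff, so the following is an assumed shape that would make the empty anonymous class legal, acting as a no-op writer that silently drops records for missing tables:

```java
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

// Assumed shape; the real interface lives elsewhere in the connector codebase.
interface RecordWriter {
  default void write(SinkRecord record) {} // drop the record

  default List<Object> complete() { // nothing buffered, nothing to flush
    return List.of();
  }

  default void close() {}
}

class Demo {
  static RecordWriter noOpWriter() {
    return new RecordWriter() {}; // legal because every method has a default
  }
}
```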
@@ -67,47 +68,52 @@ public RecordWriter createWriter(
 
   @VisibleForTesting
   Table autoCreateTable(String tableName, SinkRecord sample) {
-    StructType structType;
-    if (sample.valueSchema() == null) {
-      structType =
-          SchemaUtils.inferIcebergType(sample.value(), config)
-              .orElseThrow(() -> new DataException("Unable to create table from empty object"))
-              .asStructType();
-    } else {
-      structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
-    }
-
-    org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
-    TableIdentifier identifier = TableIdentifier.parse(tableName);
-
-    List<String> partitionBy = config.tableConfig(tableName).partitionBy();
-    PartitionSpec spec;
-    try {
-      spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
-    } catch (Exception e) {
-      LOG.error(
-          "Unable to create partition spec {}, table {} will be unpartitioned",
-          partitionBy,
-          identifier,
-          e);
-      spec = PartitionSpec.unpartitioned();
-    }
-
-    PartitionSpec partitionSpec = spec;
-    AtomicReference<Table> result = new AtomicReference<>();
-    Tasks.range(1)
-        .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
-        .run(
-            notUsed -> {
-              try {
-                result.set(catalog.loadTable(identifier));
-              } catch (NoSuchTableException e) {
-                result.set(
-                    catalog.createTable(
-                        identifier, schema, partitionSpec, config.autoCreateProps()));
-                LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
-              }
-            });
-    return result.get();
+    try {
+      StructType structType;
+      if (sample.valueSchema() == null) {
+        structType =
+            SchemaUtils.inferIcebergType(sample.value(), config)
+                .orElseThrow(() -> new DataException("Unable to create table from empty object"))
+                .asStructType();
+      } else {
+        structType = SchemaUtils.toIcebergType(sample.valueSchema(), config).asStructType();
+      }
+
+      org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(structType.fields());
+      TableIdentifier identifier = TableIdentifier.parse(tableName);
+
+      List<String> partitionBy = config.tableConfig(tableName).partitionBy();
+      PartitionSpec spec;
+      try {
+        spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+      } catch (Exception e) {
+        LOG.error(
+            "Unable to create partition spec {}, table {} will be unpartitioned",
+            partitionBy,
+            identifier,
+            e);
+        spec = PartitionSpec.unpartitioned();
+      }
+
+      PartitionSpec partitionSpec = spec;
+      AtomicReference<Table> result = new AtomicReference<>();
+      Tasks.range(1)
+          .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
+          .run(
+              notUsed -> {
+                try {
+                  result.set(catalog.loadTable(identifier));
+                } catch (NoSuchTableException e) {
+                  result.set(
+                      catalog.createTable(
+                          identifier, schema, partitionSpec, config.autoCreateProps()));
+                  LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+                }
+              });
+      return result.get();
+    } catch (Exception e) {
+      LOG.error("Error creating new table {} from record at topic: {}, partition: {}, offset: {}", tableName, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset());
+      throw e;
+    }
   }
 }
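The substance of this last hunk is the new outer try/catch (the body is otherwise re-indented unchanged), which logs the failing table name and source record coordinates before rethrowing. Around it, the `Tasks.range(1).retry(...)` wrapper implements a load-or-create pattern: attempt to load the table, create it on NoSuchTableException, and retry so that losing a creation race to a concurrent worker resolves to a successful load. Stripped of the Tasks utility, the control flow looks roughly like this (a sketch under those assumptions, not the connector's code):

```java
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;

class LoadOrCreate {
  static Table loadOrCreate(
      Catalog catalog,
      TableIdentifier identifier,
      Schema schema,
      PartitionSpec spec,
      Map<String, String> props,
      int retries) {
    for (int attempt = 0; ; attempt++) {
      try {
        return catalog.loadTable(identifier);
      } catch (NoSuchTableException nst) {
        try {
          return catalog.createTable(identifier, schema, spec, props);
        } catch (AlreadyExistsException raced) {
          // Another worker created the table between our load and create;
          // loop back and load it, up to the configured retry budget.
          if (attempt >= retries) {
            throw raced;
          }
        }
      }
    }
  }
}
```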