Skip to content

Commit

Permalink
[FLINK-3640] Close MetadataApplier when the job stops
Browse files Browse the repository at this point in the history
  • Loading branch information
morozov committed Sep 29, 2024
1 parent 4b13c49 commit 5761577
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

/** {@code MetadataApplier} is used to apply metadata changes to external systems. */
@PublicEvolving
public interface MetadataApplier extends Serializable {
public interface MetadataApplier extends Serializable, AutoCloseable {

/** Apply the given {@link SchemaChangeEvent} to external systems. */
void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException;
Expand All @@ -50,4 +50,7 @@ default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEve
default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
}

@Override
default void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
});
}

public void close() throws Exception {
if (catalog != null) {
catalog.close();
}
}

private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException {
try {
if (!catalog.databaseExists(event.tableId().getSchemaName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,12 @@ public void close() throws IOException {
if (schemaChangeThreadPool != null) {
schemaChangeThreadPool.shutdown();
}

try {
metadataApplier.close();
} catch (Exception e) {
throw new IOException("Failed to close metadata applier.", e);
}
}

private List<SchemaChangeEvent> calculateDerivedSchemaChangeEvents(SchemaChangeEvent event) {
Expand Down

0 comments on commit 5761577

Please sign in to comment.