4 changes: 3 additions & 1 deletion api/src/main/java/org/apache/iceberg/catalog/Namespace.java
@@ -25,7 +25,9 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** A namespace in a {@link Catalog}. */
-public class Namespace {
+public class Namespace implements java.io.Serializable {
+
+  private static final long serialVersionUID = 1L;
private static final Namespace EMPTY_NAMESPACE = new Namespace(new String[] {});
private static final Joiner DOT = Joiner.on('.');
private static final Predicate<String> CONTAINS_NULL_CHARACTER =
api/src/main/java/org/apache/iceberg/catalog/TableIdentifier.java

@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.catalog;

+import java.io.Serializable;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
@@ -26,10 +27,10 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;

/** Identifies a table in iceberg catalog. */
-public class TableIdentifier {
+public class TableIdentifier implements Serializable {
Contributor:

Does making this class Serializable really fix the root cause? It looks like the parse(String) method would need different parsing logic to respect escaped dots.

Author:

The root cause was the following line:
this.identifier = tableIdentifier.toString();

Instead of storing the string value, I stored the TableIdentifier object itself. As a result, the TableIdentifier class needs to implement Serializable.

Contributor:

That makes sense, but wouldn't TableIdentifier.parse(tableName) still suffer from the same issue?

Author:

Yes, you're right. When using the parse function, dots are treated as separators.

Maybe parsing isn't the best approach in this case. Would it make more sense to work with the complete object instead? That would of course be a breaking change.

Author:

Because the information about which part of the string represents the namespace is lost.
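
A minimal illustration of that loss, using the public TableIdentifier API (the dotted name "a.b" is just an example):

TableIdentifier original = TableIdentifier.of(Namespace.of("db"), "a.b");
String flattened = original.toString();                   // "db.a.b"
TableIdentifier parsed = TableIdentifier.parse(flattened);
// parsed.namespace() is ["db", "a"] and parsed.name() is "b",
// so parsed.equals(original) is false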

Author (@peach12345), Jan 26, 2026:

The parsing function is, of course, working as intended.
However, in the CatalogTableLoader class, when you pass in a TableIdentifier, the identifier is stored as a string:

private CatalogTableLoader(CatalogLoader catalogLoader, TableIdentifier tableIdentifier) {
  this.catalogLoader = catalogLoader;
  this.identifier = tableIdentifier.toString();
}

Later, when calling the loadTable function, that string is parsed again. This fails when the table name contains dots:

public Table loadTable() {
  FlinkEnvironmentContext.init();
  return catalog.loadTable(TableIdentifier.parse(identifier));
}

That’s why storing the full TableIdentifier object instead of its string representation makes more sense.
Fixing the parsing function to properly handle dots would also be a valid approach, but in my experience, relying on parsing often leads to subtle errors or bugs.
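
As a minimal sketch (using the loader API above; the dotted table name is hypothetical), the failure surfaces like this:

TableLoader loader =
    TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(Namespace.of("db"), "a.b"));
loader.open();
// Before this change, loadTable() re-parsed the stored string "db.a.b" and
// looked up table "b" in namespace "db.a", which does not exist.
Table table = loader.loadTable();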

Author:

@mxm what do you think? Or should we change the parsing logic to make it more robust and support dots in table names?

Contributor:

I think fixing the parsing logic would be preferable for me. There are many cases where we dynamically need to parse a table name and we don't want to rely on Java serialization in the process.

Making Namespace / TableIdentifier Serializable could be an addition. What do you think about the Serializable change @pvary?
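
A minimal sketch of what escape-aware parsing could look like, assuming a backslash escape for literal dots (an illustration only; neither the PR nor the library defines this):

import java.util.ArrayList;
import java.util.List;

// Hypothetical: split an identifier on '.', treating "\." as a literal dot.
static List<String> splitEscaped(String identifier) {
  List<String> parts = new ArrayList<>();
  StringBuilder current = new StringBuilder();
  for (int i = 0; i < identifier.length(); i++) {
    char c = identifier.charAt(i);
    if (c == '\\' && i + 1 < identifier.length() && identifier.charAt(i + 1) == '.') {
      current.append('.');
      i++; // consume the escaped dot
    } else if (c == '.') {
      parts.add(current.toString());
      current.setLength(0);
    } else {
      current.append(c);
    }
  }
  parts.add(current.toString());
  return parts;
}

// splitEscaped("db.a\\.b") -> ["db", "a.b"]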

Author:

Thanks @mxm for the answer.

If we only change the parsing logic, I don’t think we need the additional Serializable parts. I’ll give it a try.


The iceberg-go library uses a special ASCII separator character for this. Maybe we could do something similar?
https://github.com/apache/iceberg-go/blob/7cfbf238ee7f6159af86a30f90e78097f2caae8b/catalog/rest/rest.go#L64
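
For reference, iceberg-go joins namespace levels with the ASCII unit separator (0x1F) instead of '.'. A sketch of the same idea in Java (illustration only, not part of this PR):

// Join/split namespace levels on the unit separator, which is far less
// likely than '.' to appear in a real namespace or table name.
static final String UNIT_SEPARATOR = "\u001F";

static String join(String[] levels) {
  return String.join(UNIT_SEPARATOR, levels);
}

static String[] split(String joined) {
  return joined.split(UNIT_SEPARATOR, -1);
}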


+  private static final long serialVersionUID = 1L;
private static final Splitter DOT = Splitter.on('.');

private final Namespace namespace;
private final String name;

org/apache/iceberg/flink/TableLoader.java

@@ -108,13 +108,13 @@ class CatalogTableLoader implements TableLoader {
private static final long serialVersionUID = 1L;

private final CatalogLoader catalogLoader;
-  private final String identifier;
+  private final TableIdentifier identifier;

private transient Catalog catalog;

private CatalogTableLoader(CatalogLoader catalogLoader, TableIdentifier tableIdentifier) {
this.catalogLoader = catalogLoader;
-    this.identifier = tableIdentifier.toString();
+    this.identifier = tableIdentifier;
}

@Override
@@ -130,7 +130,7 @@ public boolean isOpen() {
@Override
public Table loadTable() {
FlinkEnvironmentContext.init();
-    return catalog.loadTable(TableIdentifier.parse(identifier));
+    return catalog.loadTable(identifier);
}

@Override
@@ -145,7 +145,7 @@ public void close() throws IOException {
@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public TableLoader clone() {
-    return new CatalogTableLoader(catalogLoader.clone(), TableIdentifier.parse(identifier));
+    return new CatalogTableLoader(catalogLoader.clone(), identifier);
}

@Override
org/apache/iceberg/flink/CatalogTableLoaderTest.java (new file)
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;
Contributor:

This doesn't seem to be directly related to Flink, but rather to Iceberg core.

Author:

Please ignore. Will rewrite those tests.


import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.Test;

public class CatalogTableLoaderTest {

private static final TableIdentifier IDENTIFIER = TableIdentifier.of("db", "tbl");

// Simple serializable CatalogLoader that creates a proxy Catalog on each loadCatalog() call.
static class SerializableCatalogLoader implements CatalogLoader, Serializable {
private static final long serialVersionUID = 1L;

@Override
public Catalog loadCatalog() {
// Create a simple Table proxy object
Table tableProxy =
(Table)
Proxy.newProxyInstance(
Table.class.getClassLoader(),
new Class[] {Table.class},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
// For tests the Table object does not need to implement methods
if (method.getName().equals("toString")) {
return "fake-table";
}
return null;
}
});

// Create a Catalog proxy whose loadTable(identifier) returns the table proxy and which is Closeable
InvocationHandler handler =
new InvocationHandler() {
private boolean closed = false;

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String name = method.getName();
if ("loadTable".equals(name)) {
return tableProxy;
} else if ("close".equals(name)) {
closed = true;
return null;
} else if ("toString".equals(name)) {
return "fake-catalog";
}
throw new UnsupportedOperationException("Not implemented in test proxy: " + name);
}
};

return (Catalog)
Proxy.newProxyInstance(
Catalog.class.getClassLoader(),
new Class[] {Catalog.class, Closeable.class},
handler);
}

@Override
@SuppressWarnings({"checkstyle:NoClone", "checkstyle:SuperClone"})
public CatalogLoader clone() {
// A new instance is sufficient for tests
return new SerializableCatalogLoader();
}
}

private static <T extends Serializable> T roundTripSerialize(T obj) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(obj);
}
byte[] bytes = baos.toByteArray();
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
@SuppressWarnings("unchecked")
T deserialized = (T) ois.readObject();
return deserialized;
}
}

@Test
public void testOpenLoadClose() throws Exception {
CatalogLoader catalogLoader = new SerializableCatalogLoader();
TableLoader loader = TableLoader.fromCatalog(catalogLoader, IDENTIFIER);

// initially closed
assertThat(loader.isOpen()).isFalse();

// open and load
loader.open();
assertThat(loader.isOpen()).isTrue();
Table table = loader.loadTable();
assertThat(table).isNotNull();
assertThat(table).hasToString("fake-table");

// close
loader.close();
assertThat(loader.isOpen()).isFalse();
}

@Test
public void testSerializationKeepsLoaderFunctional() throws Exception {
CatalogLoader catalogLoader = new SerializableCatalogLoader();
TableLoader original = TableLoader.fromCatalog(catalogLoader, IDENTIFIER);

// serialize / deserialize the TableLoader
TableLoader deserialized = roundTripSerialize(original);

// should still work after deserialization
assertThat(deserialized.isOpen()).isFalse();
deserialized.open();
assertThat(deserialized.isOpen()).isTrue();
Table table = deserialized.loadTable();
assertThat(table).isNotNull();
assertThat(table).hasToString("fake-table");
deserialized.close();
assertThat(deserialized.isOpen()).isFalse();
}
}