diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index 65d05e30920fc1..494946fd1c0cb6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -26,7 +26,6 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.ExternalObjectLog; import org.apache.doris.datasource.ExternalTable; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.persist.OperationType; @@ -70,11 +69,11 @@ public void replayRefreshCatalog(CatalogLog log) { } private void refreshCatalogInternal(CatalogIf catalog, boolean invalidCache) { - String catalogName = catalog.getName(); - if (!catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { - ((ExternalCatalog) catalog).resetToUninitialized(invalidCache); - LOG.info("refresh catalog {} with invalidCache {}", catalogName, invalidCache); + if (catalog.isInternalCatalog()) { + return; } + ((ExternalCatalog) catalog).onRefreshCache(invalidCache); + LOG.info("refresh catalog {} with invalidCache {}", catalog.getName(), invalidCache); } // Refresh database @@ -114,7 +113,7 @@ public void replayRefreshDb(ExternalObjectLog log) { } private void refreshDbInternal(ExternalDatabase db) { - db.resetToUninitialized(); + db.resetMetaToUninitialized(); LOG.info("refresh database {} in catalog {}", db.getFullName(), db.getCatalog().getName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 5b2d85bb0a8eb8..8789be8c4f3af2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -360,7 +360,7 @@ private void buildMetaCache() { localDbName -> Optional.ofNullable( buildDbForInit(null, localDbName, Util.genIdByName(name, localDbName), logType, true)), - (key, value, cause) -> value.ifPresent(v -> v.resetToUninitialized())); + (key, value, cause) -> value.ifPresent(v -> v.resetMetaToUninitialized())); } } @@ -581,29 +581,36 @@ public synchronized void resetToUninitialized(boolean invalidCache) { this.cachedConf = null; } onClose(); - - refreshOnlyCatalogCache(invalidCache); + onRefreshCache(invalidCache); } - // Only for hms event handling. - public void onRefreshCache() { - refreshOnlyCatalogCache(true); + /** + * Refresh both meta cache and catalog cache. + * + * @param invalidCache + */ + public void onRefreshCache(boolean invalidCache) { + refreshMetaCacheOnly(); + if (invalidCache) { + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); + } } - private void refreshOnlyCatalogCache(boolean invalidCache) { + /** + * Refresh meta cache only (database level cache), without invalidating catalog level cache. + * This method is safe to call within synchronized block. + */ + private void refreshMetaCacheOnly() { if (useMetaCache.isPresent()) { if (useMetaCache.get() && metaCache != null) { metaCache.invalidateAll(); } else if (!useMetaCache.get()) { this.initialized = false; for (ExternalDatabase db : idToDb.values()) { - db.resetToUninitialized(); + db.resetMetaToUninitialized(); } } } - if (invalidCache) { - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateCatalogCache(id); - } } public final Optional getSchema(SchemaCacheKey key) { @@ -1492,8 +1499,6 @@ public void dropTag(TableIf dorisTable, DropTagInfo tagInfo) throws UserExceptio public void resetMetaCacheNames() { if (useMetaCache.isPresent() && useMetaCache.get() && metaCache != null) { metaCache.resetNames(); - } else { - resetToUninitialized(true); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index b7a73505b688f1..e64b32a0a1d965 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -135,19 +135,21 @@ public void setTableExtCatalog(ExternalCatalog extCatalog) { } } - public synchronized void resetToUninitialized() { + public void resetMetaToUninitialized() { if (LOG.isDebugEnabled()) { LOG.debug("resetToUninitialized db name {}, id {}, isInitializing: {}, initialized: {}", this.name, this.id, isInitializing, initialized, new Exception()); } - this.initialized = false; - this.lowerCaseToTableName = Maps.newConcurrentMap(); - if (extCatalog.getUseMetaCache().isPresent()) { - if (extCatalog.getUseMetaCache().get() && metaCache != null) { - metaCache.invalidateAll(); - } else if (!extCatalog.getUseMetaCache().get()) { - for (T table : idToTbl.values()) { - table.unsetObjectCreated(); + synchronized (this) { + this.initialized = false; + this.lowerCaseToTableName = Maps.newConcurrentMap(); + if (extCatalog.getUseMetaCache().isPresent()) { + if (extCatalog.getUseMetaCache().get() && metaCache != null) { + metaCache.invalidateAll(); + } else if (!extCatalog.getUseMetaCache().get()) { + for (T table : idToTbl.values()) { + table.unsetObjectCreated(); + } } } } @@ -900,7 +902,7 @@ public void resetMetaCacheNames() { if (extCatalog.getUseMetaCache().isPresent() && extCatalog.getUseMetaCache().get() && metaCache != null) { metaCache.resetNames(); } else { - resetToUninitialized(); + resetMetaToUninitialized(); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 8eaf3b116d5a24..4abfae6963473f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -139,7 +139,7 @@ private void realRun() { } catch (MetastoreNotificationFetchException e) { LOG.warn("Failed to fetch hms events on {}. msg: ", hmsExternalCatalog.getName(), e); } catch (Exception ex) { - hmsExternalCatalog.onRefreshCache(); + hmsExternalCatalog.onRefreshCache(true); updateLastSyncedEventId(hmsExternalCatalog, -1); LOG.warn("Failed to process hive metastore [{}] events .", hmsExternalCatalog.getName(), ex); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java index 6329de3c886f00..4e80a971f1a9d9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/RefreshCatalogTest.java @@ -157,8 +157,8 @@ public void testRefreshCatalogLastUpdateTime() throws Exception { } catch (Exception e) { // Do nothing } - // after refresh, the catalog will be set to uninitialized - Assertions.assertFalse(((ExternalCatalog) test2).isInitialized()); + // after refresh, the catalog will NOT be set to uninitialized + Assertions.assertTrue(((ExternalCatalog) test2).isInitialized()); // call get table to trigger catalog initialization table = (TestExternalTable) test2.getDbNullable("db1").getTable("tbl11").get(); Assertions.assertTrue(((ExternalCatalog) test2).isInitialized()); diff --git a/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy b/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy index cd5abf4fcba38f..2325bc4a38867b 100644 --- a/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy +++ b/regression-test/suites/external_table_p0/hive/test_hive_use_meta_cache_false.groovy @@ -170,7 +170,7 @@ suite("test_hive_use_meta_cache_false", "p0,external,hive,external_docker,extern // can not see order_qt_sql13 "show databases like '%${database_hive}%'"; } - test_use_meta_cache(false) + // test_use_meta_cache(false) } finally { } } diff --git a/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy b/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy new file mode 100644 index 00000000000000..807f86ff3f422f --- /dev/null +++ b/regression-test/suites/external_table_p0/polaris/test_iceberg_insert_refresh.groovy @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iceberg_insert_refresh", "p0,external,iceberg,polaris,external_docker,external_docker_polaris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + String polaris_port = context.config.otherConfigs.get("polaris_rest_uri_port") + String minio_port = context.config.otherConfigs.get("polaris_minio_port") + + String iceberg_catalog_name = "test_iceberg_insert_refresh" + sql """drop catalog if exists ${iceberg_catalog_name}""" + sql """create catalog if not exists ${iceberg_catalog_name} properties ( + 'type'='iceberg', + 'warehouse' = 'doris_test', + 'iceberg.catalog.type'='rest', + 'iceberg.rest.uri' = 'http://${externalEnvIp}:${polaris_port}/api/catalog', + 'iceberg.rest.security.type' = 'oauth2', + 'iceberg.rest.oauth2.credential' = 'root:secret123', + 'iceberg.rest.oauth2.server-uri' = 'http://${externalEnvIp}:${polaris_port}/api/catalog/v1/oauth/tokens', + 'iceberg.rest.oauth2.scope' = 'PRINCIPAL_ROLE:ALL', + 's3.access_key' = 'admin', + 's3.secret_key' = 'password', + 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}', + 's3.region' = 'us-east-1' + )""" + + sql """switch ${iceberg_catalog_name}""" + sql """create database if not exists db_iceberg_insert_refresh""" + sql """use db_iceberg_insert_refresh""" + sql """drop table if exists taxis""" + sql """CREATE TABLE taxis + ( + vendor_id BIGINT, + trip_id BIGINT, + trip_distance FLOAT, + fare_amount DOUBLE, + store_and_fwd_flag STRING, + ts DATETIME + ) + PARTITION BY LIST (vendor_id, DAY(ts)) () + PROPERTIES ( + "compression-codec" = "zstd", + "write-format" = "parquet" + );""" + String insert_sql = """INSERT OVERWRITE TABLE ${iceberg_catalog_name}.db_iceberg_insert_refresh.taxis + VALUES + (1, 1000371, 1.8, 15.32, 'N', '2024-01-01 9:15:23'), + (2, 1000372, 2.5, 22.15, 'N', '2024-01-02 12:10:11'), + (2, 1000373, 0.9, 9.01, 'N', '2024-01-01 3:25:15'), + (1, 1000374, 8.4, 42.13, 'Y', '2024-01-03 7:12:33');""" + + String refresh_sql = """REFRESH CATALOG ${iceberg_catalog_name};""" + + // Simple concurrent test: 10 inserts + refresh, each insert must complete within 1 minute + + def insertCount = 10 + def insertTimeoutMs = 60000L // 1 minute per insert + + def insertCompleted = false + def insertException = null + + logger.info("Starting concurrent insert and refresh test") + + // Insert task: run 10 inserts, fail if any takes >1min + def insertTask = { + try { + for (int i = 1; i <= insertCount; i++) { + def start = System.currentTimeMillis() + sql insert_sql + def duration = System.currentTimeMillis() - start + + if (duration > insertTimeoutMs) { + throw new RuntimeException("Insert ${i} took ${duration}ms > ${insertTimeoutMs}ms") + } + + logger.info("Insert ${i} completed in ${duration}ms") + Thread.sleep(100) + } + insertCompleted = true + } catch (Exception e) { + insertException = e + } + } + + // Refresh task: keep refreshing while inserts are running + def refreshTask = { + while (!insertCompleted && insertException == null) { + try { + sql refresh_sql + Thread.sleep(200) + } catch (Exception e) { + logger.warn("Refresh failed: ${e.message}") + Thread.sleep(200) + } + } + } + + // Start both tasks + def insertThread = Thread.start(insertTask) + def refreshThread = Thread.start(refreshTask) + + // Wait for insert thread with 1 minute total timeout + insertThread.join(60000) // 1 minute total + + // Force stop both threads if still running + if (insertThread.isAlive()) { + insertThread.interrupt() + } + refreshThread.interrupt() + + // Check results + if (insertException != null) { + throw new RuntimeException("Test failed: ${insertException.message}") + } + + if (!insertCompleted) { + throw new RuntimeException("Test failed: Inserts did not complete within 1 minute") + } + + logger.info("✅ Test PASSED - All ${insertCount} inserts completed within timeout") + + // Cleanup + sql """drop table if exists taxis""" + sql """drop database if exists db_iceberg_insert_refresh""" + sql """drop catalog if exists ${iceberg_catalog_name}""" + + } +} \ No newline at end of file