Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,9 @@ public static boolean isHdfsOnOssEndpoint(String location) {

// Return the file system type and the file system identity.
// The file system identity is the scheme and authority of the URI, eg. "hdfs://host:port" or "s3://bucket".
public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), true);
public static Pair<FileSystemType, String> getFSIdentity(String location,
Map<String, String> properties, String bindBrokerName) {
LocationPath locationPath = new LocationPath(location, properties, true);
FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
URI uri = locationPath.getPath().toUri();
String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,11 @@ private FileCacheValue getFileCache(String location, String inputFormat,
List<String> partitionValues,
String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
location, bindBrokerName),
catalog.getCatalogProperty().getProperties(),
location, properties, bindBrokerName),
properties,
bindBrokerName, jobConf));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location));
// For Tez engine, it may generate subdirectoies for "union" query.
Expand Down Expand Up @@ -743,6 +744,7 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
try {
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
for (HivePartition partition : partitions) {

AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf);
Expand Down Expand Up @@ -777,8 +779,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
bindBrokerName),
catalog.getCatalogProperty().getProperties(),
properties, bindBrokerName),
properties,
bindBrokerName, jobConf));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
Expand All @@ -804,8 +806,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
String location = delta.getPath().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
LocationPath.getFSIdentity(location, properties, bindBrokerName),
properties, bindBrokerName, jobConf));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
Expand All @@ -832,8 +834,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
catalog.getCatalogProperty().getProperties(), bindBrokerName, jobConf));
LocationPath.getFSIdentity(location, properties, bindBrokerName),
properties, bindBrokerName, jobConf));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public Status listDirectories(String remotePath, Set<String> result) {
public FileSystem fileSystem(String location) {
return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location,
LocationPath.getFSIdentity(location, properties,
bindBrokerName), properties, bindBrokerName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -39,7 +40,8 @@ public void testHdfsLocationConvert() {

String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("hdfs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.DFS);

// HA props
Map<String, String> props = new HashMap<>();
Expand Down Expand Up @@ -92,7 +94,8 @@ public void testJFSLocationConvert() {
// BE
loc = locationPath.toStorageLocation().toString();
Assertions.assertTrue(loc.startsWith("jfs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, FileSystemType.JFS);
Assertions.assertEquals(LocationPath.getFSIdentity(loc, Collections.emptyMap(), null).first,
FileSystemType.JFS);
}

@Test
Expand All @@ -106,7 +109,8 @@ public void testGSLocationConvert() {
// BE
String beLoc = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLoc.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, FileSystemType.S3);
Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, Collections.emptyMap(), null).first,
FileSystemType.S3);
}

@Test
Expand All @@ -118,17 +122,21 @@ public void testOSSLocationConvert() {
// BE
String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.S3);

// test oss-hdfs
rangeProps.put(OssProperties.ENDPOINT, "oss-dls.aliyuncs.com");
locationPath = new LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps);
Assertions.assertEquals("oss://test.oss-dls.aliyuncs.com/path", locationPath.get());
Assertions.assertEquals(LocationPath.getFSIdentity(locationPath.get(), rangeProps, null).first,
FileSystemType.DFS);
// FE
Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
// BE
beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
Assertions.assertEquals(locationPath.getFileSystemType(), FileSystemType.DFS);

}

@Test
Expand All @@ -140,23 +148,26 @@ public void testCOSLocationConvert() {
String beLocation = locationPath.toStorageLocation().toString();
// BE
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.S3);

locationPath = new LocationPath("cosn://test.com", rangeProps);
// FE
Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
// BE
beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.S3);

locationPath = new LocationPath("ofs://test.com", rangeProps);
// FE
Assertions.assertTrue(locationPath.get().startsWith("ofs://"));
// BE
beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("ofs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.OFS);

// GFS is now equals to DFS
locationPath = new LocationPath("gfs://test.com", rangeProps);
Expand All @@ -165,7 +176,8 @@ public void testCOSLocationConvert() {
// BE
beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("gfs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.DFS);
}

@Test
Expand All @@ -177,7 +189,8 @@ public void testOBSLocationConvert() {
// BE
String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, Collections.emptyMap(), null).first,
FileSystemType.S3);
}

@Test
Expand Down
Loading