Skip to content

Commit

Permalink
[CONNECTOR] Fix some issues in connector (#1487)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haser0305 authored Feb 12, 2023
1 parent 707dbd3 commit 361b12b
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 16 deletions.
14 changes: 14 additions & 0 deletions common/src/main/java/org/astraea/common/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ default String requireString(String key) {
return string(key).orElseThrow(() -> new NoSuchElementException(key + " is nonexistent"));
}

/**
 * Returns a new {@link Configuration} containing only the entries whose key starts with
 * {@code prefix + "."}; the prefix and the following dot are stripped from each surviving key.
 *
 * <p>For example, with entries {@code {"fs.hdfs.override.a": "v"}} and prefix
 * {@code "fs.hdfs.override"}, the result contains {@code {"a": "v"}}.
 *
 * @param prefix the prefix to filter by; it and the following dot are removed from matching keys
 * @return a new Configuration holding only the matching entries with shortened keys
 */
default Configuration filteredPrefixConfigs(String prefix) {
  // Match and strip via plain string operations instead of replaceFirst: the
  // prefix may contain regex metacharacters (the dots in "fs.hdfs.override"
  // would match ANY character as a regex), and a key exactly equal to the
  // prefix must be excluded rather than kept un-shortened.
  var matchedPrefix = prefix + ".";
  return of(
      entrySet().stream()
          .filter(e -> e.getKey().startsWith(matchedPrefix))
          .collect(
              Collectors.toMap(
                  e -> e.getKey().substring(matchedPrefix.length()), Map.Entry::getValue)));
}

/**
* @param key the key whose associated value is to be returned
* @param separator to split string to multiple strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ void testMap() {
Assertions.assertEquals(
Map.of("v0", 0, "v1", 1), config.map("key", ",", ":", Integer::valueOf));
}

@Test
void testFilteredConfigs() {
  // Only "filtered.key" matches the prefix; "key" and "key.filtered" must be dropped.
  var source = Map.of("key", "v1", "filtered.key", "v2", "key.filtered", "v3");
  var filtered = Configuration.of(source).filteredPrefixConfigs("filtered");
  Assertions.assertEquals(Map.of("key", "v2"), filtered.raw());
}
}
35 changes: 25 additions & 10 deletions connector/src/main/java/org/astraea/connector/backup/Exporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,32 @@ public class Exporter extends SinkConnector {
Definition.builder()
.name("fs.schema")
.type(Definition.Type.STRING)
.documentation("decide which file system to use, such as FTP.")
.documentation("decide which file system to use, such as FTP, HDFS.")
.required()
.build();
static Definition HOSTNAME_KEY =
Definition.builder()
.name("fs.ftp.hostname")
.name("fs.<schema>.hostname")
.type(Definition.Type.STRING)
.documentation("the host name of the ftp server used.")
.documentation("the host name of the <schema> server used.")
.build();
static Definition PORT_KEY =
Definition.builder()
.name("fs.ftp.port")
.name("fs.<schema>.port")
.type(Definition.Type.STRING)
.documentation("the port of the ftp server used.")
.documentation("the port of the <schema> server used.")
.build();
static Definition USER_KEY =
Definition.builder()
.name("fs.ftp.user")
.name("fs.<schema>.user")
.type(Definition.Type.STRING)
.documentation("the user name required to login to the FTP server.")
.documentation("the user name required to login to the <schema> server.")
.build();
static Definition PASSWORD_KEY =
Definition.builder()
.name("fs.ftp.password")
.name("fs.<schema>.password")
.type(Definition.Type.PASSWORD)
.documentation("the password required to login to the ftp server.")
.documentation("the password required to login to the <schema> server.")
.build();
static Definition PATH_KEY =
Definition.builder()
Expand All @@ -94,6 +94,13 @@ public class Exporter extends SinkConnector {
.defaultValue("3s")
.documentation("the maximum time before a new archive file is rolling out.")
.build();

// Pass-through override keys: every "fs.<schema>.override.<property_name>" entry is
// forwarded to the underlying file-system configuration with the prefix stripped
// (demonstrated for hdfs in HdfsFileSystem, which applies them via conf.set; other
// schemas presumably follow the same convention — TODO confirm).
static Definition OVERRIDE_KEY =
    Definition.builder()
        .name("fs.<schema>.override.<property_name>")
        .type(Definition.Type.STRING)
        .documentation("a value that needs to be overridden in the file system.")
        .build();
private Configuration configs;

@Override
Expand All @@ -113,7 +120,15 @@ protected List<Configuration> takeConfiguration(int maxTasks) {

@Override
protected List<Definition> definitions() {
return List.of(SCHEMA_KEY, HOSTNAME_KEY, PORT_KEY, USER_KEY, PASSWORD_KEY, PATH_KEY, SIZE_KEY);
return List.of(
SCHEMA_KEY,
HOSTNAME_KEY,
PORT_KEY,
USER_KEY,
PASSWORD_KEY,
PATH_KEY,
SIZE_KEY,
OVERRIDE_KEY);
}

public static class Task extends SinkTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,9 @@ void testHdfsSinkTask() {
"tasks.max",
"1",
"roll.duration",
"100m");
"100m",
"fs.hdfs.override.dfs.client.use.datanode.hostname",
"true");

task.start(configs);

Expand Down
2 changes: 1 addition & 1 deletion docs/connector/exporter.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
| 參數名稱 | 說明 | 預設值 |
|:--------------------------|--------------------------------------------------------------------------------------------------------------|-------|
| fs.schema | (必填) 決定儲存目標為何種檔案系統,例如: `local`, `ftp`||
| path | (必填) 填入目標檔案系統要儲存的資料夾目錄之目標位置 ||
| fs.{file System}.hostname | (選填) 如果最初的 `fs.schema` 選定為非 `local` 之項目,需要填入目標 `host name`,其中 `file System` 取決於前者之內容 ||
| fs.{file System}.port | (選填) 填入目標檔案系統之 `port` ||
| fs.{file System}.user | (選填) 填入目標檔案系統之登入 `user` ||
| fs.{file System}.password | (選填) 填入目標檔案系統之登入 `password` ||
| path | (選填) 填入目標檔案系統要儲存的資料夾目錄之目標位置 ||
| size | (選填) 寫入檔案目標超過此設定之大小上限時會創建新檔案,並且寫入目標改為新創建之檔案。 <br/>檔案大小單位: `Bit`, `Kb`, `KiB`, `Mb`, etc. | 100MB |
| roll.duration | (選填) 如果 `connector` 在超過此時間沒有任何資料流入,會把當下所有已創建之檔案關閉,並在之後有新資料時會創建新檔案並寫入。 <br/>時間單位: `s`, `m`, `h`, `day`, etc. | 3s |

Expand Down
14 changes: 11 additions & 3 deletions fs/src/main/java/org/astraea/fs/hdfs/HdfsFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class HdfsFileSystem implements FileSystem {
public static final String HOSTNAME_KEY = "fs.hdfs.hostname";
public static final String PORT_KEY = "fs.hdfs.port";
public static final String USER_KEY = "fs.hdfs.user";
public static final String OVERRIDE_KEY = "fs.hdfs.override";

private org.apache.hadoop.fs.FileSystem fs;

Expand All @@ -48,7 +49,14 @@ public HdfsFileSystem(Configuration config) {
+ ":"
+ config.requireString(PORT_KEY));

fs = org.apache.hadoop.fs.FileSystem.get(uri, new org.apache.hadoop.conf.Configuration());
var conf = new org.apache.hadoop.conf.Configuration();

config
.filteredPrefixConfigs(OVERRIDE_KEY)
.entrySet()
.forEach(configItem -> conf.set(configItem.getKey(), configItem.getValue()));

fs = org.apache.hadoop.fs.FileSystem.get(uri, conf, config.requireString(USER_KEY));
});
}

Expand Down Expand Up @@ -78,7 +86,7 @@ public List<String> listFiles(String path) {
return Utils.packException(
() -> {
if (type(path) != Type.FOLDER)
throw new IllegalArgumentException(path + " is nto a folder");
throw new IllegalArgumentException(path + " is not a folder");
return Arrays.stream(fs.listStatus(new Path(path)))
.filter(FileStatus::isFile)
.map(f -> f.getPath().toUri().getPath())
Expand All @@ -91,7 +99,7 @@ public List<String> listFolders(String path) {
return Utils.packException(
() -> {
if (type(path) != Type.FOLDER)
throw new IllegalArgumentException(path + " is nto a folder");
throw new IllegalArgumentException(path + " is not a folder");
return Arrays.stream(fs.listStatus(new Path(path)))
.filter(FileStatus::isDirectory)
.map(f -> f.getPath().getName())
Expand Down
2 changes: 1 addition & 1 deletion it/src/main/java/org/astraea/it/HdfsServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public int port() {

@Override
public String user() {
return "root";
return System.getProperty("user.name");
}

@Override
Expand Down

0 comments on commit 361b12b

Please sign in to comment.