Skip to content

Commit

Permalink
[fix][io] Fix Alluxio sink to respect the alluxioMasterHost property (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sekikn authored Jan 11, 2023
1 parent 82b1357 commit dda1437
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
// initialize FileSystem
String alluxioMasterHost = alluxioSinkConfig.getAlluxioMasterHost();
int alluxioMasterPort = alluxioSinkConfig.getAlluxioMasterPort();
InstancedConfiguration.defaults().set(PropertyKey.MASTER_HOSTNAME, alluxioMasterHost);
configuration.set(PropertyKey.MASTER_HOSTNAME, alluxioMasterHost);
configuration.set(PropertyKey.MASTER_RPC_PORT, alluxioMasterPort);
if (alluxioSinkConfig.getSecurityLoginUser() != null) {
configuration.set(PropertyKey.SECURITY_LOGIN_USERNAME, alluxioSinkConfig.getSecurityLoginUser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class AlluxioSinkTest {

protected Map<String, Object> map;
protected AlluxioSink sink;
protected LocalAlluxioCluster cluster;

@Mock
protected Record<GenericObject> mockRecord;
Expand All @@ -86,10 +87,16 @@ public static void init() {
}

@BeforeMethod
public final void setUp() {
public final void setUp() throws Exception {
cluster = setupSingleMasterCluster();

map = new HashMap<>();
map.put("alluxioMasterHost", "localhost");
map.put("alluxioMasterPort", "19998");
// alluxioMasterHost should be set via LocalAlluxioCluster#getHostname
// instead of using a fixed value "localhost", since it seems that
// LocalAlluxioCluster may bind other address than localhost
// when the node has multiple network interfaces.
map.put("alluxioMasterHost", cluster.getHostname());
map.put("alluxioMasterPort", cluster.getMasterRpcPort());
map.put("alluxioDir", "/pulsar");
map.put("filePrefix", "prefix");
map.put("schemaEnable", "true");
Expand Down Expand Up @@ -127,8 +134,6 @@ public void openTest() throws Exception {

String alluxioDir = "/pulsar";

LocalAlluxioCluster cluster = setupSingleMasterCluster();

sink = new AlluxioSink();
sink.open(map, mockSinkContext);

Expand Down Expand Up @@ -156,8 +161,6 @@ public void writeAndCloseTest() throws Exception {

String alluxioDir = "/pulsar";

LocalAlluxioCluster cluster = setupSingleMasterCluster();

sink = new AlluxioSink();
sink.open(map, mockSinkContext);

Expand Down

0 comments on commit dda1437

Please sign in to comment.