Skip to content

Commit ea4f7eb

Browse files
authored
[fix][io] Fix the Alluxio sink to write messages successfully after the first file rotation (#19247)
1 parent cfd7e60 commit ea4f7eb

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

pulsar-io/alluxio/src/main/java/org/apache/pulsar/io/alluxio/sink/AlluxioSink.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class AlluxioSink implements Sink<GenericObject> {
6767

6868
private FileSystem fileSystem;
6969
private FileOutStream fileOutStream;
70-
private CreateFilePOptions.Builder optionsBuilder;
70+
7171
private long recordsNum;
7272
private String tmpFilePath;
7373
private String fileDirPath;
@@ -114,8 +114,6 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
114114
fileSystem.createDirectory(tmpAlluxioDirPath);
115115
}
116116

117-
optionsBuilder = FileSystemOptions.createFileDefaults(configuration).toBuilder();
118-
119117
recordsNum = 0;
120118
recordsToAck = Lists.newArrayList();
121119
tmpFilePath = "";
@@ -206,6 +204,8 @@ private void writeToAlluxio(Record<GenericObject> record) throws AlluxioExceptio
206204
}
207205

208206
private void createTmpFile() throws AlluxioException, IOException {
207+
CreateFilePOptions.Builder optionsBuilder =
208+
FileSystemOptions.createFileDefaults(configuration).toBuilder();
209209
UUID id = UUID.randomUUID();
210210
String fileExtension = alluxioSinkConfig.getFileExtension();
211211
tmpFilePath = tmpFileDirPath + "/" + id.toString() + "_tmp" + fileExtension;

pulsar-io/alluxio/src/test/java/org/apache/pulsar/io/alluxio/sink/AlluxioSinkTest.java

+35-4
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.mockito.invocation.InvocationOnMock;
4141
import org.mockito.stubbing.Answer;
4242
import org.testng.Assert;
43+
import org.testng.annotations.AfterMethod;
4344
import org.testng.annotations.BeforeClass;
4445
import org.testng.annotations.BeforeMethod;
4546
import org.testng.annotations.Test;
@@ -125,6 +126,13 @@ public Object getNativeObject() {
125126
when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, Foobar>>>) invocation -> kvSchema);
126127
}
127128

129+
// Per-test cleanup hook: TestNG invokes this after every test method in the class.
@AfterMethod
130+
public void tearDown() throws Exception {
131+
// Guard against a null cluster (presumably a test that never started one; verify against the setup code).
if (cluster != null) {
132+
cluster.stop();
133+
}
134+
}
135+
128136
@Test
129137
public void openTest() throws Exception {
130138
map.put("filePrefix", "TopicA");
@@ -147,7 +155,6 @@ public void openTest() throws Exception {
147155
Assert.assertTrue(client.exists(alluxioTmpURI));
148156

149157
sink.close();
150-
cluster.stop();
151158
}
152159

153160
@Test
@@ -186,11 +193,36 @@ public Object getNativeObject() {
186193
Assert.assertTrue(client.exists(alluxioTmpURI));
187194

188195
List<URIStatus> listAlluxioDirStatus = client.listStatus(alluxioURI);
189-
190196
List<String> pathList = listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
191-
192197
Assert.assertEquals(pathList.size(), 2);
193198

199+
for (String path : pathList) {
200+
if (path.contains("tmp")) {
201+
// Ensure that the temporary file is rotated and the directory is empty
202+
Assert.assertEquals(path, "/pulsar/tmp");
203+
} else {
204+
// Ensure that all rotated files conform to the naming convention
205+
Assert.assertTrue(path.startsWith("/pulsar/TopicA-"));
206+
}
207+
}
208+
209+
// Ensure the subsequent writes are also successful
210+
sink.write(() -> new GenericObject() {
211+
@Override
212+
public SchemaType getSchemaType() {
213+
return SchemaType.KEY_VALUE;
214+
}
215+
216+
@Override
217+
public Object getNativeObject() {
218+
return new KeyValue<>((String) fooBar.getField("address"), fooBar);
219+
}
220+
});
221+
222+
listAlluxioDirStatus = client.listStatus(alluxioURI);
223+
pathList = listAlluxioDirStatus.stream().map(URIStatus::getPath).collect(Collectors.toList());
224+
Assert.assertEquals(pathList.size(), 3);
225+
194226
for (String path : pathList) {
195227
if (path.contains("tmp")) {
196228
Assert.assertEquals(path, "/pulsar/tmp");
@@ -200,7 +232,6 @@ public Object getNativeObject() {
200232
}
201233

202234
sink.close();
203-
cluster.stop();
204235
}
205236

206237
private LocalAlluxioCluster setupSingleMasterCluster() throws Exception {

0 commit comments

Comments
 (0)