Skip to content

Commit

Permalink
[fix][io] Fix es index creation (apache#22654)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>

(cherry picked from commit efcedf6)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed May 13, 2024
1 parent b178084 commit 06946d3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public boolean deleteIndex(String index) throws IOException {
public boolean deleteDocument(String index, String documentId) throws IOException {
final DeleteRequest req = new
DeleteRequest.Builder()
.index(config.getIndexName())
.index(index)
.id(documentId)
.build();

Expand All @@ -156,7 +156,7 @@ public boolean deleteDocument(String index, String documentId) throws IOExceptio
public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
final Map mapped = objectMapper.readValue(documentSource, Map.class);
final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
.index(config.getIndexName())
.index(index)
.document(mapped)
.id(documentId)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
import co.elastic.clients.transport.ElasticsearchTransport;
import com.fasterxml.jackson.core.JsonParseException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
Expand All @@ -51,6 +56,17 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
Expand Down Expand Up @@ -149,6 +165,7 @@ public Object getNativeObject() {
});

when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, UserProfile>>>) invocation -> kvSchema);
when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis()));
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -206,6 +223,16 @@ public final void send100Test() throws Exception {
verify(mockRecord, times(100)).ack();
}

@Test
public final void send1WithFormattedIndexTest() throws Exception {
map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
sink.open(map, mockSinkContext);
send(1);
verify(mockRecord, times(1)).ack();
String value = getHitIdAtIndex("test-formatted-index-*", 0);
assertTrue(StringUtils.isNotBlank(value));
}

@Test
public final void sendNoSchemaTest() throws Exception {

Expand Down

0 comments on commit 06946d3

Please sign in to comment.