Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Set;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
Expand All @@ -24,6 +32,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;

import java.io.BufferedOutputStream;
Expand All @@ -37,11 +46,20 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1)
public class GeoIpDownloaderIT extends AbstractGeoIpIT {
Expand Down Expand Up @@ -121,6 +139,132 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/69972")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
// setup:
BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-city");
builder.field("database_file", "GeoLite2-City.mmdb");
}
builder.endObject();
}
builder.endObject();
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-country");
builder.field("database_file", "GeoLite2-Country.mmdb");
}
builder.endObject();
}
builder.endObject();
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-asn");
builder.field("database_file", "GeoLite2-ASN.mmdb");
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get());

// verify before updating dbs
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.startArray("docs");
{
builder.startObject();
builder.field("_index", "my-index");
{
builder.startObject("_source");
builder.field("ip", "89.160.20.128");
builder.endObject();
}
builder.endObject();
}
builder.endArray();
builder.endObject();
bytes = BytesReference.bytes(builder);
}
SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON);
simulateRequest.setId("_id");
{
SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
assertThat(simulateResponse.getResults().size(), equalTo(1));
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba"));
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
}

// Enable downloader:
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
.map(env -> {
Path geoipTmpDir = env.tmpFile().resolve("geoip-databases");
assertThat(Files.exists(geoipTmpDir), is(true));
return geoipTmpDir;
}).collect(Collectors.toList());
assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
List<String> files = list.map(Path::toString).collect(Collectors.toList());
assertThat(files, containsInAnyOrder(endsWith("GeoLite2-City.mmdb"), endsWith("GeoLite2-Country.mmdb"),
endsWith("GeoLite2-ASN.mmdb")));
}
}
});

// Verify after updating dbs:
assertBusy(() -> {
SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
assertThat(simulateResponse.getResults().size(), equalTo(1));
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
});

// Disable downloader:
settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
List<String> files = list.map(Path::toString).collect(Collectors.toList());
assertThat(files, empty());
}
}
});
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
Expand Down
Loading