Skip to content

Commit

Permalink
Add DatabaseRegistry for locally managing databases managed by GeoIpD…
Browse files Browse the repository at this point in the history
…ownloader (#69971)

Backport of #69540 to 7.x branch.

This component is responsible for making the databases maintained by GeoIpDownloader
available for ingest processors.

Also provided a lookup mechanism for geoip processors with fallback to {@link LocalDatabases}.
All databases are downloaded into a geoip tmp directory, which is created at node startup.

The following high level steps are executed after each cluster state update:
1) Check which databases are available in {@link GeoIpTaskState},
   which is part of the geoip downloader persistent task.
2) For each database check whether the databases have changed
   by comparing the local and remote md5 hash or are locally missing.
3) For each database identified in step 2 start downloading the database
   chunks. Each chunks is appended to a tmp file (inside geoip tmp dir) and
   after all chunks have been downloaded, the database is uncompressed and
   renamed to the final filename. After this the database is loaded and
   if there is an old instance of this database then that is closed.
4) Cleanup locally loaded databases that are no longer mentioned in {@link GeoIpTaskState}.

Relates to #68920

Other cherry-picked commits:

* Fix ReloadingDatabasesWhilePerformingGeoLookupsIT (#70163)

Wait for ingest threads to stop using the DatabaseReaderLazyLoader, so the during the next run the db update thread doesn't try to remove the db again (because the file hasn't yet been deleted).

Also delete tmp dirs this test create at the end of the test, so that when repeating this test many times, this test doesn't accumulate many directories with database files.

Closes #69980

* Fix clean up of old entries in DatabaseRegistry.initialize (#70135)

This change switches clean up in DatabaseRegistry.initialize from using Files.walk and stream operations to Files.walkFileTree which can be made more robust in case of errors

* Fix DatabaseRegistryTests (#70180)

This test predefined expected md5 hashes in constants, that were expected with java15.
However java16 creates different md5 hashes and so the expected md5 hashes don't match
with the actual md5 hashes, which caused tests in this test suite to fail (running
with java16 only).

The tests now generates the expected md5 hash during the test instead of using predefined constants.

Closes #69986

* Fix GeoIpDownloaderIT#testUseGeoIpProcessorWithDownloadedDBs(...) test (#70215)

The test failure looks legit, because there is a possibility that the same databases
was downloaded twice. See added comment in DatabaseRegistry class.

Relates to #69972

* Muted GeoIpDownloaderIT#testUseGeoIpProcessorWithDownloadedDBs(...) test,
see #69972

Co-authored-by: Przemko Robakowski <przemko.robakowski@elastic.co>
  • Loading branch information
martijnvg and probakowski authored Mar 10, 2021
1 parent 369ee14 commit 050f00b
Show file tree
Hide file tree
Showing 12 changed files with 1,131 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@
import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Set;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
Expand All @@ -24,6 +32,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.After;

import java.io.BufferedOutputStream;
Expand All @@ -37,11 +46,20 @@
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import java.util.zip.GZIPInputStream;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1)
public class GeoIpDownloaderIT extends AbstractGeoIpIT {
Expand Down Expand Up @@ -121,6 +139,132 @@ public void testGeoIpDatabasesDownload() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/69972")
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
// setup:
BytesReference bytes;
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
builder.startArray("processors");
{
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-city");
builder.field("database_file", "GeoLite2-City.mmdb");
}
builder.endObject();
}
builder.endObject();
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-country");
builder.field("database_file", "GeoLite2-Country.mmdb");
}
builder.endObject();
}
builder.endObject();
builder.startObject();
{
builder.startObject("geoip");
{
builder.field("field", "ip");
builder.field("target_field", "ip-asn");
builder.field("database_file", "GeoLite2-ASN.mmdb");
}
builder.endObject();
}
builder.endObject();
}
builder.endArray();
}
builder.endObject();
bytes = BytesReference.bytes(builder);
}
assertAcked(client().admin().cluster().preparePutPipeline("_id", bytes, XContentType.JSON).get());

// verify before updating dbs
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.startArray("docs");
{
builder.startObject();
builder.field("_index", "my-index");
{
builder.startObject("_source");
builder.field("ip", "89.160.20.128");
builder.endObject();
}
builder.endObject();
}
builder.endArray();
builder.endObject();
bytes = BytesReference.bytes(builder);
}
SimulatePipelineRequest simulateRequest = new SimulatePipelineRequest(bytes, XContentType.JSON);
simulateRequest.setId("_id");
{
SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
assertThat(simulateResponse.getResults().size(), equalTo(1));
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Tumba"));
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
}

// Enable downloader:
Settings.Builder settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

final List<Path> geoipTmpDirs = StreamSupport.stream(internalCluster().getInstances(Environment.class).spliterator(), false)
.map(env -> {
Path geoipTmpDir = env.tmpFile().resolve("geoip-databases");
assertThat(Files.exists(geoipTmpDir), is(true));
return geoipTmpDir;
}).collect(Collectors.toList());
assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
List<String> files = list.map(Path::toString).collect(Collectors.toList());
assertThat(files, containsInAnyOrder(endsWith("GeoLite2-City.mmdb"), endsWith("GeoLite2-Country.mmdb"),
endsWith("GeoLite2-ASN.mmdb")));
}
}
});

// Verify after updating dbs:
assertBusy(() -> {
SimulatePipelineResponse simulateResponse = client().admin().cluster().simulatePipeline(simulateRequest).actionGet();
assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
assertThat(simulateResponse.getResults().size(), equalTo(1));
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
});

// Disable downloader:
settings = Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
assertAcked(client().admin().cluster().prepareUpdateSettings().setPersistentSettings(settings));

assertBusy(() -> {
for (Path geoipTmpDir : geoipTmpDirs) {
try (Stream<Path> list = Files.list(geoipTmpDir)) {
List<String> files = list.map(Path::toString).collect(Collectors.toList());
assertThat(files, empty());
}
}
});
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
Expand Down
Loading

0 comments on commit 050f00b

Please sign in to comment.