Skip to content

Commit

Permalink
GeoIP database downloader (elastic#68424)
Browse files Browse the repository at this point in the history
This change adds component that will download new GeoIP databases from infra service
New databases are downloaded in chunks and stored in .geoip_databases index
Downloads are verified against MD5 checksum provided by the server
Current state of all stored databases is stored in cluster state in persistent task state

Relates to elastic#68920
# Conflicts:
#	modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java
  • Loading branch information
probakowski committed Feb 23, 2021
1 parent 132c4f8 commit cc949b1
Show file tree
Hide file tree
Showing 20 changed files with 1,650 additions and 62 deletions.
58 changes: 43 additions & 15 deletions modules/ingest-geoip/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import org.apache.tools.ant.taskdefs.condition.Os
apply plugin: 'elasticsearch.yaml-rest-test'
apply plugin: 'elasticsearch.internal-cluster-test'

final Project fixture = project(':test:fixtures:geoip-fixture')

esplugin {
description 'Ingest processor that uses looksup geo data based on ip adresses using the Maxmind geo database'
classname 'org.elasticsearch.ingest.geoip.IngestGeoIpPlugin'
Expand All @@ -24,6 +26,7 @@ dependencies {
api('com.maxmind.db:maxmind-db:1.3.1')

testImplementation 'org.elasticsearch:geolite2-databases:20191119'
internalClusterTestImplementation project(path: ":modules:reindex")
}

restResources {
Expand All @@ -32,6 +35,31 @@ restResources {
}
}

def useFixture = System.getenv("geoip_use_service") != "true"

if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(fixture.path, 'geoip-fixture')

task "beforeInternalClusterTest" {
dependsOn ':test:fixtures:geoip-fixture:postProcessFixture'
doLast {
int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.geoip-fixture.tcp.80"
assert ephemeralPort > 0
internalClusterTest {
nonInputProperties.systemProperty "geoip_endpoint", "http://127.0.0.1:" + ephemeralPort
}
}
}
}

internalClusterTest {
systemProperty "es.geoip_v2_feature_flag_enabled", "true"
if (useFixture) {
dependsOn "beforeInternalClusterTest"
}
}

tasks.register("copyDefaultGeoIp2DatabaseFiles", Copy) {
from { zipTree(configurations.testCompileClasspath.files.find { it.name.contains('geolite2-databases') }) }
into "${project.buildDir}/ingest-geoip"
Expand All @@ -47,21 +75,21 @@ tasks.named("bundlePlugin").configure {

tasks.named("thirdPartyAudit").configure {
ignoreMissingClasses(
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:
'org.apache.http.HttpEntity',
'org.apache.http.HttpHost',
'org.apache.http.HttpResponse',
'org.apache.http.StatusLine',
'org.apache.http.auth.UsernamePasswordCredentials',
'org.apache.http.client.config.RequestConfig$Builder',
'org.apache.http.client.config.RequestConfig',
'org.apache.http.client.methods.CloseableHttpResponse',
'org.apache.http.client.methods.HttpGet',
'org.apache.http.client.utils.URIBuilder',
'org.apache.http.impl.auth.BasicScheme',
'org.apache.http.impl.client.CloseableHttpClient',
'org.apache.http.impl.client.HttpClientBuilder',
'org.apache.http.util.EntityUtils'
// geoip WebServiceClient needs apache http client, but we're not using WebServiceClient:
'org.apache.http.HttpEntity',
'org.apache.http.HttpHost',
'org.apache.http.HttpResponse',
'org.apache.http.StatusLine',
'org.apache.http.auth.UsernamePasswordCredentials',
'org.apache.http.client.config.RequestConfig$Builder',
'org.apache.http.client.config.RequestConfig',
'org.apache.http.client.methods.CloseableHttpResponse',
'org.apache.http.client.methods.HttpGet',
'org.apache.http.client.utils.URIBuilder',
'org.apache.http.impl.auth.BasicScheme',
'org.apache.http.impl.client.CloseableHttpClient',
'org.apache.http.impl.client.HttpClientBuilder',
'org.apache.http.util.EntityUtils'
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public abstract class AbstractGeoIpIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
}

@Override
protected Settings nodeSettings(final int nodeOrdinal) {
final Path databasePath = createTempDir();
try {
Files.createDirectories(databasePath);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
databasePath.resolve("GeoLite2-City.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
databasePath.resolve("GeoLite2-Country.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
databasePath.resolve("GeoLite2-ASN.mmdb"));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
return Settings.builder()
.put("ingest.geoip.database_path", databasePath)
.put(super.nodeSettings(nodeOrdinal))
.build();
}

public static class IngestGeoIpSettingsPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.DatabaseReader;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;

@ClusterScope(scope = Scope.TEST, maxNumDataNodes = 1)
public class GeoIpDownloaderIT extends AbstractGeoIpIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class, IngestGeoIpPlugin.class, GeoIpProcessorNonIngestNodeIT.IngestGeoIpSettingsPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder settings = Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), false);
String endpoint = System.getProperty("geoip_endpoint");
if (endpoint != null) {
settings.put(GeoIpDownloader.ENDPOINT_SETTING.getKey(), endpoint);
}
return settings.build();
}

public void testGeoIpDatabasesDownload() throws Exception {
ClusterUpdateSettingsResponse settingsResponse = client().admin().cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true))
.get();
assertTrue(settingsResponse.isAcknowledged());
assertBusy(() -> {
PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> task = getTask();
assertNotNull(task);
GeoIpTaskState state = (GeoIpTaskState) task.getState();
assertNotNull(state);
assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
}, 2, TimeUnit.MINUTES);

GeoIpTaskState state = (GeoIpTaskState) getTask().getState();
for (String id : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
assertBusy(() -> {
GeoIpTaskState.Metadata metadata = state.get(id);
BoolQueryBuilder queryBuilder = new BoolQueryBuilder()
.filter(new MatchQueryBuilder("name", id))
.filter(new RangeQueryBuilder("chunk")
.from(metadata.getFirstChunk())
.to(metadata.getLastChunk(), true));
int size = metadata.getLastChunk() - metadata.getFirstChunk() + 1;
SearchResponse res = client().prepareSearch(GeoIpDownloader.DATABASES_INDEX)
.setSize(size)
.setQuery(queryBuilder)
.addSort("chunk", SortOrder.ASC)
.get();
TotalHits totalHits = res.getHits().getTotalHits();
assertEquals(TotalHits.Relation.EQUAL_TO, totalHits.relation);
assertEquals(size, totalHits.value);
assertEquals(size, res.getHits().getHits().length);

List<byte[]> data = new ArrayList<>();

for (SearchHit hit : res.getHits().getHits()) {
data.add((byte[]) hit.getSourceAsMap().get("data"));
}

GZIPInputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data));
Path tempFile = createTempFile();
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))) {
stream.transferTo(os);
}

parseDatabase(tempFile);
});
}
}

@SuppressForbidden(reason = "Maxmind API requires java.io.File")
private void parseDatabase(Path tempFile) throws IOException {
try (DatabaseReader databaseReader = new DatabaseReader.Builder(tempFile.toFile()).build()) {
assertNotNull(databaseReader.getMetadata());
}
}

private PersistentTasksCustomMetadata.PersistentTask<PersistentTaskParams> getTask() {
return PersistentTasksCustomMetadata.getTaskWithId(clusterService().state(), GeoIpDownloader.GEOIP_DOWNLOADER);
}

private static class MultiByteArrayInputStream extends InputStream {

private final Iterator<byte[]> data;
private ByteArrayInputStream current;

private MultiByteArrayInputStream(List<byte[]> data) {
this.data = data.iterator();
}

@Override
public int read() {
if (current == null) {
if (data.hasNext() == false) {
return -1;
}

current = new ByteArrayInputStream(data.next());
}
int read = current.read();
if (read == -1) {
current = null;
return read();
}
return read;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (current == null) {
if (data.hasNext() == false) {
return -1;
}

current = new ByteArrayInputStream(data.next());
}
int read = current.read(b, off, len);
if (read == -1) {
current = null;
return read(b, off, len);
}
return read;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,69 +13,27 @@
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.NodeRoles;
import org.elasticsearch.test.StreamsUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.test.NodeRoles.nonIngestNode;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

public class GeoIpProcessorNonIngestNodeIT extends ESIntegTestCase {

public static class IngestGeoIpSettingsPlugin extends Plugin {

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(Setting.simpleString("ingest.geoip.database_path", Setting.Property.NodeScope));
}
}
public class GeoIpProcessorNonIngestNodeIT extends AbstractGeoIpIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(IngestGeoIpPlugin.class, IngestGeoIpSettingsPlugin.class);
}

@Override
protected Settings nodeSettings(final int nodeOrdinal) {
final Path databasePath = createTempDir();
try {
Files.createDirectories(databasePath);
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-City.mmdb")),
databasePath.resolve("GeoLite2-City.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-Country.mmdb")),
databasePath.resolve("GeoLite2-Country.mmdb"));
Files.copy(
new ByteArrayInputStream(StreamsUtils.copyToBytesFromClasspath("/GeoLite2-ASN.mmdb")),
databasePath.resolve("GeoLite2-ASN.mmdb"));
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
return Settings.builder()
.put("ingest.geoip.database_path", databasePath)
.put(nonIngestNode())
.put(super.nodeSettings(nodeOrdinal))
.build();
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(nonIngestNode()).build();
}

/**
Expand Down
Loading

0 comments on commit cc949b1

Please sign in to comment.