Skip to content

Commit 7e0252e

Browse files
committed
Add support for .tgz files in GeoIpDownloader (elastic#70725)
We have to ship COPYRIGHT.txt and LICENSE.txt files alongside .mmdb files for legal compliance. Infra will pack these in single .tgz (gzipped tar) archive provided by GeoIP databases service. This change adds support for that format to GeoIpDownloader and DatabaseRegistry # Conflicts: # modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java # modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java
1 parent 3c27dc1 commit 7e0252e

File tree

18 files changed

+295
-48
lines changed

18 files changed

+295
-48
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@
3434
import org.elasticsearch.test.junit.annotations.TestLogging;
3535
import org.junit.After;
3636

37-
import java.io.BufferedOutputStream;
3837
import java.io.ByteArrayInputStream;
3938
import java.io.IOException;
4039
import java.io.InputStream;
41-
import java.io.OutputStream;
4240
import java.nio.file.Files;
4341
import java.nio.file.Path;
42+
import java.nio.file.StandardCopyOption;
4443
import java.util.ArrayList;
4544
import java.util.Iterator;
4645
import java.util.List;
@@ -50,15 +49,12 @@
5049
import java.util.stream.StreamSupport;
5150
import java.util.zip.GZIPInputStream;
5251

53-
import static java.nio.file.StandardOpenOption.CREATE;
54-
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
55-
import static java.nio.file.StandardOpenOption.WRITE;
5652
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
5753
import static org.hamcrest.Matchers.containsInAnyOrder;
5854
import static org.hamcrest.Matchers.empty;
59-
import static org.hamcrest.Matchers.endsWith;
6055
import static org.hamcrest.Matchers.equalTo;
6156
import static org.hamcrest.Matchers.is;
57+
import static org.hamcrest.Matchers.nullValue;
6258

6359
public class GeoIpDownloaderIT extends AbstractGeoIpIT {
6460

@@ -124,15 +120,16 @@ public void testGeoIpDatabasesDownload() throws Exception {
124120
data.add((byte[]) hit.getSourceAsMap().get("data"));
125121
}
126122

127-
GZIPInputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data));
128-
Path tempFile = createTempFile();
129-
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))) {
130-
byte[] bytes = new byte[4096];
131-
int read;
132-
while ((read = stream.read(bytes)) != -1) {
133-
os.write(bytes, 0, read);
134-
}
123+
TarInputStream stream = new TarInputStream(new GZIPInputStream(new MultiByteArrayInputStream(data)));
124+
TarInputStream.TarEntry entry;
125+
while ((entry = stream.getNextEntry()) != null) {
126+
if (entry.getName().endsWith(".mmdb")) {
127+
break;
128+
}
135129
}
130+
131+
Path tempFile = createTempFile();
132+
Files.copy(stream, tempFile, StandardCopyOption.REPLACE_EXISTING);
136133
parseDatabase(tempFile);
137134
} catch (Exception e) {
138135
throw new AssertionError(e);
@@ -143,6 +140,7 @@ public void testGeoIpDatabasesDownload() throws Exception {
143140

144141
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
145142
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
143+
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
146144
// setup:
147145
BytesReference bytes;
148146
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
@@ -240,9 +238,11 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
240238
assertBusy(() -> {
241239
for (Path geoipTmpDir : geoipTmpDirs) {
242240
try (Stream<Path> list = Files.list(geoipTmpDir)) {
243-
List<String> files = list.map(Path::toString).collect(Collectors.toList());
244-
assertThat(files, containsInAnyOrder(endsWith("GeoLite2-City.mmdb"), endsWith("GeoLite2-Country.mmdb"),
245-
endsWith("GeoLite2-ASN.mmdb")));
241+
List<String> files = list.map(Path::getFileName).map(Path::toString).collect(Collectors.toList());
242+
assertThat(files, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb",
243+
"GeoLite2-City.mmdb_COPYRIGHT.txt","GeoLite2-Country.mmdb_COPYRIGHT.txt","GeoLite2-ASN.mmdb_COPYRIGHT.txt",
244+
"GeoLite2-City.mmdb_LICENSE.txt","GeoLite2-Country.mmdb_LICENSE.txt","GeoLite2-ASN.mmdb_LICENSE.txt",
245+
"GeoLite2-ASN.mmdb_README.txt"));
246246
}
247247
}
248248
});
@@ -253,6 +253,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
253253
assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
254254
assertThat(simulateResponse.getResults().size(), equalTo(1));
255255
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
256+
assertThat(result.getFailure(), nullValue());
256257
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
257258
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
258259
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
@@ -265,7 +266,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
265266
assertBusy(() -> {
266267
for (Path geoipTmpDir : geoipTmpDirs) {
267268
try (Stream<Path> list = Files.list(geoipTmpDir)) {
268-
List<String> files = list.map(Path::toString).collect(Collectors.toList());
269+
List<String> files = list.map(Path::toString).filter(p -> p.endsWith(".mmdb")).collect(Collectors.toList());
269270
assertThat(files, empty());
270271
}
271272
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.search.SearchHit;
3131
import org.elasticsearch.watcher.ResourceWatcherService;
3232

33+
import java.io.BufferedInputStream;
3334
import java.io.Closeable;
3435
import java.io.IOException;
3536
import java.io.UncheckedIOException;
@@ -261,6 +262,25 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
261262
decompress(databaseTmpGzFile, databaseTmpFile);
262263

263264
Path databaseFile = geoipTmpDirectory.resolve(databaseName);
265+
// tarball contains <database_name>.mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files.
266+
// we store mmdb file as is and prepend database name to all other entries to avoid conflicts
267+
try (TarInputStream is = new TarInputStream(new BufferedInputStream(Files.newInputStream(databaseTmpFile)))) {
268+
TarInputStream.TarEntry entry;
269+
while ((entry = is.getNextEntry()) != null) {
270+
//there might be ./ entry in tar, we should skip it
271+
if (entry.isNotFile()) {
272+
continue;
273+
}
274+
// flatten structure, remove any directories present from the path (should be ./ only)
275+
String name = entry.getName().substring(entry.getName().lastIndexOf('/') + 1);
276+
if (name.startsWith(databaseName)) {
277+
Files.copy(is, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
278+
} else {
279+
Files.copy(is, geoipTmpDirectory.resolve(databaseName + "_" + name), StandardCopyOption.REPLACE_EXISTING);
280+
}
281+
}
282+
}
283+
264284
LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
265285
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
266286
updateDatabase(databaseName, recordedMd5, databaseFile);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ void updateDatabases() throws IOException {
104104
logger.info("updating geoip databases");
105105
List<Map<String, Object>> response = fetchDatabasesOverview();
106106
for (Map<String, Object> res : response) {
107-
processDatabase(res);
107+
if (res.get("name").toString().endsWith(".tgz")) {
108+
processDatabase(res);
109+
}
108110
}
109111
}
110112

@@ -121,7 +123,7 @@ private <T> List<T> fetchDatabasesOverview() throws IOException {
121123

122124
//visible for testing
123125
void processDatabase(Map<String, Object> databaseInfo) {
124-
String name = databaseInfo.get("name").toString().replace(".gz", "");
126+
String name = databaseInfo.get("name").toString().replace(".tgz", "") + ".mmdb";
125127
String md5 = (String) databaseInfo.get("md5_hash");
126128
if (state.contains(name) && Objects.equals(md5, state.get(name).getMd5())) {
127129
updateTimestamp(name, state.get(name));
@@ -234,7 +236,7 @@ protected void onCancelled() {
234236

235237
@Override
236238
public GeoIpDownloaderStats getStatus() {
237-
return isCancelled() || isCompleted() ? null: stats;
239+
return isCancelled() || isCompleted() ? null : stats;
238240
}
239241

240242
private void scheduleNextRun(TimeValue time) {
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.geoip;
10+
11+
import java.io.EOFException;
12+
import java.io.FilterInputStream;
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.nio.charset.StandardCharsets;
16+
import java.util.Arrays;
17+
18+
/**
19+
* {@link InputStream} with very basic support for tar format, just enough to parse archives provided by GeoIP database service from Infra.
20+
* This class is not suitable for general purpose tar processing!
21+
*/
22+
class TarInputStream extends FilterInputStream {
23+
24+
private TarEntry currentEntry;
25+
private long remaining;
26+
private long reminder;
27+
private final byte[] buf = new byte[512];
28+
29+
TarInputStream(InputStream in) {
30+
super(in);
31+
}
32+
33+
public TarEntry getNextEntry() throws IOException {
34+
if (currentEntry != null) {
35+
//go to the end of the current entry
36+
skipN(remaining);
37+
if (reminder != 0) {
38+
skipN(512 - reminder);
39+
}
40+
}
41+
int read = in.readNBytes(buf, 0, 512);
42+
if (read == 0) {
43+
return null;
44+
}
45+
if (read != 512) {
46+
throw new EOFException();
47+
}
48+
if (Arrays.compare(buf, new byte[512]) == 0) {
49+
return null;
50+
}
51+
52+
String name = getString(0, 100);
53+
54+
boolean notFile = (buf[156] != 0 && buf[156] != '0') || name.endsWith("/");
55+
56+
if(notFile){
57+
remaining = 0;
58+
reminder = 0;
59+
} else {
60+
String sizeString = getString(124, 12);
61+
remaining = sizeString.isEmpty() ? 0 : Long.parseLong(sizeString, 8);
62+
reminder = remaining % 512;
63+
}
64+
65+
currentEntry = new TarEntry(name, notFile);
66+
return currentEntry;
67+
}
68+
69+
@Override
70+
public int read() throws IOException {
71+
if (remaining == 0) {
72+
return -1;
73+
}
74+
remaining--;
75+
return in.read();
76+
}
77+
78+
@Override
79+
public int read(byte[] b, int off, int len) throws IOException {
80+
if (remaining <= 0) {
81+
return -1;
82+
}
83+
int read = in.read(b, off, remaining > Integer.MAX_VALUE ? len : (int) Math.min(len, remaining));
84+
remaining -= read;
85+
return read;
86+
}
87+
88+
private String getString(int offset, int maxLen) {
89+
return new String(buf, offset, maxLen, StandardCharsets.UTF_8).trim();
90+
}
91+
92+
private void skipN(long n) throws IOException {
93+
while (n > 0) {
94+
long skip = in.skip(n);
95+
if (skip < n) {
96+
int read = in.read();
97+
if (read == -1) {
98+
throw new EOFException();
99+
}
100+
n--;
101+
}
102+
n -= skip;
103+
}
104+
}
105+
106+
static class TarEntry {
107+
private final String name;
108+
private final boolean notFile;
109+
110+
TarEntry(String name, boolean notFile) {
111+
this.name = name;
112+
this.notFile = notFile;
113+
}
114+
115+
public String getName() {
116+
return name;
117+
}
118+
119+
public boolean isNotFile() {
120+
return notFile;
121+
}
122+
}
123+
}
124+

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void testRetrieveDatabaseCorruption() throws Exception {
255255

256256
private String mockSearches(String databaseName, int firstChunk, int lastChunk) throws IOException {
257257
String dummyContent = "test: " + databaseName;
258-
List<byte[]> data = gzip(dummyContent, lastChunk - firstChunk + 1);
258+
List<byte[]> data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1);
259259
assertThat(gunzip(data), equalTo(dummyContent));
260260

261261
for (int i = firstChunk; i <= lastChunk; i++) {
@@ -302,10 +302,20 @@ private static RoutingTable createIndexRoutingTable() {
302302
return RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(table).build()).build();
303303
}
304304

305-
private static List<byte[]> gzip(String content, int chunks) throws IOException {
305+
private static List<byte[]> gzip(String name, String content, int chunks) throws IOException {
306306
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
307307
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bytes);
308-
gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
308+
byte[] header = new byte[512];
309+
byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
310+
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
311+
byte[] sizeBytes = String.format(Locale.ROOT, "%1$012o", contentBytes.length).getBytes(StandardCharsets.UTF_8);
312+
System.arraycopy(nameBytes, 0, header, 0, nameBytes.length);
313+
System.arraycopy(sizeBytes, 0, header, 124, 12);
314+
gzipOutputStream.write(header);
315+
gzipOutputStream.write(contentBytes);
316+
gzipOutputStream.write(512 - contentBytes.length);
317+
gzipOutputStream.write(new byte[512]);
318+
gzipOutputStream.write(new byte[512]);
309319
gzipOutputStream.close();
310320

311321
byte[] all = bytes.toByteArray();
@@ -321,9 +331,9 @@ private static List<byte[]> gzip(String content, int chunks) throws IOException
321331
from = to;
322332
}
323333

324-
if (data.size() > chunks) {
334+
while (data.size() > chunks) {
325335
byte[] last = data.remove(data.size() - 1);
326-
byte[] secondLast = data.remove(data.size() -1);
336+
byte[] secondLast = data.remove(data.size() - 1);
327337
byte[] merged = new byte[secondLast.length + last.length];
328338
System.arraycopy(secondLast, 0, merged, 0, secondLast.length);
329339
System.arraycopy(last, 0, merged, secondLast.length, last.length);
@@ -341,7 +351,8 @@ private static String gunzip(List<byte[]> chunks) throws IOException {
341351
System.arraycopy(chunk, 0, gzippedContent, written, chunk.length);
342352
written += chunk.length;
343353
}
344-
GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(gzippedContent));
354+
TarInputStream gzipInputStream = new TarInputStream(new GZIPInputStream(new ByteArrayInputStream(gzippedContent)));
355+
gzipInputStream.getNextEntry();
345356
return Streams.readFully(gzipInputStream).utf8ToString();
346357
}
347358

0 commit comments

Comments
 (0)