Skip to content

Commit b025f51

Browse files
authored
Add support for .tgz files in GeoIpDownloader (#70725)
We have to ship COPYRIGHT.txt and LICENSE.txt files alongside .mmdb files for legal compliance. Infra will pack these in single .tgz (gzipped tar) archive provided by GeoIP databases service. This change adds support for that format to GeoIpDownloader and DatabaseRegistry
1 parent 2bef4eb commit b025f51

File tree

18 files changed

+294
-44
lines changed

18 files changed

+294
-44
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@
3434
import org.elasticsearch.test.junit.annotations.TestLogging;
3535
import org.junit.After;
3636

37-
import java.io.BufferedOutputStream;
3837
import java.io.ByteArrayInputStream;
3938
import java.io.IOException;
4039
import java.io.InputStream;
41-
import java.io.OutputStream;
4240
import java.nio.file.Files;
4341
import java.nio.file.Path;
42+
import java.nio.file.StandardCopyOption;
4443
import java.util.ArrayList;
4544
import java.util.Arrays;
4645
import java.util.Collection;
@@ -53,15 +52,12 @@
5352
import java.util.stream.StreamSupport;
5453
import java.util.zip.GZIPInputStream;
5554

56-
import static java.nio.file.StandardOpenOption.CREATE;
57-
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
58-
import static java.nio.file.StandardOpenOption.WRITE;
5955
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
6056
import static org.hamcrest.Matchers.containsInAnyOrder;
6157
import static org.hamcrest.Matchers.empty;
62-
import static org.hamcrest.Matchers.endsWith;
6358
import static org.hamcrest.Matchers.equalTo;
6459
import static org.hamcrest.Matchers.is;
60+
import static org.hamcrest.Matchers.nullValue;
6561

6662
public class GeoIpDownloaderIT extends AbstractGeoIpIT {
6763

@@ -132,11 +128,16 @@ public void testGeoIpDatabasesDownload() throws Exception {
132128
data.add((byte[]) hit.getSourceAsMap().get("data"));
133129
}
134130

135-
GZIPInputStream stream = new GZIPInputStream(new MultiByteArrayInputStream(data));
136-
Path tempFile = createTempFile();
137-
try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tempFile, TRUNCATE_EXISTING, WRITE, CREATE))) {
138-
stream.transferTo(os);
131+
TarInputStream stream = new TarInputStream(new GZIPInputStream(new MultiByteArrayInputStream(data)));
132+
TarInputStream.TarEntry entry;
133+
while ((entry = stream.getNextEntry()) != null) {
134+
if (entry.getName().endsWith(".mmdb")) {
135+
break;
136+
}
139137
}
138+
139+
Path tempFile = createTempFile();
140+
Files.copy(stream, tempFile, StandardCopyOption.REPLACE_EXISTING);
140141
parseDatabase(tempFile);
141142
} catch (Exception e) {
142143
throw new AssertionError(e);
@@ -147,6 +148,7 @@ public void testGeoIpDatabasesDownload() throws Exception {
147148

148149
@TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
149150
public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
151+
assumeTrue("only test with fixture to have stable results", ENDPOINT != null);
150152
// setup:
151153
BytesReference bytes;
152154
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
@@ -243,9 +245,11 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
243245
assertBusy(() -> {
244246
for (Path geoipTmpDir : geoipTmpDirs) {
245247
try (Stream<Path> list = Files.list(geoipTmpDir)) {
246-
List<String> files = list.map(Path::toString).collect(Collectors.toList());
247-
assertThat(files, containsInAnyOrder(endsWith("GeoLite2-City.mmdb"), endsWith("GeoLite2-Country.mmdb"),
248-
endsWith("GeoLite2-ASN.mmdb")));
248+
List<String> files = list.map(Path::getFileName).map(Path::toString).collect(Collectors.toList());
249+
assertThat(files, containsInAnyOrder("GeoLite2-City.mmdb", "GeoLite2-Country.mmdb", "GeoLite2-ASN.mmdb",
250+
"GeoLite2-City.mmdb_COPYRIGHT.txt","GeoLite2-Country.mmdb_COPYRIGHT.txt","GeoLite2-ASN.mmdb_COPYRIGHT.txt",
251+
"GeoLite2-City.mmdb_LICENSE.txt","GeoLite2-Country.mmdb_LICENSE.txt","GeoLite2-ASN.mmdb_LICENSE.txt",
252+
"GeoLite2-ASN.mmdb_README.txt"));
249253
}
250254
}
251255
});
@@ -256,6 +260,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
256260
assertThat(simulateResponse.getPipelineId(), equalTo("_id"));
257261
assertThat(simulateResponse.getResults().size(), equalTo(1));
258262
SimulateDocumentBaseResult result = (SimulateDocumentBaseResult) simulateResponse.getResults().get(0);
263+
assertThat(result.getFailure(), nullValue());
259264
assertThat(result.getIngestDocument().getFieldValue("ip-city.city_name", String.class), equalTo("Linköping"));
260265
assertThat(result.getIngestDocument().getFieldValue("ip-asn.organization_name", String.class), equalTo("Bredband2 AB"));
261266
assertThat(result.getIngestDocument().getFieldValue("ip-country.country_name", String.class), equalTo("Sweden"));
@@ -268,7 +273,7 @@ public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
268273
assertBusy(() -> {
269274
for (Path geoipTmpDir : geoipTmpDirs) {
270275
try (Stream<Path> list = Files.list(geoipTmpDir)) {
271-
List<String> files = list.map(Path::toString).collect(Collectors.toList());
276+
List<String> files = list.map(Path::toString).filter(p -> p.endsWith(".mmdb")).collect(Collectors.toList());
272277
assertThat(files, empty());
273278
}
274279
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseRegistry.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.search.SearchHit;
3131
import org.elasticsearch.watcher.ResourceWatcherService;
3232

33+
import java.io.BufferedInputStream;
3334
import java.io.Closeable;
3435
import java.io.IOException;
3536
import java.io.UncheckedIOException;
@@ -261,6 +262,25 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
261262
decompress(databaseTmpGzFile, databaseTmpFile);
262263

263264
Path databaseFile = geoipTmpDirectory.resolve(databaseName);
265+
// tarball contains <database_name>.mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files.
266+
// we store mmdb file as is and prepend database name to all other entries to avoid conflicts
267+
try (TarInputStream is = new TarInputStream(new BufferedInputStream(Files.newInputStream(databaseTmpFile)))) {
268+
TarInputStream.TarEntry entry;
269+
while ((entry = is.getNextEntry()) != null) {
270+
//there might be ./ entry in tar, we should skip it
271+
if (entry.isNotFile()) {
272+
continue;
273+
}
274+
// flatten structure, remove any directories present from the path (should be ./ only)
275+
String name = entry.getName().substring(entry.getName().lastIndexOf('/') + 1);
276+
if (name.startsWith(databaseName)) {
277+
Files.copy(is, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
278+
} else {
279+
Files.copy(is, geoipTmpDirectory.resolve(databaseName + "_" + name), StandardCopyOption.REPLACE_EXISTING);
280+
}
281+
}
282+
}
283+
264284
LOGGER.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
265285
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
266286
updateDatabase(databaseName, recordedMd5, databaseFile);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ void updateDatabases() throws IOException {
104104
logger.info("updating geoip databases");
105105
List<Map<String, Object>> response = fetchDatabasesOverview();
106106
for (Map<String, Object> res : response) {
107-
processDatabase(res);
107+
if (res.get("name").toString().endsWith(".tgz")) {
108+
processDatabase(res);
109+
}
108110
}
109111
}
110112

@@ -121,7 +123,7 @@ private <T> List<T> fetchDatabasesOverview() throws IOException {
121123

122124
//visible for testing
123125
void processDatabase(Map<String, Object> databaseInfo) {
124-
String name = databaseInfo.get("name").toString().replace(".gz", "");
126+
String name = databaseInfo.get("name").toString().replace(".tgz", "") + ".mmdb";
125127
String md5 = (String) databaseInfo.get("md5_hash");
126128
if (state.contains(name) && Objects.equals(md5, state.get(name).getMd5())) {
127129
updateTimestamp(name, state.get(name));
@@ -234,7 +236,7 @@ protected void onCancelled() {
234236

235237
@Override
236238
public GeoIpDownloaderStats getStatus() {
237-
return isCancelled() || isCompleted() ? null: stats;
239+
return isCancelled() || isCompleted() ? null : stats;
238240
}
239241

240242
private void scheduleNextRun(TimeValue time) {
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.ingest.geoip;
10+
11+
import java.io.EOFException;
12+
import java.io.FilterInputStream;
13+
import java.io.IOException;
14+
import java.io.InputStream;
15+
import java.nio.charset.StandardCharsets;
16+
import java.util.Arrays;
17+
18+
/**
19+
* {@link InputStream} with very basic support for tar format, just enough to parse archives provided by GeoIP database service from Infra.
20+
* This class is not suitable for general purpose tar processing!
21+
*/
22+
class TarInputStream extends FilterInputStream {
23+
24+
private TarEntry currentEntry;
25+
private long remaining;
26+
private long reminder;
27+
private final byte[] buf = new byte[512];
28+
29+
TarInputStream(InputStream in) {
30+
super(in);
31+
}
32+
33+
public TarEntry getNextEntry() throws IOException {
34+
if (currentEntry != null) {
35+
//go to the end of the current entry
36+
skipN(remaining);
37+
if (reminder != 0) {
38+
skipN(512 - reminder);
39+
}
40+
}
41+
int read = in.readNBytes(buf, 0, 512);
42+
if (read == 0) {
43+
return null;
44+
}
45+
if (read != 512) {
46+
throw new EOFException();
47+
}
48+
if (Arrays.compare(buf, new byte[512]) == 0) {
49+
return null;
50+
}
51+
52+
String name = getString(0, 100);
53+
54+
boolean notFile = (buf[156] != 0 && buf[156] != '0') || name.endsWith("/");
55+
56+
if(notFile){
57+
remaining = 0;
58+
reminder = 0;
59+
} else {
60+
String sizeString = getString(124, 12);
61+
remaining = sizeString.isEmpty() ? 0 : Long.parseLong(sizeString, 8);
62+
reminder = remaining % 512;
63+
}
64+
65+
currentEntry = new TarEntry(name, notFile);
66+
return currentEntry;
67+
}
68+
69+
@Override
70+
public int read() throws IOException {
71+
if (remaining == 0) {
72+
return -1;
73+
}
74+
remaining--;
75+
return in.read();
76+
}
77+
78+
@Override
79+
public int read(byte[] b, int off, int len) throws IOException {
80+
if (remaining <= 0) {
81+
return -1;
82+
}
83+
int read = in.read(b, off, remaining > Integer.MAX_VALUE ? len : (int) Math.min(len, remaining));
84+
remaining -= read;
85+
return read;
86+
}
87+
88+
private String getString(int offset, int maxLen) {
89+
return new String(buf, offset, maxLen, StandardCharsets.UTF_8).trim();
90+
}
91+
92+
private void skipN(long n) throws IOException {
93+
while (n > 0) {
94+
long skip = in.skip(n);
95+
if (skip < n) {
96+
int read = in.read();
97+
if (read == -1) {
98+
throw new EOFException();
99+
}
100+
n--;
101+
}
102+
n -= skip;
103+
}
104+
}
105+
106+
static class TarEntry {
107+
private final String name;
108+
private final boolean notFile;
109+
110+
TarEntry(String name, boolean notFile) {
111+
this.name = name;
112+
this.notFile = notFile;
113+
}
114+
115+
public String getName() {
116+
return name;
117+
}
118+
119+
public boolean isNotFile() {
120+
return notFile;
121+
}
122+
}
123+
}
124+

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/DatabaseRegistryTests.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void testRetrieveDatabaseCorruption() throws Exception {
255255

256256
private String mockSearches(String databaseName, int firstChunk, int lastChunk) throws IOException {
257257
String dummyContent = "test: " + databaseName;
258-
List<byte[]> data = gzip(dummyContent, lastChunk - firstChunk + 1);
258+
List<byte[]> data = gzip(databaseName, dummyContent, lastChunk - firstChunk + 1);
259259
assertThat(gunzip(data), equalTo(dummyContent));
260260

261261
for (int i = firstChunk; i <= lastChunk; i++) {
@@ -302,10 +302,20 @@ private static RoutingTable createIndexRoutingTable() {
302302
return RoutingTable.builder().add(IndexRoutingTable.builder(index).addIndexShard(table).build()).build();
303303
}
304304

305-
private static List<byte[]> gzip(String content, int chunks) throws IOException {
305+
private static List<byte[]> gzip(String name, String content, int chunks) throws IOException {
306306
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
307307
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(bytes);
308-
gzipOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
308+
byte[] header = new byte[512];
309+
byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
310+
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
311+
byte[] sizeBytes = String.format(Locale.ROOT, "%1$012o", contentBytes.length).getBytes(StandardCharsets.UTF_8);
312+
System.arraycopy(nameBytes, 0, header, 0, nameBytes.length);
313+
System.arraycopy(sizeBytes, 0, header, 124, 12);
314+
gzipOutputStream.write(header);
315+
gzipOutputStream.write(contentBytes);
316+
gzipOutputStream.write(512 - contentBytes.length);
317+
gzipOutputStream.write(new byte[512]);
318+
gzipOutputStream.write(new byte[512]);
309319
gzipOutputStream.close();
310320

311321
byte[] all = bytes.toByteArray();
@@ -321,9 +331,9 @@ private static List<byte[]> gzip(String content, int chunks) throws IOException
321331
from = to;
322332
}
323333

324-
if (data.size() > chunks) {
334+
while (data.size() > chunks) {
325335
byte[] last = data.remove(data.size() - 1);
326-
byte[] secondLast = data.remove(data.size() -1);
336+
byte[] secondLast = data.remove(data.size() - 1);
327337
byte[] merged = new byte[secondLast.length + last.length];
328338
System.arraycopy(secondLast, 0, merged, 0, secondLast.length);
329339
System.arraycopy(last, 0, merged, secondLast.length, last.length);
@@ -341,7 +351,8 @@ private static String gunzip(List<byte[]> chunks) throws IOException {
341351
System.arraycopy(chunk, 0, gzippedContent, written, chunk.length);
342352
written += chunk.length;
343353
}
344-
GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(gzippedContent));
354+
TarInputStream gzipInputStream = new TarInputStream(new GZIPInputStream(new ByteArrayInputStream(gzippedContent)));
355+
gzipInputStream.getNextEntry();
345356
return Streams.readFully(gzipInputStream).utf8ToString();
346357
}
347358

0 commit comments

Comments
 (0)