Skip to content

Commit 3d5281c

Browse files
authored
Changed how geoip cache is integrated with geoip processor. (#68890)
Backport of #68581 to the 7.x branch. This change helps facilitate allowing maxmind databases to be updated at runtime. This will make it easier to purge the cache if a database changes. Made the following changes: * Changed how the geoip processor integrates with the cache. The cache is moved from the geoip processor to the DatabaseReaderLazyLoader class. * Changed the cache key from ip + response class to ip + database_path. * Moved GeoIpCache from the IngestGeoIpPlugin class to be a top-level class.
1 parent 79d4cb3 commit 3d5281c

File tree

7 files changed

+216
-218
lines changed

7 files changed

+216
-218
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,28 @@
99
package org.elasticsearch.ingest.geoip;
1010

1111
import com.maxmind.geoip2.DatabaseReader;
12+
import com.maxmind.geoip2.exception.AddressNotFoundException;
13+
import com.maxmind.geoip2.model.AbstractResponse;
14+
import com.maxmind.geoip2.model.AsnResponse;
15+
import com.maxmind.geoip2.model.CityResponse;
16+
import com.maxmind.geoip2.model.CountryResponse;
1217
import org.apache.logging.log4j.LogManager;
1318
import org.apache.logging.log4j.Logger;
1419
import org.apache.lucene.util.SetOnce;
20+
import org.elasticsearch.SpecialPermission;
21+
import org.elasticsearch.common.CheckedBiFunction;
1522
import org.elasticsearch.common.CheckedSupplier;
1623
import org.elasticsearch.core.internal.io.IOUtils;
1724

1825
import java.io.Closeable;
1926
import java.io.IOException;
2027
import java.io.InputStream;
28+
import java.net.InetAddress;
2129
import java.nio.charset.StandardCharsets;
2230
import java.nio.file.Files;
2331
import java.nio.file.Path;
32+
import java.security.AccessController;
33+
import java.security.PrivilegedAction;
2434
import java.util.Objects;
2535

2636
/**
@@ -31,14 +41,16 @@ class DatabaseReaderLazyLoader implements Closeable {
3141

3242
private static final Logger LOGGER = LogManager.getLogger(DatabaseReaderLazyLoader.class);
3343

44+
private final GeoIpCache cache;
3445
private final Path databasePath;
3546
private final CheckedSupplier<DatabaseReader, IOException> loader;
3647
final SetOnce<DatabaseReader> databaseReader;
3748

3849
// cache the database type so that we do not re-read it on every pipeline execution
3950
final SetOnce<String> databaseType;
4051

41-
DatabaseReaderLazyLoader(final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
52+
DatabaseReaderLazyLoader(final GeoIpCache cache, final Path databasePath, final CheckedSupplier<DatabaseReader, IOException> loader) {
53+
this.cache = cache;
4254
this.databasePath = Objects.requireNonNull(databasePath);
4355
this.loader = Objects.requireNonNull(loader);
4456
this.databaseReader = new SetOnce<>();
@@ -123,7 +135,34 @@ InputStream databaseInputStream() throws IOException {
123135
return Files.newInputStream(databasePath);
124136
}
125137

126-
DatabaseReader get() throws IOException {
138+
CityResponse getCity(InetAddress ipAddress) {
139+
return getResponse(ipAddress, DatabaseReader::city);
140+
}
141+
142+
CountryResponse getCountry(InetAddress ipAddress) {
143+
return getResponse(ipAddress, DatabaseReader::country);
144+
}
145+
146+
AsnResponse getAsn(InetAddress ipAddress) {
147+
return getResponse(ipAddress, DatabaseReader::asn);
148+
}
149+
150+
private <T extends AbstractResponse> T getResponse(InetAddress ipAddress,
151+
CheckedBiFunction<DatabaseReader, InetAddress, T, Exception> responseProvider) {
152+
SpecialPermission.check();
153+
return AccessController.doPrivileged((PrivilegedAction<T>) () ->
154+
cache.putIfAbsent(ipAddress, databasePath.toString(), ip -> {
155+
try {
156+
return responseProvider.apply(get(), ipAddress);
157+
} catch (AddressNotFoundException e) {
158+
throw new GeoIpProcessor.AddressNotFoundRuntimeException(e);
159+
} catch (Exception e) {
160+
throw new RuntimeException(e);
161+
}
162+
}));
163+
}
164+
165+
private DatabaseReader get() throws IOException {
127166
if (databaseReader.get() == null) {
128167
synchronized (databaseReader) {
129168
if (databaseReader.get() == null) {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
package org.elasticsearch.ingest.geoip;
9+
10+
import com.maxmind.db.NodeCache;
11+
import com.maxmind.geoip2.model.AbstractResponse;
12+
import org.elasticsearch.common.cache.Cache;
13+
import org.elasticsearch.common.cache.CacheBuilder;
14+
15+
import java.net.InetAddress;
16+
import java.util.Objects;
17+
import java.util.function.Function;
18+
19+
/**
20+
* The in-memory cache for the geoip data. There should only be 1 instance of this class..
21+
* This cache differs from the maxmind's {@link NodeCache} such that this cache stores the deserialized Json objects to avoid the
22+
* cost of deserialization for each lookup (cached or not). This comes at slight expense of higher memory usage, but significant
23+
* reduction of CPU usage.
24+
*/
25+
final class GeoIpCache {
26+
private final Cache<CacheKey, AbstractResponse> cache;
27+
28+
//package private for testing
29+
GeoIpCache(long maxSize) {
30+
if (maxSize < 0) {
31+
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
32+
}
33+
this.cache = CacheBuilder.<CacheKey, AbstractResponse>builder().setMaximumWeight(maxSize).build();
34+
}
35+
36+
@SuppressWarnings("unchecked")
37+
<T extends AbstractResponse> T putIfAbsent(InetAddress ip,
38+
String databasePath,
39+
Function<InetAddress, AbstractResponse> retrieveFunction) {
40+
41+
//can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
42+
CacheKey cacheKey = new CacheKey(ip, databasePath);
43+
//intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
44+
AbstractResponse response = cache.get(cacheKey);
45+
if (response == null) {
46+
response = retrieveFunction.apply(ip);
47+
cache.put(cacheKey, response);
48+
}
49+
return (T) response;
50+
}
51+
52+
//only useful for testing
53+
AbstractResponse get(InetAddress ip, String databasePath) {
54+
CacheKey cacheKey = new CacheKey(ip, databasePath);
55+
return cache.get(cacheKey);
56+
}
57+
58+
/**
59+
* The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the database
60+
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
61+
* IP may be in both with different values and we need to cache both.
62+
*/
63+
private static class CacheKey {
64+
65+
private final InetAddress ip;
66+
private final String databasePath;
67+
68+
private CacheKey(InetAddress ip, String databasePath) {
69+
this.ip = ip;
70+
this.databasePath = databasePath;
71+
}
72+
73+
//generated
74+
@Override
75+
public boolean equals(Object o) {
76+
if (this == o) return true;
77+
if (o == null || getClass() != o.getClass()) return false;
78+
CacheKey cacheKey = (CacheKey) o;
79+
return Objects.equals(ip, cacheKey.ip) &&
80+
Objects.equals(databasePath, cacheKey.databasePath);
81+
}
82+
83+
//generated
84+
@Override
85+
public int hashCode() {
86+
return Objects.hash(ip, databasePath);
87+
}
88+
}
89+
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

Lines changed: 5 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.ingest.geoip;
1010

1111
import com.maxmind.db.Network;
12-
import com.maxmind.geoip2.exception.AddressNotFoundException;
1312
import com.maxmind.geoip2.model.AsnResponse;
1413
import com.maxmind.geoip2.model.CityResponse;
1514
import com.maxmind.geoip2.model.CountryResponse;
@@ -19,18 +18,14 @@
1918
import com.maxmind.geoip2.record.Location;
2019
import com.maxmind.geoip2.record.Subdivision;
2120
import org.elasticsearch.ElasticsearchParseException;
22-
import org.elasticsearch.SpecialPermission;
2321
import org.elasticsearch.common.network.InetAddresses;
2422
import org.elasticsearch.common.network.NetworkAddress;
2523
import org.elasticsearch.ingest.AbstractProcessor;
2624
import org.elasticsearch.ingest.IngestDocument;
2725
import org.elasticsearch.ingest.Processor;
28-
import org.elasticsearch.ingest.geoip.IngestGeoIpPlugin.GeoIpCache;
2926

3027
import java.io.IOException;
3128
import java.net.InetAddress;
32-
import java.security.AccessController;
33-
import java.security.PrivilegedAction;
3429
import java.util.ArrayList;
3530
import java.util.Arrays;
3631
import java.util.Collections;
@@ -58,7 +53,6 @@ public final class GeoIpProcessor extends AbstractProcessor {
5853
private final DatabaseReaderLazyLoader lazyLoader;
5954
private final Set<Property> properties;
6055
private final boolean ignoreMissing;
61-
private final GeoIpCache cache;
6256
private final boolean firstOnly;
6357

6458
/**
@@ -70,7 +64,6 @@ public final class GeoIpProcessor extends AbstractProcessor {
7064
* @param targetField the target field
7165
* @param properties the properties; ideally this is lazily-loaded once on first use
7266
* @param ignoreMissing true if documents with a missing value for the field should be ignored
73-
* @param cache a geo-IP cache
7467
* @param firstOnly true if only first result should be returned in case of array
7568
*/
7669
GeoIpProcessor(
@@ -80,15 +73,13 @@ public final class GeoIpProcessor extends AbstractProcessor {
8073
final String targetField,
8174
final Set<Property> properties,
8275
final boolean ignoreMissing,
83-
final GeoIpCache cache,
8476
boolean firstOnly) {
8577
super(tag, description);
8678
this.field = field;
8779
this.targetField = targetField;
8880
this.lazyLoader = lazyLoader;
8981
this.properties = properties;
9082
this.ignoreMissing = ignoreMissing;
91-
this.cache = cache;
9283
this.firstOnly = firstOnly;
9384
}
9485

@@ -190,18 +181,7 @@ Set<Property> getProperties() {
190181
}
191182

192183
private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
193-
SpecialPermission.check();
194-
CityResponse response = AccessController.doPrivileged((PrivilegedAction<CityResponse>) () ->
195-
cache.putIfAbsent(ipAddress, CityResponse.class, ip -> {
196-
try {
197-
return lazyLoader.get().city(ip);
198-
} catch (AddressNotFoundException e) {
199-
throw new AddressNotFoundRuntimeException(e);
200-
} catch (Exception e) {
201-
throw new RuntimeException(e);
202-
}
203-
}));
204-
184+
CityResponse response = lazyLoader.getCity(ipAddress);
205185
Country country = response.getCountry();
206186
City city = response.getCity();
207187
Location location = response.getLocation();
@@ -276,18 +256,7 @@ private Map<String, Object> retrieveCityGeoData(InetAddress ipAddress) {
276256
}
277257

278258
private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
279-
SpecialPermission.check();
280-
CountryResponse response = AccessController.doPrivileged((PrivilegedAction<CountryResponse>) () ->
281-
cache.putIfAbsent(ipAddress, CountryResponse.class, ip -> {
282-
try {
283-
return lazyLoader.get().country(ip);
284-
} catch (AddressNotFoundException e) {
285-
throw new AddressNotFoundRuntimeException(e);
286-
} catch (Exception e) {
287-
throw new RuntimeException(e);
288-
}
289-
}));
290-
259+
CountryResponse response = lazyLoader.getCountry(ipAddress);
291260
Country country = response.getCountry();
292261
Continent continent = response.getContinent();
293262

@@ -321,18 +290,7 @@ private Map<String, Object> retrieveCountryGeoData(InetAddress ipAddress) {
321290
}
322291

323292
private Map<String, Object> retrieveAsnGeoData(InetAddress ipAddress) {
324-
SpecialPermission.check();
325-
AsnResponse response = AccessController.doPrivileged((PrivilegedAction<AsnResponse>) () ->
326-
cache.putIfAbsent(ipAddress, AsnResponse.class, ip -> {
327-
try {
328-
return lazyLoader.get().asn(ip);
329-
} catch (AddressNotFoundException e) {
330-
throw new AddressNotFoundRuntimeException(e);
331-
} catch (Exception e) {
332-
throw new RuntimeException(e);
333-
}
334-
}));
335-
293+
AsnResponse response = lazyLoader.getAsn(ipAddress);
336294
Integer asn = response.getAutonomousSystemNumber();
337295
String organization_name = response.getAutonomousSystemOrganization();
338296
Network network = response.getNetwork();
@@ -381,11 +339,8 @@ Map<String, DatabaseReaderLazyLoader> databaseReaders() {
381339
return Collections.unmodifiableMap(databaseReaders);
382340
}
383341

384-
private final GeoIpCache cache;
385-
386-
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders, GeoIpCache cache) {
342+
public Factory(Map<String, DatabaseReaderLazyLoader> databaseReaders) {
387343
this.databaseReaders = databaseReaders;
388-
this.cache = cache;
389344
}
390345

391346
@Override
@@ -432,8 +387,7 @@ public GeoIpProcessor create(
432387
}
433388
}
434389

435-
return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, cache,
436-
firstOnly);
390+
return new GeoIpProcessor(processorTag, description, ipField, lazyLoader, targetField, properties, ignoreMissing, firstOnly);
437391
}
438392
}
439393

0 commit comments

Comments
 (0)