Skip to content

Commit

Permalink
Cache the creation of parsers within DateProcessor (elastic#92880)
Browse files Browse the repository at this point in the history
cache potentially duped values in the `DateProcessor`, avoiding the creation of disposable objects during the different executions
  • Loading branch information
HiDAl authored and mark-vieira committed Jan 31, 2023
1 parent e68c258 commit 845178b
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/92880.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 92880
summary: Cache the creation of parsers within DateProcessor
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ enum DateFormat {
@Override
Function<String, ZonedDateTime> getFunction(String format, ZoneId timezone, Locale locale) {
return (date) -> {
TemporalAccessor accessor = DateFormatter.forPattern("iso8601").parse(date);
TemporalAccessor accessor = ISO_8601.parse(date);
// even though locale could be set to en-us, Locale.ROOT (following iso8601 calendar data rules) should be used
return DateFormatters.from(accessor, Locale.ROOT, timezone).withZoneSameInstant(timezone);
};

}
},
Unix {
Expand Down Expand Up @@ -115,6 +114,14 @@ Function<String, ZonedDateTime> getFunction(String format, ZoneId zoneId, Locale
}
};

/** It's important to keep this variable as a constant because {@link DateFormatter#forPattern(String)} is an expensive method and,
* in this case, it's a never changing value.
* <br>
* Also, we shouldn't inline it in the {@link DateFormat#Iso8601}'s enum because it'd make useless the cache used
* at {@link DateProcessor}).
*/
private static final DateFormatter ISO_8601 = DateFormatter.forPattern("iso8601");

abstract Function<String, ZonedDateTime> getFunction(String format, ZoneId timezone, Locale locale);

static DateFormat fromString(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
package org.elasticsearch.ingest.common;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.util.LocaleUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
Expand All @@ -19,14 +21,17 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;

import java.lang.ref.SoftReference;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;

public final class DateProcessor extends AbstractProcessor {

Expand Down Expand Up @@ -72,9 +77,17 @@ public final class DateProcessor extends AbstractProcessor {
this.targetField = targetField;
this.formats = formats;
this.dateParsers = new ArrayList<>(this.formats.size());

for (String format : formats) {
DateFormat dateFormat = DateFormat.fromString(format);
dateParsers.add((params) -> dateFormat.getFunction(format, newDateTimeZone(params), newLocale(params)));
dateParsers.add((params) -> {
var documentZoneId = newDateTimeZone(params);
var documentLocale = newLocale(params);
return Cache.INSTANCE.getOrCompute(
new Cache.Key(format, documentZoneId, documentLocale),
() -> dateFormat.getFunction(format, documentZoneId, documentLocale)
);
});
}
this.outputFormat = outputFormat;
formatter = DateFormatter.forPattern(this.outputFormat);
Expand Down Expand Up @@ -198,4 +211,50 @@ public DateProcessor create(
);
}
}

/**
* An ad-hoc cache class that just throws away the cached values once it's full because we don't want to affect the performance
* while applying eviction policies when adding new values or retrieving them.
*/
static final class Cache {

private static final String CACHE_CAPACITY_SETTING = "es.ingest.date_processor.cache_capacity";
static final Cache INSTANCE;

static {
var cacheSizeStr = System.getProperty(CACHE_CAPACITY_SETTING, "256");
try {
INSTANCE = new Cache(Integer.parseInt(cacheSizeStr));
} catch (NumberFormatException e) {
throw new SettingsException("{} must be a valid number but was [{}]", CACHE_CAPACITY_SETTING, cacheSizeStr);
}
}
private final ConcurrentMap<Key, SoftReference<Function<String, ZonedDateTime>>> map;
private final int capacity;

Cache(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("cache capacity must be a value greater than 0 but was " + capacity);
}
this.capacity = capacity;
this.map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(this.capacity);
}

Function<String, ZonedDateTime> getOrCompute(Key key, Supplier<Function<String, ZonedDateTime>> supplier) {
Function<String, ZonedDateTime> fn;
var element = map.get(key);
// element exist and wasn't GCed
if (element != null && (fn = element.get()) != null) {
return fn;
}
if (map.size() >= capacity) {
map.clear();
}
fn = supplier.get();
map.put(key, new SoftReference<>(fn));
return fn;
}

record Key(String format, ZoneId zoneId, Locale locale) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,15 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DateProcessorTests extends ESTestCase {

Expand Down Expand Up @@ -335,4 +341,31 @@ public void testOutputFormat() {
String expectedDate = "00:00:00." + Strings.format("%09d", nanosAfterEpoch);
assertThat(ingestDocument.getFieldValue("date_as_date", String.class), equalTo(expectedDate));
}

@SuppressWarnings("unchecked")
public void testCacheIsEvictedAfterReachMaxCapacity() {
Supplier<Function<String, ZonedDateTime>> supplier1 = mock(Supplier.class);
Supplier<Function<String, ZonedDateTime>> supplier2 = mock(Supplier.class);
Function<String, ZonedDateTime> zonedDateTimeFunction1 = str -> ZonedDateTime.now();
Function<String, ZonedDateTime> zonedDateTimeFunction2 = str -> ZonedDateTime.now();
var cache = new DateProcessor.Cache(1);
var key1 = new DateProcessor.Cache.Key("format-1", ZoneId.systemDefault(), Locale.ROOT);
var key2 = new DateProcessor.Cache.Key("format-2", ZoneId.systemDefault(), Locale.ROOT);

when(supplier1.get()).thenReturn(zonedDateTimeFunction1);
when(supplier2.get()).thenReturn(zonedDateTimeFunction2);

assertEquals(cache.getOrCompute(key1, supplier1), zonedDateTimeFunction1); // 1 call to supplier1
assertEquals(cache.getOrCompute(key2, supplier2), zonedDateTimeFunction2); // 1 call to supplier2
assertEquals(cache.getOrCompute(key1, supplier1), zonedDateTimeFunction1); // 1 more call to supplier1
assertEquals(cache.getOrCompute(key1, supplier1), zonedDateTimeFunction1); // should use cached value
assertEquals(cache.getOrCompute(key2, supplier2), zonedDateTimeFunction2); // 1 more call to supplier2
assertEquals(cache.getOrCompute(key2, supplier2), zonedDateTimeFunction2); // should use cached value
assertEquals(cache.getOrCompute(key2, supplier2), zonedDateTimeFunction2); // should use cached value
assertEquals(cache.getOrCompute(key2, supplier2), zonedDateTimeFunction2); // should use cached value
assertEquals(cache.getOrCompute(key1, supplier1), zonedDateTimeFunction1); // 1 more to call to supplier1

verify(supplier1, times(3)).get();
verify(supplier2, times(2)).get();
}
}

0 comments on commit 845178b

Please sign in to comment.