Skip to content

Commit

Permalink
Implement the ability to toggle dropping CLP processed field from the…
Browse files Browse the repository at this point in the history
… original generic record. (apache#14534)

* Implement the ability to toggle dropping CLP processed field from the original generic record.

* Fix unit test
  • Loading branch information
jackluo923 authored Nov 27, 2024
1 parent 34b43a3 commit da897dd
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,9 @@ private void encodeFieldWithClp(String key, Object value, GenericRow to) {
to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);

if (!_config.getRemoveProcessedFields()) {
to.putValue(key, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
public static final String REMOVE_PROCESSED_FIELDS_CONFIG_KEY = "removeProcessedFields";
public static final String UNENCODABLE_FIELD_SUFFIX_CONFIG_KEY = "unencodableFieldSuffix";
public static final String UNENCODABLE_FIELD_ERROR_CONFIG_KEY = "unencodableFieldError";

Expand All @@ -54,6 +55,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
private final Set<String> _fieldsForClpEncoding = new HashSet<>();
private String _unencodableFieldSuffix = null;
private String _unencodableFieldError = null;
private boolean _removeProcessedFields = false;

@Override
public void init(Map<String, String> props) {
Expand All @@ -75,6 +77,9 @@ public void init(Map<String, String> props) {
}
}

// True if field is specified and equal to true (ignore case), false for all other cases
_removeProcessedFields = Boolean.parseBoolean(props.get(REMOVE_PROCESSED_FIELDS_CONFIG_KEY));

String unencodableFieldSuffix = props.get(UNENCODABLE_FIELD_SUFFIX_CONFIG_KEY);
if (null != unencodableFieldSuffix) {
if (unencodableFieldSuffix.length() == 0) {
Expand All @@ -98,6 +103,10 @@ public Set<String> getFieldsForClpEncoding() {
return _fieldsForClpEncoding;
}

public boolean getRemoveProcessedFields() {
return _removeProcessedFields;
}

public String getUnencodableFieldSuffix() {
return _unencodableFieldSuffix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import static org.apache.pinot.plugin.inputformat.clplog.CLPLogRecordExtractorConfig.FIELDS_FOR_CLP_ENCODING_CONFIG_KEY;
import static org.apache.pinot.plugin.inputformat.clplog.CLPLogRecordExtractorConfig.FIELDS_FOR_CLP_ENCODING_SEPARATOR;
import static org.apache.pinot.plugin.inputformat.clplog.CLPLogRecordExtractorConfig.REMOVE_PROCESSED_FIELDS_CONFIG_KEY;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
Expand All @@ -54,11 +55,17 @@ public class CLPLogRecordExtractorTest {

@Test
public void testCLPEncoding() {
testCLPEncoding(true);
testCLPEncoding(false);
}

public void testCLPEncoding(boolean removeProcessedField) {
Map<String, String> props = new HashMap<>();
Set<String> fieldsToRead = new HashSet<>();
// Add some fields for CLP encoding
props.put(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY,
_MESSAGE_1_FIELD_NAME + FIELDS_FOR_CLP_ENCODING_SEPARATOR + _MESSAGE_2_FIELD_NAME);
props.put(REMOVE_PROCESSED_FIELDS_CONFIG_KEY, String.valueOf(removeProcessedField));
addCLPEncodedField(_MESSAGE_1_FIELD_NAME, fieldsToRead);
addCLPEncodedField(_MESSAGE_2_FIELD_NAME, fieldsToRead);
// Add some unencoded fields
Expand All @@ -70,25 +77,27 @@ public void testCLPEncoding() {
row = extract(props, fieldsToRead);
assertEquals(row.getValue(_TIMESTAMP_FIELD_NAME), _TIMESTAMP_FIELD_VALUE);
assertNull(row.getValue(_LEVEL_FIELD_NAME));
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE, removeProcessedField);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE, removeProcessedField);

// Test extracting all fields
row = extract(props, null);
assertEquals(row.getValue(_TIMESTAMP_FIELD_NAME), _TIMESTAMP_FIELD_VALUE);
assertEquals(row.getValue(_LEVEL_FIELD_NAME), _LEVEL_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE, removeProcessedField);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE, removeProcessedField);
}

@Test
public void testBadCLPEncodingConfig() {
Map<String, String> props = new HashMap<>();
Set<String> fieldsToRead = new HashSet<>();
boolean removeProcessedField = true;
// Add some fields for CLP encoding with some mistakenly empty field names
String separator = FIELDS_FOR_CLP_ENCODING_SEPARATOR;
props.put(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY, separator + _MESSAGE_1_FIELD_NAME
+ separator + separator + _MESSAGE_2_FIELD_NAME + separator);
props.put(REMOVE_PROCESSED_FIELDS_CONFIG_KEY, String.valueOf(removeProcessedField));
addCLPEncodedField(_MESSAGE_1_FIELD_NAME, fieldsToRead);
addCLPEncodedField(_MESSAGE_2_FIELD_NAME, fieldsToRead);
// Add some unencoded fields
Expand All @@ -100,16 +109,16 @@ public void testBadCLPEncodingConfig() {
row = extract(props, fieldsToRead);
assertEquals(row.getValue(_TIMESTAMP_FIELD_NAME), _TIMESTAMP_FIELD_VALUE);
assertNull(row.getValue(_LEVEL_FIELD_NAME));
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE, removeProcessedField);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE, removeProcessedField);

// Test extracting all fields
row = extract(props, null);
assertEquals(row.getValue(_TIMESTAMP_FIELD_NAME), _TIMESTAMP_FIELD_VALUE);
assertEquals(row.getValue(_LEVEL_FIELD_NAME), _LEVEL_FIELD_VALUE);
assertEquals(row.getValue(_BOOLEAN_FIELD_NAME), _BOOLEAN_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE);
validateClpEncodedField(row, _MESSAGE_1_FIELD_NAME, _MESSAGE_1_FIELD_VALUE, removeProcessedField);
validateClpEncodedField(row, _MESSAGE_2_FIELD_NAME, _MESSAGE_2_FIELD_VALUE, removeProcessedField);
}

@Test
Expand Down Expand Up @@ -163,10 +172,13 @@ private GenericRow extract(Map<String, String> props, Set<String> fieldsToRead)
return row;
}

private void validateClpEncodedField(GenericRow row, String fieldName, String expectedFieldValue) {
private void validateClpEncodedField(GenericRow row, String fieldName, String expectedFieldValue,
boolean removeProcessedField) {
try {
// Decode and validate field
assertNull(row.getValue(fieldName));
if (removeProcessedField) {
assertNull(row.getValue(fieldName));
}
String logtype = (String) row.getValue(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
assertNotEquals(logtype, null);
String[] dictionaryVars =
Expand Down

0 comments on commit da897dd

Please sign in to comment.