diff --git a/pom.xml b/pom.xml index 792e229de..6a5274973 100644 --- a/pom.xml +++ b/pom.xml @@ -639,7 +639,7 @@ - kafka-0.10-dev + kafka-0.10 true @@ -647,7 +647,7 @@ org.apache.kafka kafka_2.10 - 0.10.2.0 + 0.10.0.0 org.slf4j diff --git a/src/main/config/log4j.dev.properties b/src/main/config/log4j.properties similarity index 73% rename from src/main/config/log4j.dev.properties rename to src/main/config/log4j.properties index a0b78828a..7008cf3c4 100644 --- a/src/main/config/log4j.dev.properties +++ b/src/main/config/log4j.properties @@ -9,9 +9,10 @@ log4j.appender.CONSOLE.Target=System.err log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n -log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender -log4j.appender.ROLLINGFILE.Threshold=DEBUG -log4j.appender.ROLLINGFILE.File=/tmp/secor_dev/logs/secor.log +log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender +log4j.appender.ROLLINGFILE.Threshold=INFO +log4j.appender.ROLLINGFILE.File=/mnt/secor/logs/${secor_group}.log +log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd # keep log files up to 1G log4j.appender.ROLLINGFILE.MaxFileSize=20MB log4j.appender.ROLLINGFILE.MaxBackupIndex=50 diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index 600060988..e9a11aeef 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -306,7 +306,7 @@ secor.thrift.message.class.*= statsd.prefixWithConsumerGroup=true # Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123) -message.timestamp.name=timestamp +message.timestamp.name=@timestamp # Separator for defining message.timestamp.name in a nested structure. E.g. 
# {"meta_data": {"created": "1405911096123", "last_modified": "1405912096123"}, "data": "test"} @@ -324,7 +324,7 @@ message.timestamp.type=i64 # Name of field that contains a timestamp, as a date Format, for JSON. (2014-08-07, Jul 23 02:16:57 2005, etc...) # Should be used when there is no timestamp in a Long format. Also ignore time zones. -message.timestamp.input.pattern= +message.timestamp.input.pattern=yyyy-MM-dd # whether timestamp field is required, it should always be required. But # for historical reason, we didn't enforce this check, there might exist some @@ -355,7 +355,7 @@ secor.max.message.size.bytes=100000 # Class that will manage uploads. Default is to use the hadoop # interface to S3. -secor.upload.manager.class=com.pinterest.secor.uploader.HadoopS3UploadManager +secor.upload.manager.class=com.pinterest.secor.uploader.S3UploadManager #Set below property to your timezone, and partitions in s3 will be created as per timezone provided secor.parser.timezone=UTC diff --git a/src/main/config/secor.dev.partition.properties b/src/main/config/secor.dev.partition.properties index d91ff69b1..7d243e995 100644 --- a/src/main/config/secor.dev.partition.properties +++ b/src/main/config/secor.dev.partition.properties @@ -33,3 +33,31 @@ secor.local.path=/tmp/secor_dev/message_logs/partition # Port of the Ostrich server. ostrich.port=9998 +# Secor custom properties + +secor.s3.output_file_pattern={partition}-{currentTimestamp}.json + +# Partition Date Output format. This is used along with PatternDateMessageParser. Defaults to 'yyyy-MM-dd' *New* +secor.partition.output_dt_format=yyyy-MM-dd + +secor.partition.prefix.enable=false +# Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123) +secor.partition.prefix.identifier=eid + +secor.partition.prefix.mapping={"DEFAULT":"secor-upgrade"} + +secor.max.file.age.policy=oldest + +# Output file pattern excluding prefix. 
Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz. +# Available placeholders are +# topic - The topic name the data is being fetched +# partition - The partition name +# generation - Generation +# kafkaPartition - The kafka partition +# fmOffset - First Message offset in the file. +# randomHex - A 4 character random hex to append to the file name +# currentTimestamp - Time of upload in epoch format +# currentTime - Time of upload in HH-mm format +# currentDate - Time of upload in YYYYMMDD format +# folder - Folder to use based on message id map lookup +secor.s3.output_file_pattern={partition}-{currentTimestamp}.json \ No newline at end of file diff --git a/src/main/java/com/pinterest/secor/common/FileRegistry.java b/src/main/java/com/pinterest/secor/common/FileRegistry.java index eb67e0213..5b77e6aa9 100644 --- a/src/main/java/com/pinterest/secor/common/FileRegistry.java +++ b/src/main/java/com/pinterest/secor/common/FileRegistry.java @@ -21,6 +21,7 @@ import com.pinterest.secor.util.ReflectionUtil; import com.pinterest.secor.util.StatsUtil; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -258,12 +259,13 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOExcept public long getModificationAgeSec(TopicPartitionGroup topicPartitionGroup) throws IOException { long now = System.currentTimeMillis() / 1000L; - long result; - if (mConfig.getFileAgeYoungest()) { - result = Long.MAX_VALUE; - } else { - result = -1; - } + long result = Long.MAX_VALUE; + boolean useOldestFile = StringUtils.equals("oldest", mConfig.getMaxFileAgePolicy()); +// if (mConfig.getFileAgeYoungest()) { +// result = Long.MAX_VALUE; +// } else { +// result = -1; +// } Collection paths = getPaths(topicPartitionGroup); for (LogFilePath path : paths) { Long creationTime = mCreationTimes.get(path); @@ -272,15 +274,22 @@ public long 
getModificationAgeSec(TopicPartitionGroup topicPartitionGroup) throw creationTime = now; } long age = now - creationTime; - if (mConfig.getFileAgeYoungest()) { - if (age < result) { - result = age; - } - } else { - if (age > result) { - result = age; - } - } +// if (mConfig.getFileAgeYoungest()) { +// if (age < result) { +// result = age; +// } +// } else { +// if (age > result) { +// result = age; +// } +// } + if (result == Long.MAX_VALUE) { + result = age; + } else if (!useOldestFile && age < result) { + result = age; + } else if (useOldestFile && age > result) { + result = age; + } } if (result == Long.MAX_VALUE) { result = -1; diff --git a/src/main/java/com/pinterest/secor/common/LogFilePath.java b/src/main/java/com/pinterest/secor/common/LogFilePath.java index 6f0e8b14c..c20cc2dc1 100644 --- a/src/main/java/com/pinterest/secor/common/LogFilePath.java +++ b/src/main/java/com/pinterest/secor/common/LogFilePath.java @@ -19,12 +19,19 @@ import com.pinterest.secor.message.ParsedMessage; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.text.StrSubstitutor; import java.io.UnsupportedEncodingException; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; /** * LogFilePath represents path of a log file. 
It contains convenience method for building and @@ -54,6 +61,10 @@ public class LogFilePath { private final long[] mOffsets; private final String mExtension; private MessageDigest messageDigest; + + private String mOutputFilePattern; + private SimpleDateFormat timeFormat = new SimpleDateFormat("HH-mm"); + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd"); public LogFilePath(String prefix, String topic, String[] partitions, int generation, @@ -139,10 +150,23 @@ private static String[] subArray(String[] array, int startIndex, int endIndex) { return result; } - public LogFilePath withPrefix(String prefix) { - return new LogFilePath(prefix, mTopic, mPartitions, mGeneration, mKafkaPartitions, mOffsets, - mExtension); - } + public LogFilePath withPrefix(String prefix, SecorConfig mConfig) { + return new LogFilePath(prefix, mTopic, mPartitions, mGeneration, mKafkaPartitions, mOffsets, mExtension, + mConfig); + } + + public LogFilePath(String prefix, String topic, String[] partitions, int generation, int[] kafkaPartition, + long[] offset, String extension, SecorConfig config) { + + mPrefix = prefix; + mTopic = topic; + mPartitions = partitions; + mGeneration = generation; + mKafkaPartitions = kafkaPartition; + mOffsets = offset; + mExtension = extension; + mOutputFilePattern = config.getS3OutputFilePattern(); + } public String getLogFileParentDir() { ArrayList elements = new ArrayList(); @@ -191,6 +215,9 @@ private String getLogFileBasename() { } public String getLogFilePath() { + if (StringUtils.isNotBlank(mOutputFilePattern)) { + return getLogFilePath(mOutputFilePattern); + } String basename = getLogFileBasename(); ArrayList pathElements = new ArrayList(); @@ -199,6 +226,36 @@ public String getLogFilePath() { return StringUtils.join(pathElements, "/") + mExtension; } + + private String getLogFilePath(String pattern) { + + List pathElements = new ArrayList(); + pathElements.add(mPrefix); + pathElements.add(StrSubstitutor.replace(pattern, 
getValueMap(), "{", "}")); + System.out.println("Path:" + StringUtils.join(pathElements, "/") + mExtension); + return StringUtils.join(pathElements, "/") + mExtension; + } + + private Map getValueMap() { + + Map valueMap = new HashMap(); + valueMap.put("randomHex", getRandomHex()); + valueMap.put("partition", mPartitions[0]); + valueMap.put("topic", mTopic); + valueMap.put("generation", mGeneration + ""); + valueMap.put("kafkaPartition", mKafkaPartitions[0] + ""); + valueMap.put("fmOffset", String.format("%020d", mOffsets[0])); + valueMap.put("currentTimestamp", System.currentTimeMillis() + ""); + valueMap.put("currentTime", timeFormat.format(new Date())); + valueMap.put("currentDate", dateFormat.format(new Date())); + return valueMap; + } + + public static String getRandomHex() { + + Random random = new Random(); + return StringUtils.substring(Integer.toHexString(random.nextInt()), 0, 4); + } public String getLogFileCrcPath() { String basename = "." + getLogFileBasename() + ".crc"; diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 993a3163a..41a088597 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -611,4 +611,31 @@ public Map getORCMessageSchema() { public String getORCSchemaProviderClass(){ return getString("secor.orc.schema.provider"); } + + public String getS3OutputFilePattern() { + return getString("secor.s3.output_file_pattern"); + } + + public String getPartitionPrefixMapping() { + String[] map = getStringArray("secor.partition.prefix.mapping"); + if (null != map) + return StringUtils.join(map, ','); + return ""; + } + + public boolean isPartitionPrefixEnabled() { + return getBoolean("secor.partition.prefix.enable", false); + } + + public String getPartitionPrefixIdentifier() { + return getString("secor.partition.prefix.identifier", ""); + } + + public String 
getPartitionOutputDtFormat() { + return getString("secor.partition.output_dt_format"); + } + + public String getMaxFileAgePolicy() { + return getString("secor.max.file.age.policy"); + } } diff --git a/src/main/java/com/pinterest/secor/parser/ChannelDateMessageParser.java b/src/main/java/com/pinterest/secor/parser/ChannelDateMessageParser.java new file mode 100644 index 000000000..85edc4bc4 --- /dev/null +++ b/src/main/java/com/pinterest/secor/parser/ChannelDateMessageParser.java @@ -0,0 +1,110 @@ +package com.pinterest.secor.parser; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import net.minidev.json.JSONObject; +import net.minidev.json.JSONValue; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.message.Message; + +/** + * ChannelDateMessageParser extracts timestamp field (specified by 'message.timestamp.name') and the date pattern (specified by 'message.timestamp.input.pattern'). 
+ * The output file pattern is fetched from the property 'secor.partition.output_dt_format'. + * + * This generic class can even handle the DateMessageParse functionality. For ex: it will generate the same partition when the 'secor.partition.output_dt_format' property is set to "'dt='yyyy-MM-dd" + * + * @see http://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html + * + * @author Santhosh Vasabhaktula (santhosh.vasabhaktula@gmail.com) + * + */ +public class ChannelDateMessageParser extends MessageParser { + + private static final Logger LOG = LoggerFactory.getLogger(ChannelDateMessageParser.class); + protected static final String defaultDate = "1970-01-01"; + protected static final String defaultFormatter = "yyyy-MM-dd"; + private Map partitionPrefixMap; + + public ChannelDateMessageParser(SecorConfig config) { + super(config); + partitionPrefixMap = new HashMap(); + String partitionMapping = config.getPartitionPrefixMapping(); + if(null != partitionMapping) { + JSONObject jsonObject = (JSONObject) JSONValue.parse(partitionMapping); + for(Entry entry: jsonObject.entrySet()) { + partitionPrefixMap.put(entry.getKey(), entry.getValue().toString() + "/"); + } + } + } + + @Override + public String[] extractPartitions(Message message) { + + JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload()); + boolean prefixEnabled = mConfig.isPartitionPrefixEnabled(); + String result[] = { prefixEnabled ? 
partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate }; + + if (jsonObject != null) { + Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName()); + Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier()); + Object inputPattern = mConfig.getMessageTimestampInputPattern(); + if (fieldValue != null && inputPattern != null) { + try { + SimpleDateFormat outputFormatter = new SimpleDateFormat(StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter)); + Date dateFormat = null; + if(fieldValue instanceof Number) { + dateFormat = new Date(((Number)fieldValue).longValue()); + } else { + SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString()); + dateFormat = inputFormatter.parse(fieldValue.toString()); + } + Map context = (HashMap)jsonObject.get("context"); + String channel = (String) context.get("channel"); + String path = "channel-exhaust/" + channel + "/" + outputFormatter.format(dateFormat); + result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + path : path; + return result; + } catch (Exception e) { + LOG.warn("Impossible to convert date = " + fieldValue.toString() + + " for the input pattern = " + inputPattern.toString() + + ". 
Using date default=" + result[0]); + } + } + } + + return result; + } + + private String getPrefix(String prefixIdentifier) { + String prefix = partitionPrefixMap.get(prefixIdentifier); + if (StringUtils.isBlank(prefix)) prefix = partitionPrefixMap.get("DEFAULT"); + return prefix; + } + +} \ No newline at end of file diff --git a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java index 3ab672895..d8187bac7 100644 --- a/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java +++ b/src/main/java/com/pinterest/secor/parser/PartitionFinalizer.java @@ -103,7 +103,7 @@ private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throw mConfig.getGeneration(), 0, 0, mFileExtension); if (FileUtil.s3PathPrefixIsAltered(logFilePath.getLogFilePath(), mConfig)) { - logFilePath = logFilePath.withPrefix(FileUtil.getS3AlternativePrefix(mConfig)); + logFilePath = logFilePath.withPrefix(FileUtil.getS3AlternativePrefix(mConfig), mConfig); } String logFileDir = logFilePath.getLogFileDir(); @@ -182,7 +182,7 @@ private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throw mConfig.getGeneration(), 0, 0, mFileExtension); if (FileUtil.s3PathPrefixIsAltered(logFilePath.getLogFilePath(), mConfig)) { - logFilePath = logFilePath.withPrefix(FileUtil.getS3AlternativePrefix(mConfig)); + logFilePath = logFilePath.withPrefix(FileUtil.getS3AlternativePrefix(mConfig), mConfig); LOG.info("Will finalize alternative s3 logFilePath {}", logFilePath); } diff --git a/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java new file mode 100644 index 000000000..0e7139657 --- /dev/null +++ b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.secor.parser; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import net.minidev.json.JSONObject; +import net.minidev.json.JSONValue; + +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.pinterest.secor.common.SecorConfig; +import com.pinterest.secor.message.Message; + +/** + * PatternDateMessageParser extracts timestamp field (specified by 'message.timestamp.name') and the date pattern (specified by 'message.timestamp.input.pattern'). + * The output file pattern is fetched from the property 'secor.partition.output_dt_format'. + * + * This generic class can even handle the DateMessageParse functionality. 
For ex: it will generate the same partition when the 'secor.partition.output_dt_format' property is set to "'dt='yyyy-MM-dd" + * + * @see http://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html + * + * @author Santhosh Vasabhaktula (santhosh.vasabhaktula@gmail.com) + * + */ +public class PatternDateMessageParser extends MessageParser { + + private static final Logger LOG = LoggerFactory.getLogger(PatternDateMessageParser.class); + protected static final String defaultDate = "1970-01-01"; + protected static final String defaultFormatter = "yyyy-MM-dd"; + private Map partitionPrefixMap; + + public PatternDateMessageParser(SecorConfig config) { + super(config); + partitionPrefixMap = new HashMap(); + String partitionMapping = config.getPartitionPrefixMapping(); + if(null != partitionMapping) { + JSONObject jsonObject = (JSONObject) JSONValue.parse(partitionMapping); + for(Entry entry: jsonObject.entrySet()) { + partitionPrefixMap.put(entry.getKey(), entry.getValue().toString() + "/"); + } + } + } + + @Override + public String[] extractPartitions(Message message) { + + JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload()); + boolean prefixEnabled = mConfig.isPartitionPrefixEnabled(); + String result[] = { prefixEnabled ? 
partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate }; + if (jsonObject != null) { + Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName()); + Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier()); + Object inputPattern = mConfig.getMessageTimestampInputPattern(); + if (fieldValue != null && inputPattern != null) { + try { + SimpleDateFormat outputFormatter = new SimpleDateFormat(StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter)); + Date dateFormat = null; + if(fieldValue instanceof Number) { + dateFormat = new Date(((Number)fieldValue).longValue()); + } else { + SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString()); + dateFormat = inputFormatter.parse(fieldValue.toString()); + } + result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + outputFormatter.format(dateFormat) : outputFormatter.format(dateFormat); + return result; + } catch (Exception e) { + LOG.warn("Impossible to convert date = " + fieldValue.toString() + + " for the input pattern = " + inputPattern.toString() + + ". 
Using date default=" + result[0]); + } + } + } + + return result; + } + + private String getPrefix(String prefixIdentifier) { + String prefix = partitionPrefixMap.get(prefixIdentifier); + if (StringUtils.isBlank(prefix)) prefix = partitionPrefixMap.get("DEFAULT"); + return prefix; + } + +} \ No newline at end of file diff --git a/src/main/java/com/pinterest/secor/uploader/AzureUploadManager.java b/src/main/java/com/pinterest/secor/uploader/AzureUploadManager.java index 412ab6341..873e4a6be 100644 --- a/src/main/java/com/pinterest/secor/uploader/AzureUploadManager.java +++ b/src/main/java/com/pinterest/secor/uploader/AzureUploadManager.java @@ -47,7 +47,7 @@ public AzureUploadManager(SecorConfig config) throws Exception { @java.lang.Override public Handle upload(LogFilePath localPath) throws Exception { final String azureContainer = mConfig.getAzureContainer(); - final String azureKey = localPath.withPrefix(mConfig.getAzurePath()).getLogFilePath(); + final String azureKey = localPath.withPrefix(mConfig.getAzurePath(), mConfig).getLogFilePath(); final File localFile = new File(localPath.getLogFilePath()); LOG.info("uploading file {} to azure://{}/{}", localFile, azureContainer, azureKey); diff --git a/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java b/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java index b7e3a175d..634b6b531 100644 --- a/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java +++ b/src/main/java/com/pinterest/secor/uploader/GsUploadManager.java @@ -70,7 +70,7 @@ public GsUploadManager(SecorConfig config) throws Exception { @Override public Handle upload(LogFilePath localPath) throws Exception { final String gsBucket = mConfig.getGsBucket(); - final String gsKey = localPath.withPrefix(mConfig.getGsPath()).getLogFilePath(); + final String gsKey = localPath.withPrefix(mConfig.getGsPath(), mConfig).getLogFilePath(); final File localFile = new File(localPath.getLogFilePath()); final boolean directUpload = 
mConfig.getGsDirectUpload(); diff --git a/src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java b/src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java index 166795d1a..d4b0b5c7e 100644 --- a/src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java +++ b/src/main/java/com/pinterest/secor/uploader/HadoopS3UploadManager.java @@ -45,12 +45,12 @@ public HadoopS3UploadManager(SecorConfig config) { public Handle upload(LogFilePath localPath) throws Exception { String prefix = FileUtil.getPrefix(localPath.getTopic(), mConfig); - LogFilePath path = localPath.withPrefix(prefix); + LogFilePath path = localPath.withPrefix(prefix, mConfig); final String localLogFilename = localPath.getLogFilePath(); final String logFileName; if (FileUtil.s3PathPrefixIsAltered(path.getLogFilePath(), mConfig)) { - logFileName = localPath.withPrefix(FileUtil.getS3AlternativePrefix(mConfig)).getLogFilePath(); + logFileName = localPath.withPrefix(FileUtil.getS3AlternativePrefix(mConfig), mConfig).getLogFilePath(); LOG.info("Will upload file to alternative s3 prefix path {}", logFileName); } else { diff --git a/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java b/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java index 4d43d9f9d..391a4f6b8 100644 --- a/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java +++ b/src/main/java/com/pinterest/secor/uploader/S3UploadManager.java @@ -141,7 +141,7 @@ public Handle upload(LogFilePath localPath) throws Exception { File localFile = new File(localPath.getLogFilePath()); - if (FileUtil.s3PathPrefixIsAltered(localPath.withPrefix(curS3Path).getLogFilePath(), mConfig)) { + if (FileUtil.s3PathPrefixIsAltered(localPath.withPrefix(curS3Path, mConfig).getLogFilePath(), mConfig)) { curS3Path = FileUtil.getS3AlternativePathPrefix(mConfig); LOG.info("Will upload file {} to alternative s3 path s3://{}/{}", localFile, s3Bucket, curS3Path); } @@ -149,12 +149,13 @@ public Handle 
upload(LogFilePath localPath) throws Exception { if (mConfig.getS3MD5HashPrefix()) { // add MD5 hash to the prefix to have proper partitioning of the secor logs on s3 String md5Hash = FileUtil.getMd5Hash(localPath.getTopic(), localPath.getPartitions()); - s3Key = localPath.withPrefix(md5Hash + "/" + curS3Path).getLogFilePath(); + s3Key = localPath.withPrefix(md5Hash + "/" + curS3Path, mConfig).getLogFilePath(); } else { - s3Key = localPath.withPrefix(curS3Path).getLogFilePath(); + s3Key = localPath.withPrefix(curS3Path, mConfig).getLogFilePath(); } - + if(s3Key.charAt(0) == '/') s3Key = s3Key.substring(1); + // make upload request, taking into account configured options for encryption PutObjectRequest uploadRequest = new PutObjectRequest(s3Bucket, s3Key, localFile); if (!mConfig.getAwsSseType().isEmpty()) { diff --git a/src/main/scripts/run_kafka_class.sh b/src/main/scripts/run_kafka_class.sh index 5788ca7c7..9ccf01953 100755 --- a/src/main/scripts/run_kafka_class.sh +++ b/src/main/scripts/run_kafka_class.sh @@ -28,7 +28,7 @@ fi KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " # Log4j settings -KAFKA_LOG4J_OPTS="-Dlog4j.configuration=log4j.dev.properties" +KAFKA_LOG4J_OPTS="-Dlog4j.configuration=log4j.properties" # Generic jvm settings you want to add KAFKA_OPTS="" diff --git a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java index a40844cff..8b8ea5bf6 100644 --- a/src/test/java/com/pinterest/secor/common/FileRegistryTest.java +++ b/src/test/java/com/pinterest/secor/common/FileRegistryTest.java @@ -58,6 +58,7 @@ public void setUp() throws Exception { properties.addProperty("secor.file.reader.writer.factory", "com.pinterest.secor.io.impl.SequenceFileReaderWriterFactory"); properties.addProperty("secor.file.age.youngest", true); + properties.addProperty("secor.max.file.age.policy", ""); SecorConfig 
secorConfig = new SecorConfig(properties); mRegistry = new FileRegistry(secorConfig); mLogFilePath = new LogFilePath("/some_parent_dir", PATH);