Secor upgrade #4

Open · wants to merge 16 commits into master
4 changes: 2 additions & 2 deletions pom.xml
@@ -639,15 +639,15 @@
</dependencies>
</profile>
<profile>
<id>kafka-0.10-dev</id>
<id>kafka-0.10</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
<version>0.10.0.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
@@ -9,9 +9,10 @@ log4j.appender.CONSOLE.Target=System.err
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n

log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=DEBUG
log4j.appender.ROLLINGFILE.File=/tmp/secor_dev/logs/secor.log
log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=INFO
log4j.appender.ROLLINGFILE.File=/mnt/secor/logs/${secor_group}.log
log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd
# keep log files up to 1G
log4j.appender.ROLLINGFILE.MaxFileSize=20MB
log4j.appender.ROLLINGFILE.MaxBackupIndex=50
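
The new appender path relies on log4j's ${...} substitution, which log4j 1.x resolves against JVM system properties, so secor_group must be defined before the configuration is loaded. Note also that DailyRollingFileAppender rolls purely by DatePattern and does not recognize MaxFileSize/MaxBackupIndex, so the two remaining size settings no longer take effect. A minimal sketch of the substitution, with an illustrative group name and an assumed path to the properties file:

```java
// Minimal sketch, not Secor's actual bootstrap: log4j 1.x resolves ${secor_group}
// in the appender File setting against JVM system properties, so the property must
// be set (or passed as -Dsecor_group=...) before PropertyConfigurator runs.
// The group name and the properties file path are illustrative assumptions.
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

public class Log4jGroupDemo {
    public static void main(String[] args) {
        System.setProperty("secor_group", "secor_backup");
        PropertyConfigurator.configure("src/main/config/log4j.prod.properties");
        Logger.getLogger(Log4jGroupDemo.class)
              .info("this line ends up in /mnt/secor/logs/secor_backup.log");
    }
}
```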
6 changes: 3 additions & 3 deletions src/main/config/secor.common.properties
@@ -306,7 +306,7 @@ secor.thrift.message.class.*=
statsd.prefixWithConsumerGroup=true

# Name of field that contains timestamp for JSON, MessagePack, or Thrift message parser. (1405970352123)
message.timestamp.name=timestamp
message.timestamp.name=@timestamp

# Separator for defining message.timestamp.name in a nested structure. E.g.
# {"meta_data": {"created": "1405911096123", "last_modified": "1405912096123"}, "data": "test"}
@@ -324,7 +324,7 @@ message.timestamp.type=i64

# Name of field that contains a timestamp, as a date Format, for JSON. (2014-08-07, Jul 23 02:16:57 2005, etc...)
# Should be used when there is no timestamp in a Long format. Also ignore time zones.
message.timestamp.input.pattern=
message.timestamp.input.pattern=yyyy-MM-dd

# Whether the timestamp field is required; it should always be required. But
# for historical reasons we didn't enforce this check, so there might exist some
@@ -355,7 +355,7 @@ secor.max.message.size.bytes=100000

# Class that will manage uploads. Default is to use the hadoop
# interface to S3.
secor.upload.manager.class=com.pinterest.secor.uploader.HadoopS3UploadManager
secor.upload.manager.class=com.pinterest.secor.uploader.S3UploadManager

# Set the property below to your timezone; partitions in S3 will be created according to that timezone.
secor.parser.timezone=UTC
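
The two timestamp changes above work as a pair: messages are now expected to carry an @timestamp field holding a date-formatted value such as 2014-08-07, which is parsed with the yyyy-MM-dd input pattern. A minimal sketch of that pairing (org.json is used purely for illustration; Secor's parsers may use a different JSON library):

```java
// Minimal sketch of how message.timestamp.name and message.timestamp.input.pattern
// combine; org.json is an illustrative choice, not necessarily what Secor uses.
import java.text.SimpleDateFormat;
import java.util.Date;
import org.json.JSONObject;

public class TimestampFieldDemo {
    public static void main(String[] args) throws Exception {
        JSONObject message = new JSONObject("{\"@timestamp\":\"2014-08-07\",\"data\":\"test\"}");
        SimpleDateFormat inputPattern = new SimpleDateFormat("yyyy-MM-dd"); // message.timestamp.input.pattern
        Date ts = inputPattern.parse(message.getString("@timestamp"));      // message.timestamp.name
        System.out.println("partition date: " + new SimpleDateFormat("yyyy-MM-dd").format(ts));
    }
}
```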
28 changes: 28 additions & 0 deletions src/main/config/secor.dev.partition.properties
@@ -33,3 +33,31 @@ secor.local.path=/tmp/secor_dev/message_logs/partition
# Port of the Ostrich server.
ostrich.port=9998

# Secor custom properties

secor.s3.output_file_pattern={partition}-{currentTimestamp}.json

# Partition Date Output format. This is used along with PatternDateMessageParser. Defaults to 'yyyy-MM-dd' *New*
secor.partition.output_dt_format=yyyy-MM-dd

secor.partition.prefix.enable=false
# Name of the message field whose value is looked up in the partition prefix mapping below.
secor.partition.prefix.identifier=eid

secor.partition.prefix.mapping={"DEFAULT":"secor-upgrade"}

secor.max.file.age.policy=oldest

# Output file pattern excluding prefix. Defaults to topic/partition/generation_kafkaPartition_fmOffset.gz.
# Available placeholders are
# topic - The topic the data is being fetched from
# partition - The partition name
# generation - Generation
# kafkaPartition - The kafka partition
# fmOffset - First Message offset in the file.
# randomHex - A 4 character random hex to append to the file name
# currentTimestamp - Time of upload in epoch format
# currentTime - Time of upload in HH-mm format
# currentDate - Time of upload in YYYYMMDD format
# folder - Folder to use based on message id map lookup
secor.s3.output_file_pattern={partition}-{currentTimestamp}.json
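
The pattern is expanded with the brace-delimited StrSubstitutor call introduced in LogFilePath below. A minimal sketch of what the configured {partition}-{currentTimestamp}.json pattern produces, with illustrative values:

```java
// Minimal sketch of the placeholder expansion, mirroring the StrSubstitutor call
// added in LogFilePath.getLogFilePath(String); the values are illustrative.
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.text.StrSubstitutor;

public class OutputFilePatternDemo {
    public static void main(String[] args) {
        Map<String, String> values = new HashMap<String, String>();
        values.put("partition", "dt=2014-08-07");
        values.put("currentTimestamp", "1405970352123");

        String pattern = "{partition}-{currentTimestamp}.json";
        // prints: dt=2014-08-07-1405970352123.json
        System.out.println(StrSubstitutor.replace(pattern, values, "{", "}"));
    }
}
```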
39 changes: 24 additions & 15 deletions src/main/java/com/pinterest/secor/common/FileRegistry.java
@@ -21,6 +21,7 @@
import com.pinterest.secor.util.ReflectionUtil;
import com.pinterest.secor.util.StatsUtil;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -258,12 +259,13 @@ public long getModificationAgeSec(TopicPartition topicPartition) throws IOException {

public long getModificationAgeSec(TopicPartitionGroup topicPartitionGroup) throws IOException {
long now = System.currentTimeMillis() / 1000L;
long result;
if (mConfig.getFileAgeYoungest()) {
result = Long.MAX_VALUE;
} else {
result = -1;
}
long result = Long.MAX_VALUE;
boolean useOldestFile = StringUtils.equals("oldest", mConfig.getMaxFileAgePolicy());
Collection<LogFilePath> paths = getPaths(topicPartitionGroup);
for (LogFilePath path : paths) {
Long creationTime = mCreationTimes.get(path);
@@ -272,15 +274,22 @@ public long getModificationAgeSec(TopicPartitionGroup topicPartitionGroup) throws IOException {
creationTime = now;
}
long age = now - creationTime;
if (mConfig.getFileAgeYoungest()) {
if (age < result) {
result = age;
}
} else {
if (age > result) {
result = age;
}
}
if (result == Long.MAX_VALUE) {
result = age;
} else if (!useOldestFile && age < result) {
result = age;
} else if (useOldestFile && age > result) {
result = age;
}
}
if (result == Long.MAX_VALUE) {
result = -1;
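
The rewritten loop replaces the youngest-file boolean (mConfig.getFileAgeYoungest()) with the string-valued secor.max.file.age.policy: when the policy is "oldest" the largest age wins, otherwise the smallest, and -1 is returned if no files are registered. A standalone sketch of that selection logic; the helper name and test values are illustrative:

```java
// Standalone sketch of the age-selection logic above; "oldest" keeps the largest
// age, any other policy keeps the smallest, and -1 signals an empty registry.
import java.util.Arrays;
import java.util.List;

public class FileAgePolicyDemo {
    static long selectAge(List<Long> agesSec, boolean useOldestFile) {
        long result = Long.MAX_VALUE;
        for (long age : agesSec) {
            if (result == Long.MAX_VALUE) {
                result = age;
            } else if (!useOldestFile && age < result) {
                result = age;
            } else if (useOldestFile && age > result) {
                result = age;
            }
        }
        return result == Long.MAX_VALUE ? -1 : result;
    }

    public static void main(String[] args) {
        List<Long> ages = Arrays.asList(30L, 5L, 120L);
        System.out.println(selectAge(ages, true));   // 120 (oldest)
        System.out.println(selectAge(ages, false));  // 5 (youngest)
    }
}
```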
65 changes: 61 additions & 4 deletions src/main/java/com/pinterest/secor/common/LogFilePath.java
@@ -19,12 +19,19 @@
import com.pinterest.secor.message.ParsedMessage;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrSubstitutor;

import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
* LogFilePath represents path of a log file. It contains convenience method for building and
@@ -54,6 +61,10 @@ public class LogFilePath {
private final long[] mOffsets;
private final String mExtension;
private MessageDigest messageDigest;

private String mOutputFilePattern;
private SimpleDateFormat timeFormat = new SimpleDateFormat("HH-mm");
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd"); // calendar year (yyyy), not week year (YYYY)


public LogFilePath(String prefix, String topic, String[] partitions, int generation,
@@ -139,10 +150,23 @@ private static String[] subArray(String[] array, int startIndex, int endIndex) {
return result;
}

public LogFilePath withPrefix(String prefix) {
return new LogFilePath(prefix, mTopic, mPartitions, mGeneration, mKafkaPartitions, mOffsets,
mExtension);
}
public LogFilePath withPrefix(String prefix, SecorConfig mConfig) {
return new LogFilePath(prefix, mTopic, mPartitions, mGeneration, mKafkaPartitions, mOffsets, mExtension,
mConfig);
}

public LogFilePath(String prefix, String topic, String[] partitions, int generation, int[] kafkaPartition,
long[] offset, String extension, SecorConfig config) {

mPrefix = prefix;
mTopic = topic;
mPartitions = partitions;
mGeneration = generation;
mKafkaPartitions = kafkaPartition;
mOffsets = offset;
mExtension = extension;
mOutputFilePattern = config.getS3OutputFilePattern();
}

public String getLogFileParentDir() {
ArrayList<String> elements = new ArrayList<String>();
@@ -191,6 +215,9 @@ private String getLogFileBasename() {
}

public String getLogFilePath() {
if (StringUtils.isNotBlank(mOutputFilePattern)) {
return getLogFilePath(mOutputFilePattern);
}
String basename = getLogFileBasename();

ArrayList<String> pathElements = new ArrayList<String>();
@@ -199,6 +226,36 @@ public String getLogFilePath() {

return StringUtils.join(pathElements, "/") + mExtension;
}

private String getLogFilePath(String pattern) {

List<String> pathElements = new ArrayList<String>();
pathElements.add(mPrefix);
pathElements.add(StrSubstitutor.replace(pattern, getValueMap(), "{", "}"));
System.out.println("Path:" + StringUtils.join(pathElements, "/") + mExtension);
return StringUtils.join(pathElements, "/") + mExtension;
}

private Map<String, String> getValueMap() {

Map<String, String> valueMap = new HashMap<String, String>();
valueMap.put("randomHex", getRandomHex());
valueMap.put("partition", mPartitions[0]);
valueMap.put("topic", mTopic);
valueMap.put("generation", mGeneration + "");
valueMap.put("kafkaPartition", mKafkaPartitions[0] + "");
valueMap.put("fmOffset", String.format("%020d", mOffsets[0]));
valueMap.put("currentTimestamp", System.currentTimeMillis() + "");
valueMap.put("currentTime", timeFormat.format(new Date()));
valueMap.put("currentDate", dateFormat.format(new Date()));
return valueMap;
}

public static String getRandomHex() {

Random random = new Random();
return StringUtils.substring(Integer.toHexString(random.nextInt()), 0, 4);
}

public String getLogFileCrcPath() {
String basename = "." + getLogFileBasename() + ".crc";
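
A minimal usage sketch of the new config-aware constructor and withPrefix overload, re-rooting a local file under an S3 prefix so the configured secor.s3.output_file_pattern is applied; the literal values are illustrative and obtaining the configuration through SecorConfig.load() is an assumption about the caller:

```java
// Minimal usage sketch; all literal values are illustrative and SecorConfig.load()
// is assumed to be how the caller obtains the configuration.
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.SecorConfig;

public class LogFilePathDemo {
    public static void main(String[] args) throws Exception {
        SecorConfig config = SecorConfig.load();
        LogFilePath localPath = new LogFilePath("/tmp/secor_dev/message_logs/partition",
                "test_topic", new String[]{"dt=2014-08-07"}, 1,
                new int[]{0}, new long[]{100}, ".json", config);
        // Re-root the same file under the S3 prefix; the configured
        // secor.s3.output_file_pattern travels with the config.
        LogFilePath s3Path = localPath.withPrefix("s3n://my-bucket/secor", config);
        System.out.println(s3Path.getLogFilePath());
    }
}
```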
27 changes: 27 additions & 0 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
@@ -611,4 +611,31 @@ public Map<String, String> getORCMessageSchema() {
public String getORCSchemaProviderClass(){
return getString("secor.orc.schema.provider");
}

public String getS3OutputFilePattern() {
return getString("secor.s3.output_file_pattern");
}

public String getPartitionPrefixMapping() {
String[] map = getStringArray("secor.partition.prefix.mapping");
if (null != map)
return StringUtils.join(map, ',');
return "";
}

public boolean isPartitionPrefixEnabled() {
return getBoolean("secor.partition.prefix.enable", false);
}

public String getPartitionPrefixIdentifier() {
return getString("secor.partition.prefix.identifier", "");
}

public String getPartitionOutputDtFormat() {
return getString("secor.partition.output_dt_format");
}

public String getMaxFileAgePolicy() {
return getString("secor.max.file.age.policy");
}
}
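
The new getters are plain accessors; a minimal sketch of the lookup they presumably feed, where the value of the identifier field (secor.partition.prefix.identifier, eid in the dev config) selects a prefix from the mapping and DEFAULT is the fallback. The JSON parsing of the mapping string is elided and the map is built by hand purely for illustration:

```java
// Sketch of the assumed prefix lookup behind the new getters; the mapping below is
// the hand-built equivalent of parsing {"DEFAULT":"secor-upgrade"} from the config.
import java.util.HashMap;
import java.util.Map;

public class PrefixLookupDemo {
    public static void main(String[] args) {
        Map<String, String> prefixMapping = new HashMap<String, String>();
        prefixMapping.put("DEFAULT", "secor-upgrade");

        String eid = "some-event-id"; // value of the identifier field in a message
        String prefix = prefixMapping.containsKey(eid)
                ? prefixMapping.get(eid)
                : prefixMapping.get("DEFAULT");
        System.out.println("partition prefix: " + prefix); // secor-upgrade
    }
}
```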