Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to timestamp stage to parse Unix seconds, milliseconds, and nanosecond timestamps #732

Merged
merged 1 commit into from
Jul 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion docs/logentry/processing-log-lines.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@ RFC3339 = "2006-01-02T15:04:05Z07:00"
RFC3339Nano = "2006-01-02T15:04:05.999999999Z07:00"
```

Additionally support for common Unix timestamps is supported:

```go
Unix = 1562708916
UnixMs = 1562708916414
UnixNs = 1562708916000000123
```

Finally any custom format can be supplied, and will be passed directly in as the layout parameter in time.Parse()

__Read the [time.parse](https://golang.org/pkg/time/#Parse) docs closely if passing a custom format and make sure your custom format uses the special date they specify: `Mon Jan 2 15:04:05 -0700 MST 2006`__

##### Example:

```yaml
Expand Down Expand Up @@ -453,4 +465,4 @@ Gauge examples will be very similar to Counter examples with additional `action`
buckets: [0.001,0.0025,0.005,0.010,0.025,0.050]
```

This would create a Histogram which looks for _response_time_ in the `extracted` data and applies the value to the histogram.
This would create a Histogram which looks for _response_time_ in the `extracted` data and applies the value to the histogram.
24 changes: 16 additions & 8 deletions pkg/logentry/stages/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ const (
ErrEmptyTimestampStageConfig = "timestamp stage config cannot be empty"
ErrTimestampSourceRequired = "timestamp source value is required if timestamp is specified"
ErrTimestampFormatRequired = "timestamp format is required"

Unix = "Unix"
UnixMs = "UnixMs"
UnixNs = "UnixNs"
)

// TimestampConfig configures timestamp extraction
Expand All @@ -23,16 +27,19 @@ type TimestampConfig struct {
Format string `mapstructure:"format"`
}

// parser can convert the time string into a time.Time value
type parser func(string) (time.Time, error)

// validateTimestampConfig validates a timestampStage configuration
func validateTimestampConfig(cfg *TimestampConfig) (string, error) {
func validateTimestampConfig(cfg *TimestampConfig) (parser, error) {
if cfg == nil {
return "", errors.New(ErrEmptyTimestampStageConfig)
return nil, errors.New(ErrEmptyTimestampStageConfig)
}
if cfg.Source == "" {
return "", errors.New(ErrTimestampSourceRequired)
return nil, errors.New(ErrTimestampSourceRequired)
}
if cfg.Format == "" {
return "", errors.New(ErrTimestampFormatRequired)
return nil, errors.New(ErrTimestampFormatRequired)
}
return convertDateLayout(cfg.Format), nil

Expand All @@ -45,22 +52,22 @@ func newTimestampStage(logger log.Logger, config interface{}) (*timestampStage,
if err != nil {
return nil, err
}
format, err := validateTimestampConfig(cfg)
parser, err := validateTimestampConfig(cfg)
if err != nil {
return nil, err
}
return &timestampStage{
cfgs: cfg,
logger: logger,
format: format,
parser: parser,
}, nil
}

// timestampStage will set the timestamp using extracted data
type timestampStage struct {
cfgs *TimestampConfig
logger log.Logger
format string
parser parser
}

// Process implements Stage
Expand All @@ -73,7 +80,8 @@ func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]in
if err != nil {
level.Debug(ts.logger).Log("msg", "failed to convert extracted time to string", "err", err, "type", reflect.TypeOf(v).String())
}
parsedTs, err := time.Parse(ts.format, s)

parsedTs, err := ts.parser(s)
if err != nil {
level.Debug(ts.logger).Log("msg", "failed to parse time", "err", err, "format", ts.cfgs.Format, "value", s)
} else {
Expand Down
74 changes: 62 additions & 12 deletions pkg/logentry/stages/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ func TestTimestampPipeline(t *testing.T) {

func TestTimestampValidation(t *testing.T) {
tests := map[string]struct {
config *TimestampConfig
err error
expectedFormat string
config *TimestampConfig
err error
testString string
expectedTime time.Time
}{
"missing config": {
config: nil,
Expand All @@ -68,23 +69,34 @@ func TestTimestampValidation(t *testing.T) {
Source: "source1",
Format: time.RFC3339,
},
err: nil,
expectedFormat: time.RFC3339,
err: nil,
testString: "2012-11-01T22:08:41-04:00",
expectedTime: time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)),
},
"custom format": {
config: &TimestampConfig{
Source: "source1",
Format: "2006-01-23",
Format: "2006-01-02",
},
err: nil,
expectedFormat: "2006-01-23",
err: nil,
testString: "2009-01-01",
expectedTime: time.Date(2009, 01, 01, 00, 00, 00, 0, time.UTC),
},
"unix_ms": {
config: &TimestampConfig{
Source: "source1",
Format: "UnixMs",
},
err: nil,
testString: "1562708916919",
expectedTime: time.Date(2019, 7, 9, 21, 48, 36, 919*1000000, time.UTC),
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
format, err := validateTimestampConfig(test.config)
parser, err := validateTimestampConfig(test.config)
if (err != nil) != (test.err != nil) {
t.Errorf("validateOutputConfig() expected error = %v, actual error = %v", test.err, err)
return
Expand All @@ -93,8 +105,13 @@ func TestTimestampValidation(t *testing.T) {
t.Errorf("validateOutputConfig() expected error = %v, actual error = %v", test.err, err)
return
}
if test.expectedFormat != "" {
assert.Equal(t, test.expectedFormat, format)
if test.testString != "" {
ts, err := parser(test.testString)
if err != nil {
t.Errorf("validateOutputConfig() unexpected error parsing test time: %v", err)
return
}
assert.Equal(t, test.expectedTime.UnixNano(), ts.UnixNano())
}
})
}
Expand All @@ -117,6 +134,39 @@ func TestTimestampStage_Process(t *testing.T) {
},
time.Date(2106, 01, 02, 23, 04, 05, 0, time.FixedZone("", -4*60*60)),
},
"unix success": {
TimestampConfig{
Source: "ts",
Format: "Unix",
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "1562708916",
},
time.Date(2019, 7, 9, 21, 48, 36, 0, time.UTC),
},
"unix millisecond success": {
TimestampConfig{
Source: "ts",
Format: "UnixMs",
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "1562708916414",
},
time.Date(2019, 7, 9, 21, 48, 36, 414*1000000, time.UTC),
},
"unix nano success": {
TimestampConfig{
Source: "ts",
Format: "UnixNs",
},
map[string]interface{}{
"somethigelse": "notimportant",
"ts": "1562708916000000123",
},
time.Date(2019, 7, 9, 21, 48, 36, 123, time.UTC),
},
}
for name, test := range tests {
test := test
Expand All @@ -129,7 +179,7 @@ func TestTimestampStage_Process(t *testing.T) {
ts := time.Now()
lbls := model.LabelSet{}
st.Process(lbls, test.extracted, &ts, nil)
assert.Equal(t, test.expected, ts)
assert.Equal(t, test.expected.UnixNano(), ts.UnixNano())

})
}
Expand Down
70 changes: 58 additions & 12 deletions pkg/logentry/stages/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,76 @@ import (
)

// convertDateLayout converts pre-defined date format layout into date format
func convertDateLayout(predef string) string {
func convertDateLayout(predef string) parser {
switch predef {
case "ANSIC":
return time.ANSIC
return func(t string) (time.Time, error) {
return time.Parse(time.ANSIC, t)
}
case "UnixDate":
return time.UnixDate
return func(t string) (time.Time, error) {
return time.Parse(time.UnixDate, t)
}
case "RubyDate":
return time.RubyDate
return func(t string) (time.Time, error) {
return time.Parse(time.RubyDate, t)
}
case "RFC822":
return time.RFC822
return func(t string) (time.Time, error) {
return time.Parse(time.RFC822, t)
}
case "RFC822Z":
return time.RFC822Z
return func(t string) (time.Time, error) {
return time.Parse(time.RFC822Z, t)
}
case "RFC850":
return time.RFC850
return func(t string) (time.Time, error) {
return time.Parse(time.RFC850, t)
}
case "RFC1123":
return time.RFC1123
return func(t string) (time.Time, error) {
return time.Parse(time.RFC1123, t)
}
case "RFC1123Z":
return time.RFC1123Z
return func(t string) (time.Time, error) {
return time.Parse(time.RFC1123Z, t)
}
case "RFC3339":
return time.RFC3339
return func(t string) (time.Time, error) {
return time.Parse(time.RFC3339, t)
}
case "RFC3339Nano":
return time.RFC3339Nano
return func(t string) (time.Time, error) {
return time.Parse(time.RFC3339Nano, t)
}
case "Unix":
return func(t string) (time.Time, error) {
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(i, 0), nil
}
case "UnixMs":
return func(t string) (time.Time, error) {
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, i*int64(time.Millisecond)), nil
}
case "UnixNs":
return func(t string) (time.Time, error) {
i, err := strconv.ParseInt(t, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, i), nil
}
default:
return predef
return func(t string) (time.Time, error) {
return time.Parse(predef, t)
}
}
}

Expand Down