Skip to content

Commit

Permalink
Merge pull request #37 from JackBister/wip-zap-json
Browse files Browse the repository at this point in the history
Switch logging system to Zap, add support for parsing JSON logs
  • Loading branch information
JackBister authored Feb 5, 2023
2 parents a20f83e + ccb2472 commit f5ab933
Show file tree
Hide file tree
Showing 43 changed files with 1,205 additions and 422 deletions.
24 changes: 11 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,14 @@ Enables "static config" mode. In this mode, you cannot modify the configuration
`-help`
Print information about command line options and quit.

`-json` Parse the given files as JSON instead of using Regex to parse. The fieldexctractor flag will be ignored. Disabled by default.

`-recipient <address>` Sets Logsuck to run in recipient mode and receives events on the given address. By default, this is disabled.

`-timefield <string>` The name of the field which will contain the timestamp of the event. Default '\_time'.

`-timelayout <string>`
The layout of the timestamp which will be extracted in the \_time field. For more information on how to write a timelayout and examples, see https://golang.org/pkg/time/#Parse and https://golang.org/pkg/time/#pkg-constants. (default "2006/01/02 15:04:05")
The layout of the timestamp which will be extracted in the \_time field. For more information on how to write a timelayout and examples, see https://golang.org/pkg/time/#Parse and https://golang.org/pkg/time/#pkg-constants. There are also the special timelayouts "UNIX", "UNIX_MILLIS", and "UNIX_DECIMAL_NANOS". "UNIX" expects the \_time field to contain the number of seconds since the Unix epoch, "UNIX_MILLIS" expects it to contain the number of milliseconds since the Unix epoch, and UNIX_DECIMAL_NANOS expects it to contain a string of the form `<UNIX>.<NANOS>` where `<UNIX>` is the number of seconds since the Unix epoch and `<NANOS>` is the number of elapsed nanoseconds in that second. (default "2006/01/02 15:04:05")

`-version`
Print version info and quit.
Expand Down Expand Up @@ -249,24 +253,18 @@ With those two steps done, you don't need to restart Logsuck to see the changes

## Upcoming features

Logsuck is still heavily in development, so there are many features still being worked on as we race towards version 1.0.

### Before version 1.0
Logsuck is still heavily in development, so there are many features still being worked on.

- [x] Glob patterns for finding log files
- [x] Retention setting to delete old events after a certain period of time
- [x] "Show source" / "Show context" button to view events from the same source that are close in time to the selected event
- [x] Ability to search via time spans that are not relative to the current time, such as "All events between 2020-01-01 and 2020-01-05"
- [x] Ad hoc field extraction using pipes in the search command (equivalent to Splunk's "| rex")
- [ ] E-mail alerts

### After version 1.0

- Authentication and authorization
- Creating tables and charts from searches
- Dashboards
- Plugins
- Advanced search commands like dedup, eval, transaction, etc.
- Support for an external database other than logsuck.db
- [ ] Authentication and authorization
- [ ] Plugins
- [ ] Creating tables and charts from searches
- [ ] Dashboards
- [ ] Advanced search commands like dedup, eval, transaction, etc.
- [Your favorite feature](https://github.com/JackBister/logsuck/issues/new)
- ... And many more great things!
134 changes: 89 additions & 45 deletions cmd/logsuck/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/jackbister/logsuck/internal/recipient"
"github.com/jackbister/logsuck/internal/tasks"
"github.com/jackbister/logsuck/internal/web"
"go.uber.org/zap"

_ "github.com/mattn/go-sqlite3"
)
Expand Down Expand Up @@ -89,8 +90,11 @@ var forceStaticConfigFlag bool
var forwarderFlag string
var fieldExtractorFlags flagStringArray
var hostNameFlag string
var jsonParserFlag bool
var logTypeFlag string
var printVersion bool
var recipientFlag string
var timeFieldFlag string
var timeLayoutFlag string
var webAddrFlag string

Expand All @@ -109,9 +113,12 @@ func main() {
flag.BoolVar(&forceStaticConfigFlag, "forceStaticConfig", false, "If enabled, the JSON configuration file will be used instead of the configuration saved in the database. This means that you cannot alter configuration at runtime and must instead update the JSON file and restart logsuck. Has no effect in forwarder mode. Default false.")
flag.StringVar(&forwarderFlag, "forwarder", "", "Enables forwarder mode and sets the address to forward events to. Forwarding is off by default.")
flag.StringVar(&hostNameFlag, "hostname", "", "The name of the host running this instance of logsuck. By default, logsuck will attempt to retrieve the hostname from the operating system.")
flag.StringVar(&timeLayoutFlag, "timelayout", "2006/01/02 15:04:05", "The layout of the timestamp which will be extracted in the _time field. For more information on how to write a timelayout and examples, see https://golang.org/pkg/time/#Parse and https://golang.org/pkg/time/#pkg-constants.")
flag.BoolVar(&jsonParserFlag, "json", false, "Parse the given files as JSON instead of using Regex to parse. The fieldexctractor flag will be ignored. Disabled by default.")
flag.StringVar(&logTypeFlag, "logType", "production", "The type of logger to use. Set it to 'development' to get human readable logging instead of JSON logging")
flag.BoolVar(&printVersion, "version", false, "Print version info and quit.")
flag.StringVar(&recipientFlag, "recipient", "", "Enables recipient mode and sets the port to expose the recipient on. Recipient mode is off by default.")
flag.StringVar(&timeFieldFlag, "timefield", "_time", "The name of the field which will contain the timestamp of the event. Default '_time'.")
flag.StringVar(&timeLayoutFlag, "timelayout", "2006/01/02 15:04:05", "The layout of the timestamp which will be extracted in the time field. For more information on how to write a timelayout and examples, see https://golang.org/pkg/time/#Parse and https://golang.org/pkg/time/#pkg-constants. There are also the special timelayouts \"UNIX\", \"UNIX_MILLIS\", and \"UNIX_DECIMAL_NANOS\". \"UNIX\" expects the _time field to contain the number of seconds since the Unix epoch, \"UNIX_MILLIS\" expects it to contain the number of milliseconds since the Unix epoch, and UNIX_DECIMAL_NANOS expects it to contain a string of the form \"<UNIX>.<NANOS>\" where \"<UNIX>\" is the number of seconds since the Unix epoch and \"<NANOS>\" is the number of elapsed nanoseconds in that second.")
flag.StringVar(&webAddrFlag, "webaddr", ":8080", "The address on which the search GUI will be exposed.")
flag.Parse()
if len(fieldExtractorFlags) == 0 {
Expand All @@ -127,27 +134,43 @@ func main() {
return
}

var err error
var logger *zap.Logger
if logTypeFlag == "development" {
logger, err = zap.NewDevelopment()

} else {
logger, err = zap.NewProduction()
}
if err != nil {
log.Fatalf("failed to create Zap logger: %v", err)
return
}

cfgFile, err := os.Open(cfgFileFlag)
if err == nil {
var jsonCfg config.JsonConfig
err = json.NewDecoder(cfgFile).Decode(&jsonCfg)
if err != nil {
log.Fatalf("error decoding json from file '%v': %v\n", cfgFileFlag, err)
logger.Fatal("error decoding json from config file", zap.String("fileName", cfgFileFlag), zap.Error(err))
return
}
newCfg, err := config.FromJSON(jsonCfg)
newCfg, err := config.FromJSON(jsonCfg, logger.Named("configFromJSON"))
if err != nil {
log.Fatalf("error parsing configuration from file '%v': %v\n", cfgFileFlag, err)
logger.Fatal("error parsing configuration from config file", zap.String("fileName", cfgFileFlag), zap.Error(err))
return
}
staticConfig = *newCfg
log.Printf("Using configuration from file '%v': %v\n", cfgFileFlag, staticConfig)
logger.Info("using configuration from file", zap.String("fileName", cfgFileFlag), zap.Any("staticConfig", staticConfig))
} else {
log.Printf("Could not open config file '%v', will use command line configuration\n", cfgFileFlag)
logger.Warn("Could not open config file, will use command line configuration", zap.String("fileName", cfgFileFlag))
if hostNameFlag != "" {
staticConfig.HostName = hostNameFlag
} else {
hostName, err := os.Hostname()
if err != nil {
log.Fatalf("error getting hostname: %v\n", err)
logger.Fatal("error getting hostname", zap.Error(err))
return
}
staticConfig.HostName = hostName
}
Expand All @@ -168,10 +191,27 @@ func main() {
staticConfig.Recipient.Address = recipientFlag
}

fieldExtractors := make([]*regexp.Regexp, len(fieldExtractorFlags))
if len(fieldExtractorFlags) > 0 {
for i, fe := range fieldExtractorFlags {
fieldExtractors[i] = regexp.MustCompile(fe)
var jsonParserConfig *parser.JsonParserConfig
var regexParserConfig *parser.RegexParserConfig
var parserType config.ParserType
if jsonParserFlag {
parserType = config.ParserTypeJSON
jsonParserConfig = &parser.JsonParserConfig{
EventDelimiter: regexp.MustCompile(eventDelimiterFlag),
TimeField: timeFieldFlag,
}
} else {
parserType = config.ParserTypeRegex
fieldExtractors := make([]*regexp.Regexp, len(fieldExtractorFlags))
if len(fieldExtractorFlags) > 0 {
for i, fe := range fieldExtractorFlags {
fieldExtractors[i] = regexp.MustCompile(fe)
}
}
regexParserConfig = &parser.RegexParserConfig{
EventDelimiter: regexp.MustCompile(eventDelimiterFlag),
FieldExtractors: fieldExtractors,
TimeField: timeFieldFlag,
}
}

Expand All @@ -180,11 +220,9 @@ func main() {
Name: "DEFAULT",
TimeLayout: timeLayoutFlag,
ReadInterval: 1 * time.Second,
ParserType: config.ParserTypeRegex,
Regex: &parser.RegexParserConfig{
EventDelimiter: regexp.MustCompile(eventDelimiterFlag),
FieldExtractors: fieldExtractors,
},
ParserType: parserType,
JSON: jsonParserConfig,
Regex: regexParserConfig,
},
}

Expand Down Expand Up @@ -214,8 +252,8 @@ func main() {
var publisher events.EventPublisher
var repo events.Repository
if staticConfig.Forwarder.Enabled {
publisher = forwarder.ForwardingEventPublisher(&staticConfig)
configSource = forwarder.NewRemoteConfigSource(&staticConfig)
publisher = forwarder.ForwardingEventPublisher(&staticConfig, logger.Named("ForwardingEventPublisher"))
configSource = forwarder.NewRemoteConfigSource(&staticConfig, logger.Named("RemoteConfigSource"))
} else {
additionalSqliteParameters := "?_journal_mode=WAL"
if staticConfig.SQLite.DatabaseFile == ":memory:" {
Expand All @@ -225,23 +263,27 @@ func main() {
}
db, err := sql.Open("sqlite3", "file:"+staticConfig.SQLite.DatabaseFile+additionalSqliteParameters)
if err != nil {
log.Fatalln(err.Error())
logger.Fatal("error when creating sqlite database", zap.Error(err))
return
}
configRepo, err = config.NewSqliteConfigRepository(&staticConfig, db, !forceStaticConfigFlag && !staticConfig.ForceStaticConfig)
configRepo, err = config.NewSqliteConfigRepository(&staticConfig, db, !forceStaticConfigFlag && !staticConfig.ForceStaticConfig, logger.Named("SqliteConfigRepository"))
if err != nil {
log.Fatalln(err.Error())
logger.Fatal("error when creating sqlite config repository", zap.Error(err))
return
}
repo, err = events.SqliteRepository(db, staticConfig.SQLite)
repo, err = events.SqliteRepository(db, staticConfig.SQLite, logger.Named("SqliteEventsRepository"))
if err != nil {
log.Fatalln(err.Error())
logger.Fatal("error when creating sqlite event repository", zap.Error(err))
return
}
jobRepo, err = jobs.SqliteRepository(db)
if err != nil {
log.Fatalln(err.Error())
logger.Fatal("error when creating sqlite job repository", zap.Error(err))
return
}
publisher = events.BatchedRepositoryPublisher(&staticConfig, repo)
publisher = events.BatchedRepositoryPublisher(&staticConfig, repo, logger.Named("BatchedRepositoryPublisher"))
if forceStaticConfigFlag || staticConfig.ForceStaticConfig {
log.Println("Static configuration is forced. Configuration will not be saved to database and will only be read from the JSON configuration file. Remove the forceStaticConfig flag from the command line or configuration file in order to use dynamic configuration.")
logger.Info("Static configuration is forced. Configuration will not be saved to database and will only be read from the JSON configuration file. Remove the forceStaticConfig flag from the command line or configuration file in order to use dynamic configuration.")
configSource = &config.StaticConfigSource{
Config: staticConfig,
}
Expand All @@ -251,56 +293,58 @@ func main() {
}
dynamicConfig, err := configSource.Get()
if err != nil {
log.Fatalf("failed to get dynamic configuration during init: %v\n", err)
logger.Fatal("failed to get dynamic configuration during init", zap.Error(err))
return
}

jobEngine = jobs.NewEngine(configSource, repo, jobRepo)
indexedFiles, err := indexedfiles.ReadFileConfig(&dynamicConfig.Cfg)
jobEngine = jobs.NewEngine(configSource, repo, jobRepo, logger.Named("JobEngine"))
indexedFiles, err := indexedfiles.ReadFileConfig(&dynamicConfig.Cfg, logger)
if err != nil {
log.Fatalln("got error when reading dynamic file config", err)
logger.Fatal("got error when reading dynamic file config", zap.Error(err))
return
}

watchers := map[string]*files.GlobWatcher{}
if staticConfig.Recipient.Enabled {
log.Println("recipient is enabled, will not read any files on this host.")
logger.Info("recipient is enabled, will not read any files on this host.")
} else {
reloadFileWatchers(&watchers, indexedFiles, &dynamicConfig.Cfg, publisher, ctx)
reloadFileWatchers(logger.Named("reloadFileWatchers"), &watchers, indexedFiles, &dynamicConfig.Cfg, publisher, ctx)
}

if staticConfig.Recipient.Enabled {
go func() {
log.Fatal(recipient.NewRecipientEndpoint(configSource, repo).Serve())
logger.Fatal("got error from recipient Serve method", zap.Error(recipient.NewRecipientEndpoint(configSource, repo, logger.Named("RecipientEndpoint")).Serve()))
}()
}

if staticConfig.Web.Enabled {
go func() {
log.Fatal(web.NewWeb(configSource, configRepo, repo, jobRepo, jobEngine).Serve())
logger.Fatal("got error from web Serve method", zap.Error(web.NewWeb(configSource, configRepo, repo, jobRepo, jobEngine, logger.Named("Web")).Serve()))
}()
}

tm := tasks.NewTaskManager(
&dynamicConfig.Cfg.Tasks, tasks.TaskContext{
EventsRepo: repo,
},
ctx)
ctx,
logger.Named("TaskManager"))
tm.UpdateConfig(dynamicConfig.Cfg)

go func() {
for {
<-configSource.Changes()
newCfg, err := configSource.Get()
if err != nil {
log.Printf("got error when reading updated dynamic file config. file config will not be updated: %v\n", err)
logger.Warn("got error when reading updated dynamic file config. file and task config will not be updated", zap.Error(err))
continue
}
if !staticConfig.Recipient.Enabled {
newIndexedFiles, err := indexedfiles.ReadFileConfig(&newCfg.Cfg)
newIndexedFiles, err := indexedfiles.ReadFileConfig(&newCfg.Cfg, logger)
if err != nil {
log.Printf("got error when reading updated dynamic file config. file config will not be updated: %v\n", err)
logger.Warn("got error when reading updated dynamic file config. file config will not be updated", zap.Error(err))
} else {
reloadFileWatchers(&watchers, newIndexedFiles, &newCfg.Cfg, publisher, ctx)
reloadFileWatchers(logger.Named("reloadFileWatchers"), &watchers, newIndexedFiles, &newCfg.Cfg, publisher, ctx)
}
}
tm.UpdateConfig(newCfg.Cfg)
Expand All @@ -310,8 +354,8 @@ func main() {
select {}
}

func reloadFileWatchers(watchers *map[string]*files.GlobWatcher, indexedFiles []indexedfiles.IndexedFileConfig, cfg *config.Config, publisher events.EventPublisher, ctx context.Context) {
log.Printf("reloading file watchers. newIndexedFilesLen=%v, oldIndexedFilesLen=%v\n", len(indexedFiles), len(*watchers))
func reloadFileWatchers(logger *zap.Logger, watchers *map[string]*files.GlobWatcher, indexedFiles []indexedfiles.IndexedFileConfig, cfg *config.Config, publisher events.EventPublisher, ctx context.Context) {
logger.Info("reloading file watchers", zap.Int("newIndexedFilesLen", len(indexedFiles)), zap.Int("oldIndexedFilesLen", len(*watchers)))
indexedFilesMap := map[string]indexedfiles.IndexedFileConfig{}
for _, cfg := range indexedFiles {
indexedFilesMap[cfg.Filename] = cfg
Expand All @@ -321,7 +365,7 @@ func reloadFileWatchers(watchers *map[string]*files.GlobWatcher, indexedFiles []
for k, v := range *watchers {
newCfg, ok := indexedFilesMap[k]
if !ok {
log.Printf("filename=%s not found in new indexed files config. will cancel and delete watcher.\n", k)
logger.Info("filename not found in new indexed files config. will cancel and delete watcher", zap.String("fileName", k))
v.Cancel()
watchersToDelete = append(watchersToDelete, k)
continue
Expand All @@ -340,10 +384,10 @@ func reloadFileWatchers(watchers *map[string]*files.GlobWatcher, indexedFiles []
if ok {
continue
}
log.Printf("creating new watcher for filename=%s\n", k)
w, err := files.NewGlobWatcher(v, v.Filename, staticConfig.HostName, publisher, ctx)
logger.Info("creating new watcher", zap.String("fileName", k))
w, err := files.NewGlobWatcher(v, v.Filename, staticConfig.HostName, publisher, ctx, logger.Named("GlobWatcher"))
if err != nil {
log.Printf("got error when creating GlobWatcher for filename=%s: %v", v.Filename, err)
logger.Warn("got error when creating GlobWatcher", zap.String("fileName", v.Filename), zap.Error(err))
continue
}
(*watchers)[k] = w
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/stretchr/testify v1.8.1 // indirect
github.com/ugorji/go/codec v1.2.8 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.2.8 h1:sgBJS6COt0b/P40VouWKdseidkDgHxYGm0SAglUHfP0=
github.com/ugorji/go/codec v1.2.8/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
Expand Down
Loading

0 comments on commit f5ab933

Please sign in to comment.