diff --git a/dev-tools/packer/docker/xgo-image/base/build.sh b/dev-tools/packer/docker/xgo-image/base/build.sh
index 7b6e9408c18c..aea775cdbc5a 100644
--- a/dev-tools/packer/docker/xgo-image/base/build.sh
+++ b/dev-tools/packer/docker/xgo-image/base/build.sh
@@ -20,6 +20,7 @@
 
 # Download the canonical import path (may fail, don't allow failures beyond)
 SRC_FOLDER=$SOURCE
+AWS_FOLDER=$GOPATH/src/github.com/aws
 BEAT_PATH=$1
 DST_FOLDER=`dirname $GOPATH/src/$BEAT_PATH`
 
@@ -55,6 +56,12 @@ else
   git clone https://${GIT_REPO}.git
 fi
 
+# Optionally copy a locally mounted aws-sdk-go checkout into the GOPATH.
+if [ -n "$AWSSDK" ]; then
+  mkdir -p "${AWS_FOLDER}"
+  echo "Copying aws-sdk folder ${AWSSDK} to folder ${AWS_FOLDER}"
+  rsync --exclude ".git" -a "${AWSSDK}/" "${AWS_FOLDER}"
+fi
+
 set -e
 cd $WORKING_DIRECTORY
 
diff --git a/libbeat/outputs/s3/config.go b/libbeat/outputs/s3/config.go
new file mode 100644
index 000000000000..6466f9814b16
--- /dev/null
+++ b/libbeat/outputs/s3/config.go
@@ -0,0 +1,34 @@
+package s3
+
+import (
+	"fmt"
+)
+
+// config holds the user-facing settings of the s3 output.
+type config struct {
+	Path          string `config:"path"`
+	Filename      string `config:"filename"`
+	UploadEveryKb int    `config:"upload_every_kb" validate:"min=1"`
+	NumberOfFiles int    `config:"number_of_files"`
+	Region        string `config:"region"`
+	Bucket        string `config:"bucket"`
+}
+
+var (
+	defaultConfig = config{
+		NumberOfFiles: 2,
+		UploadEveryKb: 10 * 1024,
+		Region:        "us-east-1",
+	}
+)
+
+// Validate keeps number_of_files inside the range the file manager
+// accepts (same bounds as checkIfConfigSane in file_manager.go).
+func (c *config) Validate() error {
+	if c.NumberOfFiles < 2 || c.NumberOfFiles >= managerMaxFiles {
+		return fmt.Errorf("S3 number_of_files to keep should be between 2 and %v",
+			managerMaxFiles-1)
+	}
+
+	return nil
+}
diff --git a/libbeat/outputs/s3/file_manager.go b/libbeat/outputs/s3/file_manager.go
new file mode 100644
index 000000000000..731116819808
--- /dev/null
+++ b/libbeat/outputs/s3/file_manager.go
@@ -0,0 +1,293 @@
+package s3
+
+import (
+	"compress/gzip"
+	"fmt"
+	"io"
+	"net"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+	"github.com/elastic/beats/libbeat/logp"
+)
+
+const managerMaxFiles = 1024
+const defaultKeepFiles = 7
+const defaultUploadEveryBytes = 10 * 1024 * 1024
+
+// fileManager rotates local spool files and uploads each rotated
+// file, gzip-compressed, to an S3 bucket.
+type fileManager struct {
+	Path             string
+	Name             string
+	Region           string
+	Bucket           string
+	UploadEveryBytes *uint64
+	KeepFiles        *int
+
+	current      *os.File
+	current_size uint64
+	last         string
+}
+
+// createDirectory makes sure the spool directory exists.
+func (manager *fileManager) createDirectory() error {
+	fileinfo, err := os.Stat(manager.Path)
+	if err == nil {
+		if !fileinfo.IsDir() {
+			return fmt.Errorf("S3 %s exists but it's not a directory", manager.Path)
+		}
+	}
+
+	if os.IsNotExist(err) {
+		err = os.MkdirAll(manager.Path, 0755)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// checkIfConfigSane fills in defaults and validates the settings.
+func (manager *fileManager) checkIfConfigSane() error {
+	if len(manager.Name) == 0 {
+		return fmt.Errorf("S3 logging requires a name for the file names")
+	}
+	if len(manager.Bucket) == 0 {
+		return fmt.Errorf("S3 logging requires a bucket name")
+	}
+	if manager.KeepFiles == nil {
+		manager.KeepFiles = new(int)
+		*manager.KeepFiles = defaultKeepFiles
+	}
+	if manager.UploadEveryBytes == nil {
+		manager.UploadEveryBytes = new(uint64)
+		*manager.UploadEveryBytes = defaultUploadEveryBytes
+	}
+
+	if *manager.KeepFiles < 2 || *manager.KeepFiles >= managerMaxFiles {
+		return fmt.Errorf("S3 number of files to keep should be between 2 and %d", managerMaxFiles-1)
+	}
+	return nil
+}
+
+// writeLine appends one newline-terminated line to the current file,
+// rotating (and uploading) first when the size threshold is reached.
+func (manager *fileManager) writeLine(line []byte) error {
+	if manager.shouldRotate() {
+		err := manager.rotate()
+		if err != nil {
+			return err
+		}
+	}
+
+	line = append(line, '\n')
+	_, err := manager.current.Write(line)
+	if err != nil {
+		return err
+	}
+	manager.current_size += uint64(len(line))
+
+	return nil
+}
+
+func (manager *fileManager) shouldRotate() bool {
+	if manager.current == nil {
+		return true
+	}
+
+	if manager.current_size >= *manager.UploadEveryBytes {
+		return true
+	}
+
+	return false
+}
+
+// localIP returns the first non-loopback IPv4 address, or "".
+func (manager *fileManager) localIP() string {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return ""
+	}
+	for _, address := range addrs {
+		// check the address type and if it is not a loopback the display it
+		if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+			if ipnet.IP.To4() != nil {
+				return ipnet.IP.String()
+			}
+		}
+	}
+	return ""
+}
+
+// s3KeyName builds the object key: /YYYY/MM/DD/HOST_ISO8601.
+func (manager *fileManager) s3KeyName() string {
+	// Discern hostname or IP address
+	host, err := os.Hostname()
+	if err != nil {
+		host = ""
+	}
+
+	if host == "" || host == "localhost" {
+		host = manager.localIP()
+	}
+
+	// could still be empty string so could be random fallback
+	if host == "" {
+		host = "localhost"
+	}
+
+	t := time.Now().UTC()
+
+	timeIso8601 := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d,%09d+00:00",
+		t.Year(), t.Month(), t.Day(),
+		t.Hour(), t.Minute(), t.Second(),
+		t.Nanosecond())
+
+	// Final format is /YYYY/MM/DD/HOST_ISO8601
+	keyName := fmt.Sprintf("/%d/%02d/%02d/%s_%s",
+		t.Year(), t.Month(), t.Day(),
+		host, timeIso8601)
+
+	return keyName
+}
+
+// s3Upload gzips manager.last and uploads it to the configured bucket.
+func (manager *fileManager) s3Upload() error {
+	logp.Info("S3 upload last path set to: %v", manager.last)
+
+	file, err := os.Open(manager.last)
+	if err != nil {
+		logp.Err("S3 err opening file: %s", err)
+		return err
+	}
+	defer file.Close()
+
+	// compress while streaming into the uploader
+	reader, writer := io.Pipe()
+	go func() {
+		gw := gzip.NewWriter(writer)
+		_, err := io.Copy(gw, file)
+		if err == nil {
+			err = gw.Close()
+		}
+		writer.CloseWithError(err)
+	}()
+
+	// aws session
+	cfg := aws.NewConfig().WithRegion(manager.Region)
+	sess, err := session.NewSession(cfg)
+	if err != nil {
+		logp.Err("S3 failed to create session: %v", err)
+		return err
+	}
+
+	// upload
+	key := manager.s3KeyName() + ".gz"
+
+	params := &s3manager.UploadInput{
+		Body:   reader,
+		Bucket: aws.String(manager.Bucket),
+		Key:    aws.String(key),
+	}
+
+	uploader := s3manager.NewUploader(sess)
+	result, err := uploader.Upload(params)
+	if err != nil {
+		logp.Err("S3 upload failure: %v", err)
+		return err
+	}
+
+	logp.Info("S3 upload success: %v", result.Location)
+
+	return nil
+}
+
+// filePath returns the rotated file name for the given sequence number.
+func (manager *fileManager) filePath(file_no int) string {
+	if file_no == 0 {
+		return filepath.Join(manager.Path, manager.Name)
+	}
+	filename := strings.Join([]string{manager.Name, strconv.Itoa(file_no)}, ".")
+	return filepath.Join(manager.Path, filename)
+}
+
+func (manager *fileManager) fileExists(file_no int) bool {
+	file_path := manager.filePath(file_no)
+	_, err := os.Stat(file_path)
+	if os.IsNotExist(err) {
+		return false
+	}
+	return true
+}
+
+// rotate shifts file.N to file.N+1, opens a fresh current file and
+// uploads the freshly rotated ".1" file to S3.
+func (manager *fileManager) rotate() error {
+
+	if manager.current != nil {
+		if err := manager.current.Close(); err != nil {
+			return err
+		}
+	}
+
+	// delete any extra files, normally we shouldn't have any
+	for file_no := *manager.KeepFiles; file_no < managerMaxFiles; file_no++ {
+		if manager.fileExists(file_no) {
+			perr := os.Remove(manager.filePath(file_no))
+			if perr != nil {
+				return perr
+			}
+		}
+	}
+
+	// shift all files from last to first
+	for fileNo := *manager.KeepFiles - 1; fileNo >= 0; fileNo-- {
+		if !manager.fileExists(fileNo) {
+			// file doesn't exist, don't rotate
+			continue
+		}
+		file_path := manager.filePath(fileNo)
+
+		if manager.fileExists(fileNo + 1) {
+			// next file exists, something is strange
+			return fmt.Errorf("S3 file %s exists when rotating would overwrite it", manager.filePath(fileNo+1))
+		}
+
+		err := os.Rename(file_path, manager.filePath(fileNo+1))
+		if err != nil {
+			return err
+		}
+	}
+
+	// create the new file
+	file_path := manager.filePath(0)
+	current, err := os.Create(file_path)
+	if err != nil {
+		return err
+	}
+	manager.current = current
+	manager.current_size = 0
+
+	// delete the extra file, ignore errors here
+	file_path = manager.filePath(*manager.KeepFiles)
+	os.Remove(file_path)
+
+	// upload the dot-1 file, best effort: a failed upload must not
+	// stop local logging
+	file_path = manager.filePath(1)
+	manager.last = file_path
+	if err := manager.s3Upload(); err != nil {
+		logp.Err("S3 upload of %s failed: %v", file_path, err)
+	}
+
+	return nil
+}
diff --git a/libbeat/outputs/s3/s3.go b/libbeat/outputs/s3/s3.go
new file mode 100644
index 000000000000..01c83d63356d
--- /dev/null
+++ b/libbeat/outputs/s3/s3.go
@@ -0,0 +1,102 @@
+package s3
+
+import (
+	"encoding/json"
+
+	"github.com/elastic/beats/libbeat/common"
+	"github.com/elastic/beats/libbeat/common/op"
+	"github.com/elastic/beats/libbeat/logp"
+	"github.com/elastic/beats/libbeat/outputs"
+)
+
+func init() {
+	outputs.RegisterOutputPlugin("s3", New)
+}
+
+type s3Output struct {
+	beatName string
+	manager  fileManager
+}
+
+// New instantiates a new s3 output instance.
+func New(beatName string, cfg *common.Config, _ int) (outputs.Outputer, error) {
+	config := defaultConfig
+	if err := cfg.Unpack(&config); err != nil {
+		return nil, err
+	}
+
+	// disable bulk support in publisher pipeline
+	cfg.SetInt("flush_interval", -1, -1)
+	cfg.SetInt("bulk_max_size", -1, -1)
+
+	output := &s3Output{beatName: beatName}
+	if err := output.init(config); err != nil {
+		return nil, err
+	}
+	return output, nil
+}
+
+func (out *s3Output) init(config config) error {
+	out.manager.Path = config.Path
+	out.manager.Name = config.Filename
+	out.manager.Region = config.Region
+	out.manager.Bucket = config.Bucket
+	if out.manager.Name == "" {
+		out.manager.Name = out.beatName
+	}
+	logp.Info("S3 output path set to: %v", out.manager.Path)
+	logp.Info("S3 output base filename set to: %v", out.manager.Name)
+	logp.Info("S3 output region set to: %v", out.manager.Region)
+	logp.Info("S3 output bucket set to: %v", out.manager.Bucket)
+
+	uploadeverybytes := uint64(config.UploadEveryKb) * 1024
+	logp.Info("S3 upload every bytes set to: %v", uploadeverybytes)
+	out.manager.UploadEveryBytes = &uploadeverybytes
+
+	keepfiles := config.NumberOfFiles
+	logp.Info("S3 number of files set to: %v", keepfiles)
+	out.manager.KeepFiles = &keepfiles
+
+	err := out.manager.createDirectory()
+	if err != nil {
+		return err
+	}
+
+	err = out.manager.checkIfConfigSane()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Implement Outputer
+func (out *s3Output) Close() error {
+	return nil
+}
+
+func (out *s3Output) PublishEvent(
+	sig op.Signaler,
+	opts outputs.Options,
+	data outputs.Data,
+) error {
+	jsonEvent, err := json.Marshal(data.Event)
+	if err != nil {
+		// mark as success so event is not sent again.
+		op.SigCompleted(sig)
+
+		logp.Err("S3 fail to json encode event(%v): %#v", err, data.Event)
+		return err
+	}
+
+	err = out.manager.writeLine(jsonEvent)
+	if err != nil {
+		if opts.Guaranteed {
+			logp.Critical("S3 unable to write events to file: %s", err)
+		} else {
+			logp.Err("S3 error when writing line to file: %s", err)
+		}
+	}
+	op.Sig(sig, err)
+	return err
+}
diff --git a/libbeat/outputs/s3/s3_test.go b/libbeat/outputs/s3/s3_test.go
new file mode 100644
index 000000000000..d1fbb2a2c8b3
--- /dev/null
+++ b/libbeat/outputs/s3/s3_test.go
@@ -0,0 +1,3 @@
+// +build !integration
+
+package s3
diff --git a/libbeat/publisher/publish.go b/libbeat/publisher/publish.go
index 4f8d1b318e28..6cb8b192b0b4 100644
--- a/libbeat/publisher/publish.go
+++ b/libbeat/publisher/publish.go
@@ -21,6 +21,7 @@ import (
 	_ "github.com/elastic/beats/libbeat/outputs/kafka"
 	_ "github.com/elastic/beats/libbeat/outputs/logstash"
 	_ "github.com/elastic/beats/libbeat/outputs/redis"
+	_ "github.com/elastic/beats/libbeat/outputs/s3"
 )
 
 // command line flags
diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile
index f4c463f43426..4f35f1f2bd55 100755
--- a/libbeat/scripts/Makefile
+++ b/libbeat/scripts/Makefile
@@ -332,12 +332,14 @@ prepare-package:
 	-v $(abspath ${ES_BEATS}/dev-tools/packer/xgo-scripts):/scripts \
 	-v $(abspath ../):/source \
 	-v $(BUILD_DIR):/build \
+	-v $(abspath ../../../aws):/awssdk \
 	-e PUREGO="yes" \
 	-e PACK=${BEATNAME} \
 	-e BEFORE_BUILD=before_build.sh \
 	-e SOURCE=/source \
 	-e TARGETS=${TARGETS} \
 	-e BUILDID=${BUILDID} \
+	-e AWSSDK=/awssdk \
 	tudorg/beats-builder \
 	${BEAT_DIR}
 
@@ -350,11 +352,13 @@ prepare-package-cgo:
 	-v $(abspath ${ES_BEATS}/dev-tools/packer/xgo-scripts):/scripts \
 	-v $(abspath ../):/source \
 	-v $(BUILD_DIR):/build \
+	-v $(abspath ../../../aws):/awssdk \
 	-e PACK=${BEATNAME} \
 	-e BEFORE_BUILD=before_build.sh \
 	-e SOURCE=/source \
 	-e TARGETS=${TARGETS} \
 	-e BUILDID=${BUILDID} \
+	-e AWSSDK=/awssdk \
 	tudorg/beats-builder \
 	${BEAT_DIR}
 
@@ -363,11 +367,13 @@ prepare-package-cgo:
 	-v ${BUILD_DIR}:/build \
 	-v $(abspath ${ES_BEATS}/dev-tools/packer/xgo-scripts):/scripts \
 	-v $(abspath ..):/source \
+	-v $(abspath ../../../aws):/awssdk \
 	-e PACK=${BEATNAME} \
 	-e BEFORE_BUILD=before_build.sh \
 	-e SOURCE=/source \
 	-e TARGETS=${TARGETS_OLD} \
 	-e BUILDID=${BUILDID} \
+	-e AWSSDK=/awssdk \
 	tudorg/beats-builder-deb6 \
 	${BEAT_DIR}
 