Skip to content

Commit

Permalink
Check license x-pack (elastic#11296)
Browse files Browse the repository at this point in the history
* Check License for basic or better
  • Loading branch information
ph authored Mar 20, 2019
1 parent 9ff8392 commit b0a2eb4
Show file tree
Hide file tree
Showing 26 changed files with 160 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
info from the cloud.machine.type and cloud.availability_zone. {issue}10968[10968]
- Empty `meta.json` file will be treated as a missing meta file. {issue}8558[8558]
- Rename `migration.enabled` config to `migration.6_to_7.enabled`. {pull}11284[11284]
- Beats Xpack now checks for Basic license on connect. {pull}11296[11296]

*Auditbeat*

Expand Down
12 changes: 11 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ func NewClient(
}

client.Connection.onConnectCallback = func() error {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()

for _, callback := range globalCallbackRegistry.callbacks {
err := callback(client)
if err != nil {
return err
}
}

if onConnect != nil {
onConnect.mutex.Lock()
defer onConnect.mutex.Unlock()
Expand Down Expand Up @@ -721,7 +731,7 @@ func (conn *Connection) Ping() (string, error) {
}

debugf("Ping status code: %v", status)
logp.Info("Connected to Elasticsearch version %s", response.Version.Number)
logp.Info("Attempting to connect to Elasticsearch version %s", response.Version.Number)
return response.Version.Number, nil
}

Expand Down
34 changes: 34 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,31 @@ type callbacksRegistry struct {
// XXX: it would be fantastic to do this without a package global
var connectCallbackRegistry = newCallbacksRegistry()

// NOTE(ph): We need to refactor this, right now this is the only way to ensure that every calls
// to an ES cluster executes a callback.
var globalCallbackRegistry = newCallbacksRegistry()

// RegisterGlobalCallback register a global callbacks.
func RegisterGlobalCallback(callback connectCallback) (uuid.UUID, error) {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()

// find the next unique key
var key uuid.UUID
var err error
exists := true
for exists {
key, err = uuid.NewV4()
if err != nil {
return uuid.Nil, err
}
_, exists = globalCallbackRegistry.callbacks[key]
}

globalCallbackRegistry.callbacks[key] = callback
return key, nil
}

func newCallbacksRegistry() callbacksRegistry {
return callbacksRegistry{
callbacks: make(map[uuid.UUID]connectCallback),
Expand Down Expand Up @@ -99,6 +124,15 @@ func DeregisterConnectCallback(key uuid.UUID) {
delete(connectCallbackRegistry.callbacks, key)
}

// DeregisterGlobalCallback deregisters a callback for the elasticsearch output
// specified by its key. If a callback does not exist, nothing happens.
func DeregisterGlobalCallback(key uuid.UUID) {
globalCallbackRegistry.mutex.Lock()
defer globalCallbackRegistry.mutex.Unlock()

delete(globalCallbackRegistry.callbacks, key)
}

func makeES(
im outputs.IndexManager,
beat beat.Info,
Expand Down
25 changes: 25 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,28 @@ func TestConnectCallbacksManagement(t *testing.T) {
t.Fatalf("third callback cannot be retrieved")
}
}

func TestGlobalConnectCallbacksManagement(t *testing.T) {
f0 := func(client *Client) error { fmt.Println("i am function #0"); return nil }
f1 := func(client *Client) error { fmt.Println("i am function #1"); return nil }
f2 := func(client *Client) error { fmt.Println("i am function #2"); return nil }

_, err := RegisterGlobalCallback(f0)
if err != nil {
t.Fatalf("error while registering callback: %v", err)
}
id1, err := RegisterGlobalCallback(f1)
if err != nil {
t.Fatalf("error while registering callback: %v", err)
}
id2, err := RegisterGlobalCallback(f2)
if err != nil {
t.Fatalf("error while registering callback: %v", err)
}

t.Logf("removing second callback")
DeregisterGlobalCallback(id1)
if _, ok := globalCallbackRegistry.callbacks[id2]; !ok {
t.Fatalf("third callback cannot be retrieved")
}
}
6 changes: 3 additions & 3 deletions x-pack/functionbeat/beater/functionbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"github.com/elastic/beats/x-pack/functionbeat/config"
"github.com/elastic/beats/x-pack/functionbeat/core"
_ "github.com/elastic/beats/x-pack/functionbeat/include" // imports features
"github.com/elastic/beats/x-pack/functionbeat/licenser"
"github.com/elastic/beats/x-pack/functionbeat/provider"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

var (
Expand Down Expand Up @@ -82,7 +82,7 @@ func (bt *Functionbeat) Run(b *beat.Beat) error {
defer manager.Stop()

// Wait until we receive the initial license.
if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, checkLicense); err != nil {
if err := licenser.WaitForLicense(bt.ctx, bt.log, manager, licenser.BasicAndAboveOrTrial); err != nil {
return err
}

Expand Down Expand Up @@ -156,7 +156,7 @@ func makeClientFactory(log *logp.Logger, manager *licenser.Manager, pipeline bea

// Make the client aware of the current license, the client will accept sending events to the
// pipeline until the client is closed or if the license change and is not valid.
licenseAware := core.NewLicenseAwareClient(client, checkLicense)
licenseAware := core.NewLicenseAwareClient(client, licenser.BasicAndAboveOrTrial)
if err := manager.AddWatcher(licenseAware); err != nil {
return nil, err
}
Expand Down
14 changes: 0 additions & 14 deletions x-pack/functionbeat/beater/license.go

This file was deleted.

2 changes: 1 addition & 1 deletion x-pack/functionbeat/core/license_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/licenser"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

var errInvalidLicense = errors.New("invalid license detected, cannot publish events")
Expand Down
2 changes: 1 addition & 1 deletion x-pack/functionbeat/core/license_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/x-pack/functionbeat/licenser"
"github.com/elastic/beats/x-pack/libbeat/licenser"
)

type dummySyncClient struct{ EventCount int }
Expand Down
5 changes: 5 additions & 0 deletions x-pack/libbeat/cmd/inject.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ package cmd

import (
"github.com/elastic/beats/libbeat/cmd"
"github.com/elastic/beats/libbeat/logp"

// register central management
"github.com/elastic/beats/x-pack/libbeat/licenser"
_ "github.com/elastic/beats/x-pack/libbeat/management"
)

const licenseDebugK = "license"

// AddXPack extends the given root folder with XPack features
func AddXPack(root *cmd.BeatsRootCmd, name string) {
licenser.Enforce(logp.NewLogger(licenseDebugK), licenser.BasicAndAboveOrTrial)
root.AddCommand(genEnrollCmd(name, ""))
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func CheckTrial(log *logp.Logger, license License) bool {
log.Error("Trial license is expired")
return false
}
log.Info("Trial license active")
return true
}
return false
Expand All @@ -27,10 +28,11 @@ func CheckTrial(log *logp.Logger, license License) bool {
// CheckLicenseCover check that the current license cover the requested license.
func CheckLicenseCover(licenseType LicenseType) func(*logp.Logger, License) bool {
return func(log *logp.Logger, license License) bool {
log.Debugf("Checking that license cover %s", licenseType)
log.Debug("Checking that license covers %s", licenseType)
if license.Cover(licenseType) && license.IsActive() {
return true
}
log.Infof("License is active for %s", licenseType)
return false
}
}
Expand All @@ -48,3 +50,8 @@ func Validate(log *logp.Logger, license License, checks ...CheckFunc) bool {
}
return false
}

// BasicAndAboveOrTrial return true if the license is basic or if the license is trial and active.
func BasicAndAboveOrTrial(log *logp.Logger, license License) bool {
return CheckBasic(log, license) || CheckTrial(log, license)
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,18 @@ func (f *ElasticFetcher) Fetch() (*License, error) {
status, body, err := f.client.Request("GET", xPackURL, "", params, nil)
// When we are running an OSS release of elasticsearch the _xpack endpoint will return a 405,
// "Method Not Allowed", so we return the default OSS license.
if status == http.StatusBadRequest {
f.log.Debug("Received 'Bad request' (400) response from server, fallback to OSS license")
return OSSLicense, nil
}

if status == http.StatusMethodNotAllowed {
f.log.Debug("Received 'Method Not allowed' (405) response from server, fallback to OSS license")
return OSSLicense, nil
}

if status == http.StatusUnauthorized {
return nil, errors.New("Unauthorized access, could not connect to the xpack endpoint, verify your credentials")
return nil, errors.New("unauthorized access, could not connect to the xpack endpoint, verify your credentials")
}

if status != http.StatusOK {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func getTestClient() *elasticsearch.Client {
client, err := elasticsearch.NewClient(elasticsearch.ClientSettings{
URL: host,
Index: outil.MakeSelector(),
Username: cli.GetEnvOr("ES_USER", ""),
Password: cli.GetEnvOr("ES_PASS", ""),
Username: "myelastic", // NOTE: I will refactor this in a followup PR
Password: "changeme",
Timeout: 60 * time.Second,
CompressionLevel: 3,
}, nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func newServerClientPair(t *testing.T, handler http.HandlerFunc) (*httptest.Serv
}

func TestParseJSON(t *testing.T) {
t.Run("OSS release of Elasticsearch", func(t *testing.T) {
t.Run("OSS release of Elasticsearch (Code: 405)", func(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Method Not Allowed", 405)
}
Expand All @@ -52,6 +52,23 @@ func TestParseJSON(t *testing.T) {
assert.Equal(t, OSSLicense, oss)
})

t.Run("OSS release of Elasticsearch (Code: 400)", func(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Bad Request", 400)
}
s, c := newServerClientPair(t, h)
defer s.Close()
defer c.Close()

fetcher := NewElasticFetcher(c)
oss, err := fetcher.Fetch()
if assert.NoError(t, err) {
return
}

assert.Equal(t, OSSLicense, oss)
})

t.Run("malformed JSON", func(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("hello bad JSON"))
Expand All @@ -75,7 +92,7 @@ func TestParseJSON(t *testing.T) {

fetcher := NewElasticFetcher(c)
_, err := fetcher.Fetch()
assert.Equal(t, err.Error(), "Unauthorized access, could not connect to the xpack endpoint, verify your credentials")
assert.Equal(t, err.Error(), "unauthorized access, could not connect to the xpack endpoint, verify your credentials")
})

t.Run("any error from the server", func(t *testing.T) {
Expand Down
44 changes: 44 additions & 0 deletions x-pack/libbeat/licenser/es_callback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package licenser

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
)

// Enforce setups the corresponding callbacks in libbeat to verify the license on the
// remote elasticsearch cluster.
func Enforce(log *logp.Logger, checks ...CheckFunc) {
cb := func(client *elasticsearch.Client) error {
fetcher := NewElasticFetcher(client)
license, err := fetcher.Fetch()

if err != nil {
return errors.Wrapf(err, "cannot retrieve the elasticsearch license")
}

if license == OSSLicense {
return errors.New("This Beat requires the default distribution of Elasticsearch. Please " +
"upgrade to the default distribution of Elasticsearch from elastic.co, or downgrade to " +
"the oss-only distribution of beats")
}

if !Validate(log, *license, checks...) {
return fmt.Errorf(
"invalid license found, requires a basic or a valid trial license and received %s",
license.Get(),
)
}

return nil
}

elasticsearch.RegisterGlobalCallback(cb)
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit b0a2eb4

Please sign in to comment.