Skip to content

Commit

Permalink
Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in mul…
Browse files Browse the repository at this point in the history
…tiple places. (#1896)

* Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places.

* Add changelog entry.

* Update CHANGELOG.next.asciidoc

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>

Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>
(cherry picked from commit f77b97c)

# Conflicts:
#	CHANGELOG.next.asciidoc
#	internal/pkg/policy/policy_output.go
  • Loading branch information
blakerouse authored and mergify[bot] committed Sep 30, 2022
1 parent 6e54a01 commit 5ccc7cd
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
- Remove events from agent checkin body. {issue}1774[1774]
- Improve authc debug logging. {pull}1870[1870]
- Add error detail to catch-all HTTP error response. {pull}1854[1854]
<<<<<<< HEAD
- Update apikey.cache_hit log field name to match convention. {pull}1900[1900]
- LoadServerLimits will not overwrite specified limits when loading default/agent number specified values. {issue}1841[1841] {pull}1912[1912]
=======
- Fix issue were errors where being ignored written to elasticsearch. {pull}1896[1896]
>>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896))
==== New Features

Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (ack *AckT) handleUnenroll(ctx context.Context, zlog zerolog.Logger, agent
return errors.Wrap(err, "handleUnenroll marshal")
}

if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
return errors.Wrap(err, "handleUnenroll update")
}

Expand All @@ -428,7 +428,7 @@ func (ack *AckT) handleUpgrade(ctx context.Context, zlog zerolog.Logger, agent *
return errors.Wrap(err, "handleUpgrade marshal")
}

if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = ack.bulk.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
return errors.Wrap(err, "handleUpgrade update")
}

Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/bulk/opBulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/mailru/easyjson"
"github.com/rs/zerolog/log"

"github.com/elastic/fleet-server/v7/internal/pkg/es"
)

func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) {
Expand Down Expand Up @@ -73,6 +75,9 @@ func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id s
if !ok {
return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data)
}
if err := es.TranslateError(r.Status, r.Error); err != nil {
return nil, err
}
return r, nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func unenrollAgent(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
return err
}
}
if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh()); err != nil {
if err = bulker.Update(ctx, agentsIndex, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("Fail unenrollAgent record update")
}

Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/dl/servers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func EnsureServer(ctx context.Context, bulker bulk.Bulk, version string, agent m
if err != nil {
return err
}
return bulker.Update(ctx, o.indexName, agent.ID, data)
return bulker.Update(ctx, o.indexName, agent.ID, data, bulk.WithRefresh(), bulk.WithRetryOnConflict(3))
}
89 changes: 89 additions & 0 deletions internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker
zlog.Debug().Msg("policy output permissions are the same")
}

<<<<<<< HEAD
if needNewKey {
zlog.Debug().
RawJSON("roles", p.Role.Raw).
Expand All @@ -78,17 +79,105 @@ func (p *PolicyOutput) Prepare(ctx context.Context, zlog zerolog.Logger, bulker
if err != nil {
zlog.Error().Err(err).Msg("fail generate output key")
return err
=======
output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
zlog.Debug().
Str("hash.sha256", p.Role.Sha2).
Str("roles", string(p.Role.Raw)).
Msg("Updating agent record to pick up most recent roles.")

fields := map[string]interface{}{
dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
}

// Using painless script to update permission hash for updated key
body, err := renderUpdatePainlessScript(p.Name, fields)
if err != nil {
return err
}

if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return err
}

} else if needNewKey {
zlog.Debug().
RawJSON("fleet.policy.roles", p.Role.Raw).
Str("fleet.policy.default.oldHash", output.PermissionsHash).
Str("fleet.policy.default.newHash", p.Role.Sha2).
Msg("Generating a new API key")

ctx := zlog.WithContext(ctx)
outputAPIKey, err :=
generateOutputAPIKey(ctx, bulker, agent.Id, p.Name, p.Role.Raw)
if err != nil {
return fmt.Errorf("failed generate output API key: %w", err)
}

// When a new keys is generated we need to update the Agent record,
// this will need to be updated when multiples remote Elasticsearch output
// are supported.
zlog.Info().
Str("fleet.policy.role.hash.sha256", p.Role.Sha2).
Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID).
Msg("Updating agent record to pick up default output key.")

fields := map[string]interface{}{
dl.FieldPolicyOutputAPIKey: outputAPIKey.Agent(),
dl.FieldPolicyOutputAPIKeyID: outputAPIKey.ID,
dl.FieldPolicyOutputPermissionsHash: p.Role.Sha2,
}

if !foundOutput {
fields[dl.FiledType] = OutputTypeElasticsearch
}
if output.APIKeyID != "" {
fields[dl.FieldPolicyOutputToRetireAPIKeyIDs] = model.ToRetireAPIKeyIdsItems{
ID: output.APIKeyID,
RetiredAt: time.Now().UTC().Format(time.RFC3339),
>>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896))
}

agent.DefaultAPIKey = outputAPIKey.Agent()

<<<<<<< HEAD
// When a new keys is generated we need to update the Agent record,
// this will need to be updated when multiples Elasticsearch output
// are used.
zlog.Info().
Str("hash.sha256", p.Role.Sha2).
Str(logger.DefaultOutputAPIKeyID, outputAPIKey.ID).
Msg("Updating agent record to pick up default output key.")
=======
if err = bulker.Update(ctx, dl.FleetAgents, agent.Id, body, bulk.WithRefresh(), bulk.WithRetryOnConflict(3)); err != nil {
zlog.Error().Err(err).Msg("fail update agent record")
return fmt.Errorf("fail update agent record: %w", err)
}

// Now that all is done, we can update the output on the agent variable
// Right not it's more for consistency and to ensure the in-memory agent
// data is correct and in sync with ES, so it can be safely used after
// this method returns.
output.Type = OutputTypeElasticsearch
output.APIKey = outputAPIKey.Agent()
output.APIKeyID = outputAPIKey.ID
output.PermissionsHash = p.Role.Sha2 // for the sake of consistency
}

// Always insert the `api_key` as part of the output block, this is required
// because only fleet server knows the api key for the specific agent, if we don't
// add it the agent will not receive the `api_key` and will not be able to connect
// to Elasticsearch.
//
// We need to investigate allocation with the new LS output, we had optimization
// in place to reduce number of agent policy allocation when sending the updated
// agent policy to multiple agents.
// See: https://github.com/elastic/fleet-server/issues/1301
if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil {
return err
}
>>>>>>> f77b97c (Catch error in waitBulkAction. Add bulk.WithRetryOnConflict(3) in multiple places. (#1896))

fields := map[string]interface{}{
dl.FieldDefaultAPIKey: outputAPIKey.Agent(),
Expand Down

0 comments on commit 5ccc7cd

Please sign in to comment.