Skip to content

Commit

Permalink
Merge pull request #1 from warwick-mitchell1/feature/multi-step-reindex
Browse files Browse the repository at this point in the history
Expand the reindexing capabilities.
  • Loading branch information
LGUG2Z authored Mar 16, 2020
2 parents dc015a0 + 68cd09e commit fd4d363
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 55 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.4
0.1.0
63 changes: 59 additions & 4 deletions cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func App() *cli.App {
app.Commands = []cli.Command{
Reindex(),
Cleanup(),
UpdateAlias(),
}

return app
Expand Down Expand Up @@ -132,9 +133,14 @@ func Cleanup() cli.Command {
func Reindex() cli.Command {
return cli.Command{
Name: "reindex",
Usage: "create a new index from an index template and reindex any existing documents",
Usage: "Reindex from a src index to a dest index optionally creating a new index from an index template",
Flags: []cli.Flag{
cli.BoolFlag{Name: "bulk-indexing", Usage: "set refresh_interval to -1 when reindexing and revert afterwards"},
cli.StringFlag{Name: "dest-index", Usage: "Optionally specify destination index, otherwise one will be generated and created for you."},
cli.BoolFlag{Name: "bulk-indexing", Usage: "set refresh_interval to -1 and set number_of_replicas to 0 when reindexing and revert afterwards."},
cli.BoolFlag{Name: "version-external", Usage: "set version_type to external. This will only index documents if they don't exist or the source doc is at a higher version"},
cli.BoolFlag{Name: "no-update-alias", Usage: "don't update the index alias. This setting will also not revert the refresh_interval and number_of_replicas if bulk-indexing is set"},
cli.StringFlag{Name: "reindex-host-allocation", Usage: "Optional target host for the reindex to happen on. eg. 'es-reindex-*'"},
cli.StringFlag{Name: "dest-host-allocation", Usage: "Optional target host once the reindex is complete. eg. 'es-data-*'"},
},
Action: func(c *cli.Context) error {
if c.NArg() == 0 || c.NArg() > 1 {
Expand All @@ -148,21 +154,44 @@ func Reindex() cli.Command {
}

if strings.HasSuffix(c.Args().First(), ".json") {
// Single index
filePath := c.Args().First()
newIndex, err := elasticsearch.UpdateTemplateAndCreateNewIndex(client, filePath, c.Bool("bulk-indexing"))

newIndex, err := elasticsearch.UpdateTemplateAndCreateNewIndex(client, filePath, c.String("dest-index"), c.Bool("bulk-indexing"))
if err != nil {
return err
}

if c.IsSet("reindex-host-allocation") {
// Update index settings to force allocation to specified host pattern for reindex
err := elasticsearch.UpdateHostAllocation(client, newIndex, c.String("reindex-host-allocation"))
if err != nil {
return err
}
}

_, fileName := filepath.Split(filePath)
alias := strings.TrimSuffix(fileName, ".json")
if err = elasticsearch.ReindexOne(client, alias, newIndex); err != nil {
if err = elasticsearch.ReindexOne(client, alias, newIndex, c.Bool("version-external"), c.Bool("no-update-alias"), c.Bool("bulk-indexing")); err != nil {
return err
}

if c.IsSet("dest-host-allocation") {
// Update index settings to force allocation to specified host pattern after reindex is complete
err := elasticsearch.UpdateHostAllocation(client, newIndex, c.String("dest-host-allocation"))
if err != nil {
return err
}
}
return nil
}

// Multiple indexes
if c.IsSet("dest-index") {
fmt.Printf("--dest-index not supported with multiple indexes, please only specify one index template .json.")
cli.ShowCommandHelpAndExit(c, "reindex", 1)
}

directory := c.Args().First()
aliasToNewIndex, err := elasticsearch.UpdateTemplatesAndCreateNewIndices(client, directory, c.Bool("bulk-indexing"))
if err != nil {
Expand All @@ -177,3 +206,29 @@ func Reindex() cli.Command {
},
}
}

func UpdateAlias() cli.Command {
return cli.Command{
Name: "update-alias",
Usage: "Swap an index alias to another index",
Flags: []cli.Flag{
cli.StringFlag{Name: "alias", Usage: "Name of the alias."},
cli.StringFlag{Name: "dest-index", Usage: "Name of the destination index."},
},
Action: func(c *cli.Context) error {
if !c.IsSet("alias") || !c.IsSet("dest-index") {
fmt.Printf("This command requires a json index template file path or a directory of json index templates\n\n")
cli.ShowCommandHelpAndExit(c, "reindex", 1)
}
client, err := getClient(c)
if err != nil {
return errors.Wrap(err, "error connecting to ElasticSearch")
}

if err = elasticsearch.UpdateAlias(client, c.String("alias"), c.String("dest-index")); err != nil {
return err
}
return nil
},
}
}
167 changes: 117 additions & 50 deletions elasticsearch/elasdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ var IndexTemplatePrefix = "INDEX TEMPLATE "
var IndexPrefix = "INDEX "
var DocumentsPrefix = "DOCUMENTS "
var AliasPrefix = "ALIAS "
var SettingsPrefix = "SETTINGS "
var Added = color.GreenString("Added ")
var Created = color.GreenString("Created ")
var Removed = color.RedString("Removed ")
var Deleted = color.RedString("Deleted ")
var Updated = color.GreenString("Updated ")
var Reindexed = color.YellowString("Reindexed ")
var Info = color.YellowString("Info ")

func UpdateTemplateAndCreateNewIndex(client *elastic.Client, filePath string, bulkIndexing bool) (string, error) {
func UpdateTemplateAndCreateNewIndex(client *elastic.Client, filePath, newIndexName string, bulkIndexing bool) (string, error) {
bytes, err := ioutil.ReadFile(filePath)
if err != nil {
return "", errors.Wrapf(err, "failed reading file %s", filePath)
Expand All @@ -45,41 +47,52 @@ func UpdateTemplateAndCreateNewIndex(client *elastic.Client, filePath string, bu
if !indexPutTemplate.Acknowledged {
fmt.Printf("PUT index/%s not acknowledged\n", index)
}
fmt.Printf("%s %s %s\n", IndexTemplatePrefix, Updated, index)

// Create a unique time-stamped index
dateSuffix := time.Now().Format("2006-01-02-15:04:05")
indexWithDate := fmt.Sprintf("%s-%s", index, dateSuffix)
if newIndexName == "" {
newIndexName = fmt.Sprintf("%s-%s", index, dateSuffix)
}

create, err := client.CreateIndex(indexWithDate).Do(context.Background())
exists, err := client.IndexExists(newIndexName).Do(context.Background())
if err != nil {
return "", errors.Wrapf(err, "failed creating index %s", indexWithDate)
return "", errors.Wrapf(err, "failed checking for index %s", newIndexName)
}

bulkIndexingSettings := `{
"index" : {
"refresh_interval" : "-1"
}
}
`
if !exists {
create, err := client.CreateIndex(newIndexName).Do(context.Background())
if err != nil {
return "", errors.Wrapf(err, "failed creating index %s", newIndexName)
}
if !create.Acknowledged {
fmt.Println("index creation not acknowledged")
}
fmt.Printf("%s %s %s\n", IndexPrefix, Created, newIndexName)
} else {
fmt.Printf("%s %s %s\n", IndexPrefix, Updated, newIndexName)
}

if bulkIndexing {
settings, err := client.IndexPutSettings(indexWithDate).BodyString(bulkIndexingSettings).Do(context.Background())
indexingSettings := `{
"index" : {
"refresh_interval" : "-1",
"number_of_replicas": "0"
}
}
`
settings, err := client.IndexPutSettings(newIndexName).BodyString(indexingSettings).Do(context.Background())
if err != nil {
return "", errors.Wrapf(err, "failed setting refresh_interval to -1 for index %s", indexWithDate)
return "", errors.Wrapf(err, "failed setting index refresh_interval/replica settings for index %s", newIndexName)
}

if !settings.Acknowledged {
fmt.Println("index settings not acknowledged")
}
fmt.Printf("%s %s refresh_interval(0)/replica(-1) for %s \n", SettingsPrefix, Updated, newIndexName)
}

if !create.Acknowledged {
fmt.Println("index creation not acknowledged")
}

fmt.Printf("%s %s %s\n", IndexTemplatePrefix, Updated, index)
fmt.Printf("%s %s %s\n", IndexPrefix, Created, indexWithDate)
return indexWithDate, nil
return newIndexName, nil
}

func UpdateTemplatesAndCreateNewIndices(client *elastic.Client, templatesDir string, bulkIndexing bool) (map[string]string, error) {
Expand All @@ -97,7 +110,7 @@ func UpdateTemplatesAndCreateNewIndices(client *elastic.Client, templatesDir str
}

filePath := filepath.Join(templatesDir, file.Name())
newIndex, err := UpdateTemplateAndCreateNewIndex(client, filePath, bulkIndexing)
newIndex, err := UpdateTemplateAndCreateNewIndex(client, filePath, "", bulkIndexing)
if err != nil {
return nil, errors.Wrapf(err, "failed create new index from updated template %s", filePath)
}
Expand All @@ -109,7 +122,7 @@ func UpdateTemplatesAndCreateNewIndices(client *elastic.Client, templatesDir str
return aliasToNewIndex, err
}

func ReindexOne(client *elastic.Client, alias, newIndex string) error {
func ReindexOne(client *elastic.Client, alias, newIndex string, versionExternal, noUpdateAlias, bulkIndexing bool) error {
// We assume that we are reindexing from an existing index on the alias
reindexingRequired := true

Expand All @@ -131,55 +144,91 @@ func ReindexOne(client *elastic.Client, alias, newIndex string) error {
indicesFromAlias := aliasResult.IndicesByAlias(alias)
for _, index := range indicesFromAlias {
// Reindex from the existing index as the source to the new index as the destination
targetVersionType := "internal"
if versionExternal {
targetVersionType = "external"
}
src := elastic.NewReindexSource().Index(index)
dst := elastic.NewReindexDestination().Index(newIndex)
dst := elastic.NewReindexDestination().Index(newIndex).VersionType(targetVersionType)
refresh, err := client.Reindex().Source(src).Destination(dst).Refresh("true").Conflicts("proceed").Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed reindexing from %s to %s", index, newIndex)
}

fmt.Printf("%s %s %d from %s to %s\n", DocumentsPrefix, Reindexed, refresh.Total, index, newIndex)
}
}

// Remove the existing index from the alias
removeAlias, err := client.Alias().Remove(index, alias).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed removing index %s from alias %s", index, alias)
}
// We don't reset the refresh/replca values unless we're either updating the index Alias or bulkIndexing isn't set
if !noUpdateAlias || !bulkIndexing {
templates, err := client.IndexGetTemplate(alias).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed to retrieve index template %s", alias)
}

if !removeAlias.Acknowledged {
fmt.Println("alias removal not acknowledged")
}
seconds := "null"
replicas := "null"

fmt.Printf("%s %s %s from %s\n", AliasPrefix, Removed, index, alias)
if refreshInterval, ok := templates[alias].Settings["index"].(map[string]interface{})["refresh_interval"]; ok {
seconds = fmt.Sprintf(`"%s"`, refreshInterval.(string))
}
if replicaCount, ok := templates[alias].Settings["index"].(map[string]interface{})["number_of_replicas"]; ok {
replicas = fmt.Sprintf(`"%s"`, replicaCount.(string))
}
}

templates, err := client.IndexGetTemplate(alias).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed to retrieve index template %s", alias)
// Reset the refresh interval to either whatever is specified in the index template or the default (using null)
resetRefreshInterval := fmt.Sprintf(`{
"index" : {
"refresh_interval" : %s,
"number_of_replicas" : %s
}
}
`, seconds, replicas)

seconds := "null"
settings, err := client.IndexPutSettings(newIndex).BodyString(resetRefreshInterval).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed setting index bulkimport/replica settings for index %s", newIndex)
}

if refreshInterval, ok := templates[alias].Settings["index"].(map[string]interface{})["refresh_interval"]; ok {
seconds = fmt.Sprintf(`"%s"`, refreshInterval.(string))
if !settings.Acknowledged {
fmt.Println("index settings not acknowledged")
}
fmt.Printf("%s %s refresh_interval/replica for %s reset to template values\n", SettingsPrefix, Updated, newIndex)
}

// Reset the refresh interval to either whatever is specified in the index template or the default (using null)
resetRefreshInterval := fmt.Sprintf(`{
"index" : {
"refresh_interval" : %s
}
if !noUpdateAlias {
return UpdateAlias(client, alias, newIndex)
}
return nil
}
`, seconds)

settings, err := client.IndexPutSettings(newIndex).BodyString(resetRefreshInterval).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed setting refresh_interval to -1 for index %s", newIndex)
func UpdateAlias(client *elastic.Client, alias, newIndex string) error {
if exists, err := client.IndexExists(newIndex).Do(context.Background()); err != nil || !exists {
fmt.Printf("failed checking for index %s", newIndex)
return errors.Wrapf(err, "failed checking for index %s", newIndex)
}

if !settings.Acknowledged {
fmt.Println("index settings not acknowledged")
aliasResult, err := client.Aliases().Alias(alias).Do(context.Background())
if err == nil {
// Remove the existing index from the alias
indicesFromAlias := aliasResult.IndicesByAlias(alias)
for _, index := range indicesFromAlias {
removeAlias, err := client.Alias().Remove(index, alias).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed removing index %s from alias %s", index, alias)
}

if !removeAlias.Acknowledged {
fmt.Println("alias removal not acknowledged")
}

fmt.Printf("%s %s %s from %s\n", AliasPrefix, Removed, index, alias)
}
} else {
if !elastic.IsNotFound(err) {
return errors.Wrapf(err, "failed trying to lookup alias %s", alias)
}
fmt.Printf("%s %s %s not found, must be a new index\n", AliasPrefix, Info, alias)
}

// Add our new index which has been reindex with the existing data to the alias
Expand All @@ -197,9 +246,27 @@ func ReindexOne(client *elastic.Client, alias, newIndex string) error {
return nil
}

func UpdateHostAllocation(client *elastic.Client, newIndex, allocation string) error {

resetAllocation := fmt.Sprintf(`{
"index" : {
"routing.allocation.require._name" : "%s"
}
}
`, allocation)

_, err := client.IndexPutSettings(newIndex).BodyString(resetAllocation).Do(context.Background())
if err != nil {
return errors.Wrapf(err, "failed setting routing allocation to %s for index %s", allocation, newIndex)
}

fmt.Printf("%s %s Allocation set to %s\n", SettingsPrefix, Updated, allocation)
return nil
}

func ReindexAll(client *elastic.Client, aliasToNewIndex map[string]string) error {
for alias, newIndex := range aliasToNewIndex {
if err := ReindexOne(client, alias, newIndex); err != nil {
if err := ReindexOne(client, alias, newIndex, false, false, false); err != nil {
return errors.Wrapf(err, "failed reindexing to %s and adding to alias %s", newIndex, alias)
}
}
Expand Down

0 comments on commit fd4d363

Please sign in to comment.