diff --git a/cmd/tools/admin/main.go b/cmd/tools/admin/main.go index 5e9b7414..a8cf6609 100644 --- a/cmd/tools/admin/main.go +++ b/cmd/tools/admin/main.go @@ -24,468 +24,153 @@ import ( "os" "github.com/codegangsta/cli" - "github.com/uber/cherami-server/common" + lib "github.com/uber/cherami-server/cmd/tools/common" "github.com/uber/cherami-server/tools/admin" ) +const ( + adminToolService = "cherami-admin" +) + func main() { app := cli.NewApp() - cliHelper := common.NewCliHelper() - // SetCanonicalZones. For now just "zone1", "zone2", "z1" - // and "z2" are valid and they map to "zone1" and "zone2" - // canonical zones. - // We can use this API to set any valid zones - cliHelper.SetCanonicalZones(map[string]string{ - "zone1": "zone1", - "zone2": "zone2", - "z1": "zone1", - "z2": "zone2", - }) app.Name = "cherami" app.Usage = "A command-line tool for cherami developer, including debugging tool" app.Version = "1.2.1" - app.Flags = []cli.Flag{ - cli.BoolTFlag{ - Name: "hyperbahn", - Usage: "use hyperbahn", - }, - cli.IntFlag{ - Name: "timeout, t", - Value: 60, - Usage: "timeout in seconds", - }, - cli.StringFlag{ - Name: "env", - Value: "staging", - Usage: "env to connect. By default connects to staging, use \"prod\" to connect to production", - }, - cli.StringFlag{ - Name: "hyperbahn_bootstrap_file, hbfile", - Value: "/etc/uber/hyperbahn/hosts.json", - Usage: "hyperbahn boostrap file", - EnvVar: "HYPERBAHN_BOOSTRAP_FILE", - }, - cli.StringFlag{ - Name: "hostport", - Value: "", - Usage: "Host:port for frontend host", - EnvVar: "CHERAMI_FRONTEND_HOSTPORT", - }, - cli.BoolTFlag{ - Name: "admin_mode", - Usage: "use admin mode (bypass range checking for input arguments)", - }, - } - app.Commands = []cli.Command{ - { - Name: "create", - Aliases: []string{"c", "cr"}, - Usage: "create (destination | consumergroup)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst"}, - Usage: "create destination [options]", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "type, t", - Value: "plain", - Usage: "Type of the destination: 'plain' or 'timer'", - }, + app.Flags = lib.GetCommonFlags() + app.Flags = append(app.Flags, cli.BoolTFlag{ + Name: "admin_mode", + Usage: "use admin mode (bypass range checking for input arguments)", + }) - cli.IntFlag{ - Name: "consumed_messages_retention, cr", - Value: 3600, - Usage: "Consumed messages retention period specified in seconds. Default is 1 hour.", - }, + app.Commands = lib.GetCommonCommands(adminToolService) - cli.IntFlag{ - Name: "unconsumed_messages_retention, ur", - Value: 7200, - Usage: "Unconsumed messages retention period specified in seconds. Default is two hours.", - }, - cli.StringFlag{ - Name: "checksum_option, co", - Value: "crcIEEE", - Usage: "Checksum_options, can be one of the crcIEEE, md5", - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: "", - Usage: "The owner's email who commits the request. Default is the $USER@uber.com", - }, - }, - Action: func(c *cli.Context) { - admin.CreateDestination(c, cliHelper) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "create consumergroup [options]", - Flags: []cli.Flag{ - cli.IntFlag{ - Name: "start_time, s", - Value: 0, - Usage: "Start this consumer group at this UNIX timestamp; by default we start at this Unix timestamp (seconds since 1970-1-1)", - }, - cli.IntFlag{ - Name: "lock_timeout_seconds, l", - Value: 60, - Usage: "Ack timeout for each message", - }, - cli.IntFlag{ - Name: "max_delivery_count, m", - Value: 10, - Usage: "Maximum delivery count for a message before it sents to dead-letter queue", - }, - cli.IntFlag{ - Name: "skip_older_messages_in_seconds, k", - Value: 0, - Usage: "Skip messages older than this duration in seconds ('0' to skip none).", - }, - cli.IntFlag{ - Name: "delay_seonds, d", - Value: 0, - Usage: "Delay to add to every message, in seconds.", - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: "", - Usage: "The owner's email who commits the request. Default is the $USER@uber.com", - }, - }, - Action: func(c *cli.Context) { - admin.CreateConsumerGroup(c, cliHelper) - }, + showCommand := getCommand(app.Commands, "show") + showCommand.Usage = "show (destination | consumergroup | extent | storehost | message | dlq | cgAckID | cgqueue | destqueue | cgBacklog)" + showCommand.Subcommands = append(showCommand.Subcommands, []cli.Command{ + { + Name: "extent", + Aliases: []string{"e"}, + Usage: "show extent ", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "showcg, sc", + Value: "false", + Usage: "show consumer group(false, true), default to false", }, }, + Action: func(c *cli.Context) { + admin.ReadExtent(c) + }, }, { - Name: "show", - Aliases: []string{"s", "sh", "info", "i"}, - Usage: "show (destination | consumergroup | extent | storehost | message | dlq | cgAckID | cgqueue | destqueue | cgBacklog)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst"}, - Usage: "show destination ", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "showcg, sc", - Value: "false", - Usage: "show consumer group(false, true), default to false", - }, - }, - Action: func(c *cli.Context) { - admin.ReadDestination(c) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "show consumergroup ( | )", - Action: func(c *cli.Context) { - admin.ReadConsumerGroup(c) - }, - }, - { - Name: "extent", - Aliases: []string{"e"}, - Usage: "show extent ", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "showcg, sc", - Value: "false", - Usage: "show consumer group(false, true), default to false", - }, - }, - Action: func(c *cli.Context) { - admin.ReadExtent(c) - }, - }, - { - Name: "storehost", - Aliases: []string{"s"}, - Usage: "show storehost ", - Flags: []cli.Flag{ - cli.IntFlag{ - Name: "top, tp", - Value: 5, - Usage: "show the top k heavy extents in this storehost", - }, - }, - Action: func(c *cli.Context) { - admin.ReadStoreHost(c) - }, - }, - { - Name: "message", - Aliases: []string{"m"}, - Usage: "show message
", - Action: func(c *cli.Context) { - admin.ReadMessage(c) - }, - }, - { - Name: "dlq", - Aliases: []string{"dl"}, - Usage: "show dlq ", - Action: func(c *cli.Context) { - admin.ReadDlq(c) - }, - }, - { - Name: "cgAckID", - Aliases: []string{"aid"}, - Usage: "show cgAckID ", - Action: func(c *cli.Context) { - admin.ReadCgAckID(c) - }, - }, - { - Name: "cgqueue", - Aliases: []string{"cq", "cgq"}, - Usage: "show cgqueue ( | )", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "status, s", - Value: "", - Usage: "status: open | consumed | deleted, if empty, return all", - }, - }, - Action: func(c *cli.Context) { - admin.ReadCgQueue(c) - }, - }, - { - Name: "destqueue", - Aliases: []string{"dq", "destq"}, - Usage: "show destqueue ( | )", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "status, s", - Value: "open", - Usage: "status: open | sealed | consumed archived | deleted, if empty, return all", - }, - }, - Action: func(c *cli.Context) { - admin.ReadDestQueue(c) - }, - }, - { - Name: "cgBacklog", - Aliases: []string{"cgb", "cb"}, - Usage: "show cgBacklog ", - Action: func(c *cli.Context) { - admin.ReadCgBacklog(c) - }, + Name: "storehost", + Aliases: []string{"s"}, + Usage: "show storehost ", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "top, tp", + Value: 5, + Usage: "show the top k heavy extents in this storehost", }, }, + Action: func(c *cli.Context) { + admin.ReadStoreHost(c) + }, + }, { + Name: "cgAckID", + Aliases: []string{"aid"}, + Usage: "show cgAckID ", + Action: func(c *cli.Context) { + admin.ReadCgAckID(c) + }, }, { - Name: "update", - Aliases: []string{"u"}, - Usage: "update (destination | consumergroup | storehost)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst"}, - Usage: "update destination ", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "status, s", - Value: "enabled", - Usage: "status: enabled | disabled | sendonly | recvonly", - }, - cli.IntFlag{ - Name: "consumed_messages_retention, cr", - Value: 3600, - Usage: "Consumed messages retention period specified in seconds. Default is one hour.", - }, - cli.IntFlag{ - Name: "unconsumed_messages_retention, ur", - Value: 7200, - Usage: "Unconsumed messages retention period specified in seconds. Default is two hours.", - }, - cli.StringFlag{ - Name: "checksum_option, co", - Value: "", - Usage: "Checksum_options, can be one of the crcIEEE, md5", - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: "", - Usage: "The updated owner's email", - }, - cli.StringSliceFlag{ - Name: "zone_config, zc", - Usage: "Zone configs for multi_zone destinations. Format for each zone should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". For example: \"zone1,true,true,3\"", - }, - }, - Action: func(c *cli.Context) { - admin.UpdateDestination(c, cliHelper) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "update consumergroup ( | )", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "status, s", - Value: "enabled", - Usage: "status: enabled | disabled", - }, - cli.IntFlag{ - Name: "lock_timeout_seconds, l", - Value: 60, - Usage: "Ack timeout for each message", - }, - cli.IntFlag{ - Name: "max_delivery_count, m", - Value: 10, - Usage: "Maximum delivery count for a message before it sents to dead-letter queue", - }, - cli.IntFlag{ - Name: "skip_older_messages_in_seconds, k", - Value: 7200, - Usage: "Skip messages older than this duration in seconds.", - }, - cli.IntFlag{ - Name: "delay_seonds, d", - Value: 0, - Usage: "Delay to add to every message, in seconds.", - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: "", - Usage: "The updated owner's email", - }, - cli.StringFlag{ - Name: "active_zone, az", - Value: "", - Usage: "The updated active zone", - }, - cli.StringSliceFlag{ - Name: "zone_config, zc", - Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"", - }, - }, - Action: func(c *cli.Context) { - admin.UpdateConsumerGroup(c, cliHelper) - }, + Name: "cgqueue", + Aliases: []string{"cq", "cgq"}, + Usage: "show cgqueue ( | )", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "status, s", + Value: "", + Usage: "status: open | consumed | deleted, if empty, return all", }, }, + Action: func(c *cli.Context) { + admin.ReadCgQueue(c) + }, }, { - Name: "delete", - Aliases: []string{"d"}, - Usage: "delete (destination | consumergroup)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst"}, - Usage: "delete destination ", - Action: func(c *cli.Context) { - admin.DeleteDestination(c) - println("deleted destination: ", c.Args().First()) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "delete consumergroup ", - Action: func(c *cli.Context) { - admin.DeleteConsumerGroup(c) - println("deleted consumergroup: ", c.Args()[0], c.Args()[1]) - }, + Name: "destqueue", + Aliases: []string{"dq", "destq"}, + Usage: "show destqueue ( | )", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "status, s", + Value: "open", + Usage: "status: open | sealed | consumed archived | deleted, if empty, return all", }, }, + Action: func(c *cli.Context) { + admin.ReadDestQueue(c) + }, }, + }...) + + listCommand := getCommand(app.Commands, "list") + listCommand.Usage = "list (destination | consumergroup | extents | consumergroupextents | hosts)" + listCommand.Subcommands = append(listCommand.Subcommands, []cli.Command{ { - Name: "list", - Aliases: []string{"l", "ls"}, - Usage: "list (destination | consumergroup | extents | consumergroupextents | hosts)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst"}, - Usage: "list destination [options]", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "prefix, pf", - Value: "/", - Usage: "only show destinations of prefix", - }, - cli.StringFlag{ - Name: "status, s", - Value: "", - Usage: "status: enabled | disabled | sendonly | recvonly, if empty, return all", - }, - }, - Action: func(c *cli.Context) { - admin.ListDestinations(c) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "list consumergroup []", - Action: func(c *cli.Context) { - admin.ListConsumerGroups(c) - }, + Name: "extents", + Aliases: []string{"e", "es"}, + Usage: "list extents ", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "prefix, pf", + Usage: "only show extents of prefix", }, - { - Name: "extents", - Aliases: []string{"e", "es"}, - Usage: "list extents ", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "prefix, pf", - Usage: "only show extents of prefix", - }, - }, - Action: func(c *cli.Context) { - admin.ListExtents(c) - }, + }, + Action: func(c *cli.Context) { + admin.ListExtents(c) + }, + }, + { + Name: "consumergroupextents", + Aliases: []string{"cge", "cges"}, + Usage: "list consumergroupextents ", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "limit, lm", + Value: 10, + Usage: "show top n consumer group extents", }, - { - Name: "consumergroupextents", - Aliases: []string{"cge", "cges"}, - Usage: "list consumergroupextents ", - Flags: []cli.Flag{ - cli.IntFlag{ - Name: "limit, lm", - Value: 10, - Usage: "show top n consumer group extents", - }, - }, - Action: func(c *cli.Context) { - admin.ListConsumerGroupExtents(c) - }, + }, + Action: func(c *cli.Context) { + admin.ListConsumerGroupExtents(c) + }, + }, + { + Name: "hosts", + Aliases: []string{"h", "hs"}, + Usage: "list hosts [options] ", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "service, s", + Usage: "only show hosts of service(input,output,frontend,store,controller)", }, - { - Name: "hosts", - Aliases: []string{"h", "hs"}, - Usage: "list hosts [options] ", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "service, s", - Usage: "only show hosts of service(input,output,frontend,store,controller)", - }, - cli.StringFlag{ - Name: "type, t", - Value: "active", - Usage: "show hosts from specific table(active, history), default to active", - }, - }, - Action: func(c *cli.Context) { - admin.ListHosts(c) - }, + cli.StringFlag{ + Name: "type, t", + Value: "active", + Usage: "show hosts from specific table(active, history), default to active", }, }, + Action: func(c *cli.Context) { + admin.ListHosts(c) + }, }, + }...) + + app.Commands = append(app.Commands, []cli.Command{ { Name: "listAll", Aliases: []string{"la", "lsa"}, @@ -518,33 +203,6 @@ func main() { }, }, }, - { - Name: "publish", - Aliases: []string{"p", "pub", "w", "write"}, - Usage: "publish ", - Action: func(c *cli.Context) { - admin.Publish(c) - }, - }, - { - Name: "consume", - Aliases: []string{"sub", "r", "read"}, - Usage: "consume [options]", - Flags: []cli.Flag{ - cli.BoolTFlag{ - Name: "autoack, a", - Usage: "automatically ack each message as it's printed", - }, - cli.IntFlag{ - Name: "prefetch_count, p", - Value: 1, - Usage: "prefetch count", - }, - }, - Action: func(c *cli.Context) { - admin.Consume(c) - }, - }, { Name: "uuid2hostport", Aliases: []string{"u2h"}, @@ -586,14 +244,6 @@ func main() { admin.HostAddr2uuid(c) }, }, - { - Name: "merge_dlq", - Aliases: []string{"m"}, - Usage: "merge_dlq [options]", - Action: func(c *cli.Context) { - println("**not implemented** merged DLQ in consumer group: ", c.Args().First()) - }, - }, { Name: "serviceconfig", Aliases: []string{"cfg"}, @@ -811,7 +461,16 @@ func main() { admin.StoreListExtents(c) }, }, - } + }...) app.Run(os.Args) } + +func getCommand(commands []cli.Command, name string) *cli.Command { + for _, command := range commands { + if command.Name == name { + return &command + } + } + return &cli.Command{} +} diff --git a/cmd/tools/cli/main.go b/cmd/tools/cli/main.go index 802f4d36..158e3f9e 100644 --- a/cmd/tools/cli/main.go +++ b/cmd/tools/cli/main.go @@ -22,422 +22,22 @@ package main import ( "os" - "time" "github.com/codegangsta/cli" - "github.com/uber/cherami-server/common" - lib "github.com/uber/cherami-server/tools/cli" + lib "github.com/uber/cherami-server/cmd/tools/common" ) const ( - strLockTimeoutSeconds = `Acknowledgement timeout for prefetched/received messages` - - strMaxDeliveryCount = "Maximum number of times a message is delivered\n\tbefore it is sent to the dead-letter queue (DLQ)" - - strSkipOlderMessagesInSeconds = `Skip messages older than this duration in seconds ('0' to skip none)` - intSkipOlderMessagesInSeconds = 0 // 0 -> skip none - - strDelaySeconds = `Delay to introduce to all messages, in seconds.` - intDelaySeconds = 0 // zero delay, by default + serviceName = "cherami-cli" ) func main() { app := cli.NewApp() - cliHelper := common.NewCliHelper() - // SetCanonicalZones. For now just "zone1", "zone2", "z1" - // and "z2" are valid and they map to "zone1" and "zone2" - // canonical zones. - // We can use this API to set any valid zones - cliHelper.SetCanonicalZones(map[string]string{ - "zone1": "zone1", - "zone2": "zone2", - "z1": "zone1", - "z2": "zone2", - }) app.Name = "cherami" app.Usage = "A command-line tool for cherami users" app.Version = "1.1.10" - app.Flags = []cli.Flag{ - cli.BoolTFlag{ - Name: "hyperbahn", - Usage: "Use hyperbahn", - }, - cli.IntFlag{ - Name: "timeout, t", - Value: 60, - Usage: "Timeout in seconds", - }, - cli.StringFlag{ - Name: "env", - Value: "staging", - Usage: "Deployment to connect to. By default connects to staging. Use \"prod\" to connect to production", - }, - cli.StringFlag{ - Name: "hyperbahn_bootstrap_file, hbfile", - Value: "/etc/uber/hyperbahn/hosts.json", - Usage: "hyperbahn boostrap file", - EnvVar: "HYPERBAHN_BOOSTRAP_FILE", - }, - cli.StringFlag{ - Name: "hostport", - Value: "", - Usage: "Host:port for frontend host", - EnvVar: "CHERAMI_FRONTEND_HOSTPORT", - }, - } - app.Commands = []cli.Command{ - { - Name: "create", - Aliases: []string{"c", "cr"}, - Usage: "create (destination | consumergroup)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst", "dest"}, - Usage: "create destination [options]", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "type, t", - Value: "plain", - Usage: "Type of the destination: 'plain', 'timer', or 'kafka'", - }, - cli.IntFlag{ - Name: "consumed_messages_retention, cr", - Value: 3600, - Usage: "Consumed messages retention period specified in seconds. Default is 1 hour.", - }, - cli.IntFlag{ - Name: "unconsumed_messages_retention, ur", - Value: 7200, - Usage: "Unconsumed messages retention period specified in seconds. Default is two hours.", - }, - cli.StringFlag{ - Name: "checksum_option, co", - Value: "crcIEEE", - Usage: "Checksum_options, can be one of the crcIEEE, md5", - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: cliHelper.GetDefaultOwnerEmail(), - Usage: "The owner's email. Default is the $USER@uber.com", - }, - cli.StringSliceFlag{ - Name: "zone_config, zc", - Usage: "Zone configs for multi_zone destinations. Format for each zone should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". For example: \"zone1,true,true,3\"", - }, - cli.StringFlag{ - Name: "kafka_cluster, kc", - Usage: "Name of the Kafka cluster to attach to", - }, - cli.StringSliceFlag{ - Name: "kafka_topics, kt", - Usage: "List of kafka topics to subscribe to. Use multiple times, e.g. \"-kafka_topics topic_a -kafka_topics topic_b\"", - }, - }, - Action: func(c *cli.Context) { - lib.CreateDestination(c, cliHelper) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "create consumergroup [|] [options]", - Flags: []cli.Flag{ - cli.IntFlag{ - Name: "start_time, s", - Value: int(time.Now().Unix()), - Usage: "Consume messages newer than this UNIX timestamp.\n\tDefault: now (i.e. consume no existing messages)\n\tUse `date -d \"2017-06-11 6:42:42 -7:00\" +%s` to determine a value for this.", - }, - cli.IntFlag{ - Name: "lock_timeout_seconds, l", - Value: 60, - Usage: strLockTimeoutSeconds, - }, - cli.IntFlag{ - Name: "max_delivery_count, m", - Value: 10, - Usage: strMaxDeliveryCount, - }, - cli.IntFlag{ - Name: "skip_older_messages_in_seconds, k", - Value: intSkipOlderMessagesInSeconds, - Usage: strSkipOlderMessagesInSeconds, - }, - cli.IntFlag{ - Name: "delay_seconds, d", - Value: intDelaySeconds, - Usage: strDelaySeconds, - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: cliHelper.GetDefaultOwnerEmail(), - Usage: "The owner's email. Default is the $USER@uber.com", - }, - cli.StringSliceFlag{ - Name: "zone_config, zc", - Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"", - }, - }, - Action: func(c *cli.Context) { - lib.CreateConsumerGroup(c, cliHelper) - }, - }, - }, - }, - { - Name: "show", - Aliases: []string{"s", "sh", "info", "i"}, - Usage: "show (destination | consumergroup | message | dlq | cgBacklog)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst", "dest"}, - Usage: "show destination ", - Flags: []cli.Flag{ - cli.BoolFlag{ - Name: "showcg, cg", - Usage: "show consumer groups for the destination", - }, - }, - Action: func(c *cli.Context) { - lib.ReadDestination(c) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "show consumergroup ( | )", - Action: func(c *cli.Context) { - lib.ReadConsumerGroup(c) - }, - }, - { - Name: "message", - Aliases: []string{"m"}, - Usage: "show message
", - Action: func(c *cli.Context) { - lib.ReadMessage(c) - }, - }, - { - Name: "dlq", - Aliases: []string{"dl"}, - Usage: "show dlq ", - Action: func(c *cli.Context) { - lib.ReadDlq(c) - }, - }, - { - Name: "cgBacklog", - Aliases: []string{"cgb", "cb"}, - Usage: "show cgBacklog ( | )", - Action: func(c *cli.Context) { - lib.ReadCgBacklog(c) - }, - }, - }, - }, - { - Name: "update", - Aliases: []string{"u"}, - Usage: "update (destination | consumergroup)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst", "dest"}, - Usage: "update destination ", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "status, s", - Value: "enabled", - Usage: "status: enabled | disabled | sendonly | recvonly", - }, - cli.IntFlag{ - Name: "consumed_messages_retention, cr", - Value: 3600, - Usage: "Consumed messages retention period specified in seconds. Default is one hour.", - }, - cli.IntFlag{ - Name: "unconsumed_messages_retention, ur", - Value: 7200, - Usage: "Unconsumed messages retention period specified in seconds. Default is two hours.", - }, - cli.StringFlag{ - Name: "checksum_option, co", - Value: "", - Usage: "Checksum_options, can be one of the crcIEEE, md5", - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: cliHelper.GetDefaultOwnerEmail(), - Usage: "The updated owner's email", - }, - cli.StringSliceFlag{ - Name: "zone_config, zc", - Usage: "Zone configs for multi_zone destinations. Format for each zone should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". For example: \"zone1,true,true,3\"", - }, - }, - Action: func(c *cli.Context) { - lib.UpdateDestination(c, cliHelper) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "update consumergroup ( | )", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "status, s", - Value: "enabled", - Usage: "status: enabled | disabled", - }, - cli.IntFlag{ - Name: "lock_timeout_seconds, l", - Value: 60, - Usage: strLockTimeoutSeconds, - }, - cli.IntFlag{ - Name: "max_delivery_count, m", - Value: 10, - Usage: strMaxDeliveryCount, - }, - cli.IntFlag{ - Name: "skip_older_messages_in_seconds, k", - Value: intSkipOlderMessagesInSeconds, - Usage: strSkipOlderMessagesInSeconds, - }, - cli.IntFlag{ - Name: "delay_seconds, d", - Value: intDelaySeconds, - Usage: strDelaySeconds, - }, - cli.StringFlag{ - Name: "owner_email, oe", - Value: cliHelper.GetDefaultOwnerEmail(), - Usage: "The updated owner's email", - }, - cli.StringFlag{ - Name: "active_zone, az", - Value: "", - Usage: "The updated active zone", - }, - cli.StringSliceFlag{ - Name: "zone_config, zc", - Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"", - }, - }, - Action: func(c *cli.Context) { - lib.UpdateConsumerGroup(c, cliHelper) - }, - }, - }, - }, - { - Name: "delete", - Aliases: []string{"d"}, - Usage: "delete (destination | consumergroup)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst", "dest"}, - Usage: "delete destination ", - Action: func(c *cli.Context) { - lib.DeleteDestination(c) - println("deleted destination: ", c.Args().First()) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "delete consumergroup [|] ", - Action: func(c *cli.Context) { - lib.DeleteConsumerGroup(c) - println("deleted consumergroup: ", c.Args()[0], c.Args()[1]) - }, - }, - }, - }, - { - Name: "list", - Aliases: []string{"l", "ls"}, - Usage: "list (destination | consumergroup)", - Subcommands: []cli.Command{ - { - Name: "destination", - Aliases: []string{"d", "dst", "dest"}, - Usage: "list destination [options]", - Flags: []cli.Flag{ - cli.StringFlag{ - Name: "prefix, pf", - Value: "/", - Usage: "only show destinations of prefix", - }, - cli.StringFlag{ - Name: "status, s", - Value: "", - Usage: "status: enabled | disabled | sendonly | recvonly, if empty, return all", - }, - }, - Action: func(c *cli.Context) { - lib.ListDestinations(c) - }, - }, - { - Name: "consumergroup", - Aliases: []string{"c", "cg"}, - Usage: "list consumergroup []", - Action: func(c *cli.Context) { - lib.ListConsumerGroups(c) - }, - }, - }, - }, - { - Name: "publish", - Aliases: []string{"p", "pub", "w", "write"}, - Usage: "publish ", - Action: func(c *cli.Context) { - lib.Publish(c) - }, - }, - { - Name: "consume", - Aliases: []string{"sub", "r", "read"}, - Usage: "consume [options]", - Flags: []cli.Flag{ - cli.BoolTFlag{ - Name: "autoack, a", - Usage: "automatically ack each message as it's printed", - }, - cli.IntFlag{ - Name: "prefetch_count, p", - Value: 1, - Usage: "prefetch count", - }, - }, - Action: func(c *cli.Context) { - lib.Consume(c) - }, - }, - { - Name: "merge_dlq", - Aliases: []string{"mdlq"}, - Usage: "merge_dlq ( | )", - - Action: func(c *cli.Context) { - lib.MergeDLQForConsumerGroup(c) - }, - }, - { - Name: "purge_dlq", - Aliases: []string{"pdlq"}, - Usage: "purge_dlq ( | )", - - Action: func(c *cli.Context) { - lib.PurgeDLQForConsumerGroup(c) - }, - }, - } + app.Flags = lib.GetCommonFlags() + app.Commands = lib.GetCommonCommands(serviceName) app.Run(os.Args) } diff --git a/cmd/tools/common/lib.go b/cmd/tools/common/lib.go new file mode 100644 index 00000000..926f2b92 --- /dev/null +++ b/cmd/tools/common/lib.go @@ -0,0 +1,442 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package common + +import ( + "time" + + "github.com/codegangsta/cli" + "github.com/uber/cherami-server/common" + toolscommon "github.com/uber/cherami-server/tools/common" +) + +const ( + strLockTimeoutSeconds = `Acknowledgement timeout for prefetched/received messages` + + strMaxDeliveryCount = "Maximum number of times a message is delivered\n\tbefore it is sent to the dead-letter queue (DLQ)" + + strSkipOlderMessagesInSeconds = `Skip messages older than this duration in seconds ('0' to skip none)` + intSkipOlderMessagesInSeconds = 0 // 0 -> skip none + + strDelaySeconds = `Delay to introduce to all messages, in seconds.` + intDelaySeconds = 0 // zero delay, by default +) + +// GetCommonFlags get the common flags for both cli and admin commands +func GetCommonFlags() []cli.Flag { + return []cli.Flag{ + cli.BoolTFlag{ + Name: "hyperbahn", + Usage: "Use hyperbahn", + }, + cli.IntFlag{ + Name: "timeout, t", + Value: 60, + Usage: "Timeout in seconds", + }, + cli.StringFlag{ + Name: "env", + Value: "staging", + Usage: "Deployment to connect to. By default connects to staging. Use \"prod\" to connect to production", + }, + cli.StringFlag{ + Name: "hyperbahn_bootstrap_file, hbfile", + Value: "/etc/uber/hyperbahn/hosts.json", + Usage: "hyperbahn boostrap file", + EnvVar: "HYPERBAHN_BOOSTRAP_FILE", + }, + cli.StringFlag{ + Name: "hostport", + Value: "", + Usage: "Host:port for frontend host", + EnvVar: "CHERAMI_FRONTEND_HOSTPORT", + }, + } +} + +// GetCommonCommands get the common commands for both cli and admin commands +func GetCommonCommands(serviceName string) []cli.Command { + cliHelper := common.NewCliHelper() + // SetCanonicalZones. For now just "zone1", "zone2", "z1" + // and "z2" are valid and they map to "zone1" and "zone2" + // canonical zones. + // We can use this API to set any valid zones + cliHelper.SetCanonicalZones(map[string]string{ + "zone1": "zone1", + "zone2": "zone2", + "z1": "zone1", + "z2": "zone2", + }) + + return []cli.Command{ + { + Name: "create", + Aliases: []string{"c", "cr"}, + Usage: "create (destination | consumergroup)", + Subcommands: []cli.Command{ + { + Name: "destination", + Aliases: []string{"d", "dst", "dest"}, + Usage: "create destination [options]", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "type, t", + Value: "plain", + Usage: "Type of the destination: 'plain', 'timer', or 'kafka'", + }, + cli.IntFlag{ + Name: "consumed_messages_retention, cr", + Value: 3600, + Usage: "Consumed messages retention period specified in seconds. Default is 1 hour.", + }, + cli.IntFlag{ + Name: "unconsumed_messages_retention, ur", + Value: 7200, + Usage: "Unconsumed messages retention period specified in seconds. Default is two hours.", + }, + cli.StringFlag{ + Name: "checksum_option, co", + Value: "crcIEEE", + Usage: "Checksum_options, can be one of the crcIEEE, md5", + }, + cli.StringFlag{ + Name: "owner_email, oe", + Value: cliHelper.GetDefaultOwnerEmail(), + Usage: "The owner's email. Default is the $USER@uber.com", + }, + cli.StringSliceFlag{ + Name: "zone_config, zc", + Usage: "Zone configs for multi_zone destinations. Format for each zone should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". For example: \"zone1,true,true,3\"", + }, + cli.StringFlag{ + Name: "kafka_cluster, kc", + Usage: "Name of the Kafka cluster to attach to", + }, + cli.StringSliceFlag{ + Name: "kafka_topics, kt", + Usage: "List of kafka topics to subscribe to. Use multiple times, e.g. \"-kafka_topics topic_a -kafka_topics topic_b\"", + }, + }, + Action: func(c *cli.Context) { + toolscommon.CreateDestination(c, cliHelper, serviceName) + }, + }, + { + Name: "consumergroup", + Aliases: []string{"c", "cg"}, + Usage: "create consumergroup [|] [options]", + Flags: []cli.Flag{ + cli.IntFlag{ + Name: "start_time, s", + Value: int(time.Now().Unix()), + Usage: "Consume messages newer than this UNIX timestamp.\n\tDefault: now (i.e. consume no existing messages)\n\tUse `date -d \"2017-06-11 6:42:42 -7:00\" +%s` to determine a value for this.", + }, + cli.IntFlag{ + Name: "lock_timeout_seconds, l", + Value: 60, + Usage: strLockTimeoutSeconds, + }, + cli.IntFlag{ + Name: "max_delivery_count, m", + Value: 10, + Usage: strMaxDeliveryCount, + }, + cli.IntFlag{ + Name: "skip_older_messages_in_seconds, k", + Value: intSkipOlderMessagesInSeconds, + Usage: strSkipOlderMessagesInSeconds, + }, + cli.IntFlag{ + Name: "delay_seconds, d", + Value: intDelaySeconds, + Usage: strDelaySeconds, + }, + cli.StringFlag{ + Name: "owner_email, oe", + Value: cliHelper.GetDefaultOwnerEmail(), + Usage: "The owner's email. Default is the $USER@uber.com", + }, + cli.StringSliceFlag{ + Name: "zone_config, zc", + Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"", + }, + }, + Action: func(c *cli.Context) { + toolscommon.CreateConsumerGroup(c, cliHelper, serviceName) + }, + }, + }, + }, + { + Name: "show", + Aliases: []string{"s", "sh", "info", "i"}, + Usage: "show (destination | consumergroup | message | dlq | cgBacklog)", + Subcommands: []cli.Command{ + { + Name: "destination", + Aliases: []string{"d", "dst", "dest"}, + Usage: "show destination ", + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "showcg, cg", + Usage: "show consumer groups for the destination", + }, + }, + Action: func(c *cli.Context) { + toolscommon.ReadDestination(c, serviceName) + }, + }, + { + Name: "consumergroup", + Aliases: []string{"c", "cg"}, + Usage: "show consumergroup ( | )", + Action: func(c *cli.Context) { + toolscommon.ReadConsumerGroup(c, serviceName) + }, + }, + { + Name: "message", + Aliases: []string{"m"}, + Usage: "show message
", + Action: func(c *cli.Context) { + toolscommon.ReadMessage(c, serviceName) + }, + }, + { + Name: "dlq", + Aliases: []string{"dl"}, + Usage: "show dlq ", + Action: func(c *cli.Context) { + toolscommon.ReadDlq(c, serviceName) + }, + }, + { + Name: "cgBacklog", + Aliases: []string{"cgb", "cb"}, + Usage: "show cgBacklog ( | )", + Action: func(c *cli.Context) { + toolscommon.ReadCgBacklog(c, serviceName) + }, + }, + }, + }, + { + Name: "update", + Aliases: []string{"u"}, + Usage: "update (destination | consumergroup)", + Subcommands: []cli.Command{ + { + Name: "destination", + Aliases: []string{"d", "dst", "dest"}, + Usage: "update destination ", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "status, s", + Value: "enabled", + Usage: "status: enabled | disabled | sendonly | recvonly", + }, + cli.IntFlag{ + Name: "consumed_messages_retention, cr", + Value: 3600, + Usage: "Consumed messages retention period specified in seconds. Default is one hour.", + }, + cli.IntFlag{ + Name: "unconsumed_messages_retention, ur", + Value: 7200, + Usage: "Unconsumed messages retention period specified in seconds. Default is two hours.", + }, + cli.StringFlag{ + Name: "checksum_option, co", + Value: "", + Usage: "Checksum_options, can be one of the crcIEEE, md5", + }, + cli.StringFlag{ + Name: "owner_email, oe", + Value: cliHelper.GetDefaultOwnerEmail(), + Usage: "The updated owner's email", + }, + cli.StringSliceFlag{ + Name: "zone_config, zc", + Usage: "Zone configs for multi_zone destinations. Format for each zone should be \"ZoneName,AllowPublish,AllowConsume,ReplicaCount\". For example: \"zone1,true,true,3\"", + }, + }, + Action: func(c *cli.Context) { + toolscommon.UpdateDestination(c, cliHelper, serviceName) + }, + }, + { + Name: "consumergroup", + Aliases: []string{"c", "cg"}, + Usage: "update consumergroup ( | )", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "status, s", + Value: "enabled", + Usage: "status: enabled | disabled", + }, + cli.IntFlag{ + Name: "lock_timeout_seconds, l", + Value: 60, + Usage: strLockTimeoutSeconds, + }, + cli.IntFlag{ + Name: "max_delivery_count, m", + Value: 10, + Usage: strMaxDeliveryCount, + }, + cli.IntFlag{ + Name: "skip_older_messages_in_seconds, k", + Value: intSkipOlderMessagesInSeconds, + Usage: strSkipOlderMessagesInSeconds, + }, + cli.IntFlag{ + Name: "delay_seconds, d", + Value: intDelaySeconds, + Usage: strDelaySeconds, + }, + cli.StringFlag{ + Name: "owner_email, oe", + Value: cliHelper.GetDefaultOwnerEmail(), + Usage: "The updated owner's email", + }, + cli.StringFlag{ + Name: "active_zone, az", + Value: "", + Usage: "The updated active zone", + }, + cli.StringSliceFlag{ + Name: "zone_config, zc", + Usage: "Zone configs for multi_zone consumer group. Format for each zone should be \"ZoneName,PreferedActiveZone\". For example: \"zone1,false\"", + }, + }, + Action: func(c *cli.Context) { + toolscommon.UpdateConsumerGroup(c, cliHelper, serviceName) + }, + }, + }, + }, + { + Name: "delete", + Aliases: []string{"d"}, + Usage: "delete (destination | consumergroup)", + Subcommands: []cli.Command{ + { + Name: "destination", + Aliases: []string{"d", "dst", "dest"}, + Usage: "delete destination ", + Action: func(c *cli.Context) { + toolscommon.DeleteDestination(c, serviceName) + println("deleted destination: ", c.Args().First()) + }, + }, + { + Name: "consumergroup", + Aliases: []string{"c", "cg"}, + Usage: "delete consumergroup [|] ", + Action: func(c *cli.Context) { + toolscommon.DeleteConsumerGroup(c, serviceName) + println("deleted consumergroup: ", c.Args()[0], c.Args()[1]) + }, + }, + }, + }, + { + Name: "list", + Aliases: []string{"l", "ls"}, + Usage: "list (destination | consumergroup)", + Subcommands: []cli.Command{ + { + Name: "destination", + Aliases: []string{"d", "dst", "dest"}, + Usage: "list destination [options]", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "prefix, pf", + Value: "/", + Usage: "only show destinations of prefix", + }, + cli.StringFlag{ + Name: "status, s", + Value: "", + Usage: "status: enabled | disabled | sendonly | recvonly, if empty, return all", + }, + }, + Action: func(c *cli.Context) { + toolscommon.ListDestinations(c, serviceName) + }, + }, + { + Name: "consumergroup", + Aliases: []string{"c", "cg"}, + Usage: "list consumergroup []", + Action: func(c *cli.Context) { + toolscommon.ListConsumerGroups(c, serviceName) + }, + }, + }, + }, + { + Name: "publish", + Aliases: []string{"p", "pub", "w", "write"}, + Usage: "publish ", + Action: func(c *cli.Context) { + toolscommon.Publish(c, serviceName) + }, + }, + { + Name: "consume", + Aliases: []string{"sub", "r", "read"}, + Usage: "consume [options]", + Flags: []cli.Flag{ + cli.BoolTFlag{ + Name: "autoack, a", + Usage: "automatically ack each message as it's printed", + }, + cli.IntFlag{ + Name: "prefetch_count, p", + Value: 1, + Usage: "prefetch count", + }, + }, + Action: func(c *cli.Context) { + toolscommon.Consume(c, serviceName) + }, + }, + { + Name: "merge_dlq", + Aliases: []string{"mdlq"}, + Usage: "merge_dlq ( | )", + + Action: func(c *cli.Context) { + toolscommon.MergeDLQForConsumerGroup(c, serviceName) + }, + }, + { + Name: "purge_dlq", + Aliases: []string{"pdlq"}, + Usage: "purge_dlq ( | )", + + Action: func(c *cli.Context) { + toolscommon.PurgeDLQForConsumerGroup(c, serviceName) + }, + }, + } +} diff --git a/tools/admin/lib.go b/tools/admin/lib.go index cb039d45..6d9d6fe4 100644 --- a/tools/admin/lib.go +++ b/tools/admin/lib.go @@ -31,7 +31,6 @@ import ( "time" "github.com/codegangsta/cli" - cherami2 "github.com/uber/cherami-client-go/client/cherami" mcli "github.com/uber/cherami-server/clients/metadata" "github.com/uber/cherami-server/common" toolscommon "github.com/uber/cherami-server/tools/common" @@ -44,129 +43,6 @@ const ( adminToolService = "cherami-admin" ) -// CreateDestination creates a destination -func CreateDestination(c *cli.Context, cliHelper common.CliHelper) { - CreateDestinationSecure(c, cliHelper, nil) -} - -// CreateDestinationSecure creates a destination with security enabled -func CreateDestinationSecure(c *cli.Context, cliHelper common.CliHelper, authProvider cherami2.AuthProvider) { - cClient := toolscommon.GetCClientSecure(c, adminToolService, authProvider) - toolscommon.CreateDestination(c, cClient, cliHelper) -} - -// UpdateDestination updates properties of a destination -func UpdateDestination(c *cli.Context, cliHelper common.CliHelper) { - cClient := toolscommon.GetCClient(c, adminToolService) - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.UpdateDestination(c, cClient, mClient, cliHelper) -} - -// UpdateDestinationSecure updates properties of a destination with security enabled -func UpdateDestinationSecure(c *cli.Context, cliHelper common.CliHelper, authProvider cherami2.AuthProvider) { - cClient := toolscommon.GetCClientSecure(c, adminToolService, authProvider) - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.UpdateDestination(c, cClient, mClient, cliHelper) -} - -// CreateConsumerGroup creates a consumer group -func CreateConsumerGroup(c *cli.Context, cliHelper common.CliHelper) { - CreateConsumerGroupSecure(c, cliHelper, nil) -} - -// CreateConsumerGroupSecure creates a consumer group with security enabled -func CreateConsumerGroupSecure(c *cli.Context, cliHelper common.CliHelper, authProvider cherami2.AuthProvider) { - cClient := toolscommon.GetCClientSecure(c, adminToolService, authProvider) - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.CreateConsumerGroup(c, cClient, mClient, cliHelper) -} - -// UpdateConsumerGroup updates properties of a consumer group -func UpdateConsumerGroup(c *cli.Context, cliHelper common.CliHelper) { - cClient := toolscommon.GetCClient(c, adminToolService) - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.UpdateConsumerGroup(c, cClient, mClient, cliHelper) -} - -// UpdateConsumerGroupSecure updates properties of a consumer group with security enabled -func UpdateConsumerGroupSecure(c *cli.Context, cliHelper common.CliHelper, authProvider cherami2.AuthProvider) { - cClient := toolscommon.GetCClientSecure(c, adminToolService, authProvider) - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.UpdateConsumerGroup(c, cClient, mClient, cliHelper) -} - -// ReadDestination reads a destination -func ReadDestination(c *cli.Context) { - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.ReadDestination(c, mClient) -} - -// ReadDlq read Dlq properties -func ReadDlq(c *cli.Context) { - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.ReadDlq(c, mClient) -} - -// DeleteDestination deletes a destination -func DeleteDestination(c *cli.Context) { - cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.DeleteDestination(c, cClient) -} - -// DeleteDestinationSecure deletes a destination with security enabled -func DeleteDestinationSecure(c *cli.Context, authProvider cherami2.AuthProvider) { - cClient := toolscommon.GetCClientSecure(c, adminToolService, authProvider) - toolscommon.DeleteDestination(c, cClient) -} - -// DeleteConsumerGroup deletes a consumer group -func DeleteConsumerGroup(c *cli.Context) { - cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.DeleteConsumerGroup(c, cClient) -} - -// DeleteConsumerGroupSecure deletes a consumer group with security enabled -func DeleteConsumerGroupSecure(c *cli.Context, authProvider cherami2.AuthProvider) { - cClient := toolscommon.GetCClientSecure(c, adminToolService, authProvider) - toolscommon.DeleteConsumerGroup(c, cClient) -} - -// ReadConsumerGroup reads properties of a consumer group -func ReadConsumerGroup(c *cli.Context) { - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.ReadConsumerGroup(c, mClient) -} - -// ReadMessage read a message from store directly -func ReadMessage(c *cli.Context) { - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.ReadMessage(c, mClient) -} - -// ListDestinations lists the destinations of matching names -func ListDestinations(c *cli.Context) { - mClient := toolscommon.GetMClient(c, adminToolService) - toolscommon.ListDestinations(c, mClient) -} - -// ListConsumerGroups lists the consumer groups of matching names -func ListConsumerGroups(c *cli.Context) { - cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.ListConsumerGroups(c, cClient) -} - -// Publish publishs messages -func Publish(c *cli.Context) { - cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.Publish(c, cClient) -} - -// Consume consumes message -func Consume(c *cli.Context) { - cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.Consume(c, cClient) -} - // UnloadConsumerGroup unloads the CG on the given outputhost func UnloadConsumerGroup(c *cli.Context) { mClient := toolscommon.GetMClient(c, adminToolService) @@ -183,12 +59,6 @@ func GetCgState(c *cli.Context) { toolscommon.GetConsumerGroupState(c) } -// ReadCgBacklog reads cg backlog -func ReadCgBacklog(c *cli.Context) { - cClient := toolscommon.GetCClient(c, adminToolService) - toolscommon.ReadCgBacklog(c, cClient) -} - type cgAckIDJSONOutputFields struct { Address int64 `json:"address"` SessionID uint16 `json:"sessioni_id"` diff --git a/tools/cli/lib.go b/tools/cli/lib.go deleted file mode 100644 index f6b83e9f..00000000 --- a/tools/cli/lib.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright (c) 2016 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package cli - -import ( - "github.com/codegangsta/cli" - cherami2 "github.com/uber/cherami-client-go/client/cherami" - scommon "github.com/uber/cherami-server/common" - "github.com/uber/cherami-server/tools/common" -) - -const ( - serviceName = "cherami-cli" -) - -// ReadCgBacklog reads cg backlog -func ReadCgBacklog(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.ReadCgBacklog(c, cClient) -} - -// CreateDestination creates a destination -func CreateDestination(c *cli.Context, cliHelper scommon.CliHelper) { - CreateDestinationSecure(c, cliHelper, nil) -} - -// CreateDestinationSecure creates a destination with security enabled -func CreateDestinationSecure(c *cli.Context, cliHelper scommon.CliHelper, authProvider cherami2.AuthProvider) { - cClient := common.GetCClientSecure(c, serviceName, authProvider) - common.CreateDestination(c, cClient, cliHelper) -} - -// UpdateDestination updates the destination -func UpdateDestination(c *cli.Context, cliHelper scommon.CliHelper) { - mClient := common.GetMClient(c, serviceName) - cClient := common.GetCClient(c, serviceName) - common.UpdateDestination(c, cClient, mClient, cliHelper) -} - -// UpdateDestinationSecure updates the destination with security enabled -func UpdateDestinationSecure(c *cli.Context, cliHelper scommon.CliHelper, authProvider cherami2.AuthProvider) { - mClient := common.GetMClient(c, serviceName) - cClient := common.GetCClientSecure(c, serviceName, authProvider) - common.UpdateDestination(c, cClient, mClient, cliHelper) -} - -// CreateConsumerGroup creates the CG -func CreateConsumerGroup(c *cli.Context, cliHelper scommon.CliHelper) { - CreateConsumerGroupSecure(c, cliHelper, nil) -} - -// CreateConsumerGroupSecure creates the CG with security enabled -func CreateConsumerGroupSecure(c *cli.Context, cliHelper scommon.CliHelper, authProvider cherami2.AuthProvider) { - mClient := common.GetMClient(c, serviceName) - cClient := common.GetCClientSecure(c, serviceName, authProvider) - common.CreateConsumerGroup(c, cClient, mClient, cliHelper) -} - -// UpdateConsumerGroup updates the CG -func UpdateConsumerGroup(c *cli.Context, cliHelper scommon.CliHelper) { - cClient := common.GetCClient(c, serviceName) - mClient := common.GetMClient(c, serviceName) - common.UpdateConsumerGroup(c, cClient, mClient, cliHelper) -} - -// UpdateConsumerGroupSecure updates the CG with security enabled -func UpdateConsumerGroupSecure(c *cli.Context, cliHelper scommon.CliHelper, authProvider cherami2.AuthProvider) { - cClient := common.GetCClientSecure(c, serviceName, authProvider) - mClient := common.GetMClient(c, serviceName) - common.UpdateConsumerGroup(c, cClient, mClient, cliHelper) -} - -// ReadDestination is used to get info about the destination -func ReadDestination(c *cli.Context) { - mClient := common.GetMClient(c, serviceName) - common.ReadDestination(c, mClient) -} - -// ReadDlq is used to get info about the DLQ destination -func ReadDlq(c *cli.Context) { - mClient := common.GetMClient(c, serviceName) - common.ReadDlq(c, mClient) -} - -// DeleteDestination deletes the given destination -func DeleteDestination(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.DeleteDestination(c, cClient) -} - -// DeleteDestinationSecure deletes the given destination with security enabled -func DeleteDestinationSecure(c *cli.Context, authProvider cherami2.AuthProvider) { - cClient := common.GetCClientSecure(c, serviceName, authProvider) - common.DeleteDestination(c, cClient) -} - -// DeleteConsumerGroup deletes the given CG -func DeleteConsumerGroup(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.DeleteConsumerGroup(c, cClient) -} - -// DeleteConsumerGroupSecure deletes the given CG with security enabled -func DeleteConsumerGroupSecure(c *cli.Context, authProvider cherami2.AuthProvider) { - cClient := common.GetCClientSecure(c, serviceName, authProvider) - common.DeleteConsumerGroup(c, cClient) -} - -// ReadConsumerGroup gets info about the CG -func ReadConsumerGroup(c *cli.Context) { - mClient := common.GetMClient(c, serviceName) - common.ReadConsumerGroup(c, mClient) -} - -// ReadMessage is used to read a message -func ReadMessage(c *cli.Context) { - mClient := common.GetMClient(c, serviceName) - common.ReadMessage(c, mClient) -} - -// ListDestinations is used to list all destinations -func ListDestinations(c *cli.Context) { - mClient := common.GetMClient(c, serviceName) - common.ListDestinations(c, mClient) -} - -// ListConsumerGroups is used to list all CGs -func ListConsumerGroups(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.ListConsumerGroups(c, cClient) -} - -// Publish is used to publish to a given destination -func Publish(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.Publish(c, cClient) -} - -// Consume is used to consume from the given destination for -// the given CG -func Consume(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.Consume(c, cClient) -} - -// MergeDLQForConsumerGroup merges the DLQ for this CG -func MergeDLQForConsumerGroup(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.MergeDLQForConsumerGroup(c, cClient) -} - -// PurgeDLQForConsumerGroup purges the DLQ for this CG -func PurgeDLQForConsumerGroup(c *cli.Context) { - cClient := common.GetCClient(c, serviceName) - common.PurgeDLQForConsumerGroup(c, cClient) -} diff --git a/tools/common/lib.go b/tools/common/lib.go index 46e4c9d0..a4600404 100644 --- a/tools/common/lib.go +++ b/tools/common/lib.go @@ -227,8 +227,8 @@ func getChecksumOptionParam(optionStr string) cherami.ChecksumOption { return cherami.ChecksumOption_CRC32IEEE } -// CreateDestination create destination -func CreateDestination(c *cli.Context, cClient ccli.Client, cliHelper common.CliHelper) { +// CreateDestinationHelper create destination +func CreateDestinationHelper(c *cli.Context, cClient ccli.Client, cliHelper common.CliHelper) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -331,8 +331,8 @@ func getDestZoneConfigs(c *cli.Context, cliHelper common.CliHelper) cherami.Dest return zoneConfigs } -// UpdateDestination update destination based on cli -func UpdateDestination(c *cli.Context, cClient ccli.Client, mClient mcli.Client, cliHelper common.CliHelper) { +// UpdateDestinationHelper update destination based on cli +func UpdateDestinationHelper(c *cli.Context, cClient ccli.Client, mClient mcli.Client, cliHelper common.CliHelper) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -398,8 +398,8 @@ func UpdateDestination(c *cli.Context, cClient ccli.Client, mClient mcli.Client, fmt.Printf("%v\n", Jsonify(desc)) } -// CreateConsumerGroup create consumer group based on cli.Context -func CreateConsumerGroup(c *cli.Context, cClient ccli.Client, mClient mcli.Client, cliHelper common.CliHelper) { +// CreateConsumerGroupHelper create consumer group based on cli.Context +func CreateConsumerGroupHelper(c *cli.Context, cClient ccli.Client, mClient mcli.Client, cliHelper common.CliHelper) { if len(c.Args()) < 2 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -516,8 +516,8 @@ func getCgZoneConfigs(c *cli.Context, mClient mcli.Client, cliHelper common.CliH return zoneConfigs } -// UpdateConsumerGroup update the consumer group based on cli.Context -func UpdateConsumerGroup(c *cli.Context, cClient ccli.Client, mClient mcli.Client, cliHelper common.CliHelper) { +// UpdateConsumerGroupHelper update the consumer group based on cli.Context +func UpdateConsumerGroupHelper(c *cli.Context, cClient ccli.Client, mClient mcli.Client, cliHelper common.CliHelper) { var path, name string @@ -843,8 +843,8 @@ func printDest(dest *shared.DestinationDescription) { fmt.Fprintln(os.Stdout, string(outputStr)) } -// ReadDestination return the detail for dest, and also consumer group for this dest -func ReadDestination(c *cli.Context, mClient mcli.Client) { +// ReadDestinationHelper return the detail for dest, and also consumer group for this dest +func ReadDestinationHelper(c *cli.Context, mClient mcli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -895,8 +895,8 @@ func readDestinationFromMetadata(mClient mcli.Client, path string) (*shared.Dest }) } -// ReadDlq return the info for dlq dest and related consumer group -func ReadDlq(c *cli.Context, mClient mcli.Client) { +// ReadDlqHelper return the info for dlq dest and related consumer group +func ReadDlqHelper(c *cli.Context, mClient mcli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -921,8 +921,8 @@ func ReadDlq(c *cli.Context, mClient mcli.Client) { printCG(resp) } -// ReadCgBacklog reads the CG back log -func ReadCgBacklog(c *cli.Context, cClient ccli.Client) { +// ReadCgBacklogHelper reads the CG back log +func ReadCgBacklogHelper(c *cli.Context, cClient ccli.Client) { var cg, dst string var dstPtr *string if len(c.Args()) < 1 { @@ -955,8 +955,8 @@ func ReadCgBacklog(c *cli.Context, cClient ccli.Client) { fmt.Println(backlog.GetValue()) } -// DeleteDestination delete the destination based on Cli.Context -func DeleteDestination(c *cli.Context, cClient ccli.Client) { +// DeleteDestinationHelper delete the destination based on Cli.Context +func DeleteDestinationHelper(c *cli.Context, cClient ccli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -968,8 +968,8 @@ func DeleteDestination(c *cli.Context, cClient ccli.Client) { ExitIfError(err) } -// DeleteConsumerGroup delete the consumer group based on Cli.Context -func DeleteConsumerGroup(c *cli.Context, cClient ccli.Client) { +// DeleteConsumerGroupHelper delete the consumer group based on Cli.Context +func DeleteConsumerGroupHelper(c *cli.Context, cClient ccli.Client) { if len(c.Args()) < 2 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -1022,8 +1022,8 @@ func printCG(cg *shared.ConsumerGroupDescription) { fmt.Fprintln(os.Stdout, string(outputStr)) } -// ReadConsumerGroup return the consumer group information -func ReadConsumerGroup(c *cli.Context, mClient mcli.Client) { +// ReadConsumerGroupHelper return the consumer group information +func ReadConsumerGroupHelper(c *cli.Context, mClient mcli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strCGSpecIncorrectArgs)) } @@ -1051,8 +1051,8 @@ func ReadConsumerGroup(c *cli.Context, mClient mcli.Client) { } } -// MergeDLQForConsumerGroup return the consumer group information -func MergeDLQForConsumerGroup(c *cli.Context, cClient ccli.Client) { +// MergeDLQForConsumerGroupHelper return the consumer group information +func MergeDLQForConsumerGroupHelper(c *cli.Context, cClient ccli.Client) { var err error switch len(c.Args()) { default: @@ -1074,8 +1074,8 @@ func MergeDLQForConsumerGroup(c *cli.Context, cClient ccli.Client) { ExitIfError(err) } -// PurgeDLQForConsumerGroup return the consumer group information -func PurgeDLQForConsumerGroup(c *cli.Context, cClient ccli.Client) { +// PurgeDLQForConsumerGroupHelper return the consumer group information +func PurgeDLQForConsumerGroupHelper(c *cli.Context, cClient ccli.Client) { var err error switch len(c.Args()) { default: @@ -1131,8 +1131,8 @@ func matchDestStatus(status string, wantStatus shared.DestinationStatus) bool { return false } -// ReadMessage implement for show msg command line -func ReadMessage(c *cli.Context, mClient mcli.Client) { +// ReadMessageHelper implement for show msg command line +func ReadMessageHelper(c *cli.Context, mClient mcli.Client) { if len(c.Args()) < 2 { ExitIfError(errors.New("not enough arguments, need to specify both extent uuid and message address")) } @@ -1198,8 +1198,8 @@ type messageJSONOutputFields struct { EnqueueTimeUtc time.Time `json:"enqueueTimeUtc,omitempty"` } -// ListDestinations return destinations based on the Cli.Context -func ListDestinations(c *cli.Context, mClient mcli.Client) { +// ListDestinationsHelper return destinations based on the Cli.Context +func ListDestinationsHelper(c *cli.Context, mClient mcli.Client) { prefix := string(c.String("prefix")) included := string(c.String("include")) @@ -1303,8 +1303,8 @@ func ListDestinations(c *cli.Context, mClient mcli.Client) { } } -// ListConsumerGroups return the consumer groups based on the destination provided -func ListConsumerGroups(c *cli.Context, cClient ccli.Client) { +// ListConsumerGroupsHelper return the consumer groups based on the destination provided +func ListConsumerGroupsHelper(c *cli.Context, cClient ccli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -1336,8 +1336,8 @@ func ListConsumerGroups(c *cli.Context, cClient ccli.Client) { } } -// Publish start to pusblish to the destination provided -func Publish(c *cli.Context, cClient ccli.Client) { +// PublishHelper start to pusblish to the destination provided +func PublishHelper(c *cli.Context, cClient ccli.Client) { if len(c.Args()) < 1 { ExitIfError(errors.New(strNotEnoughArgs)) } @@ -1410,8 +1410,8 @@ type kafkaMessageJSON struct { Msg string } -// Consume start to consume from the destination -func Consume(c *cli.Context, cClient ccli.Client) { +// ConsumeHelper start to consume from the destination +func ConsumeHelper(c *cli.Context, cClient ccli.Client) { var err error if len(c.Args()) < 2 { ExitIfError(errors.New(strNotEnoughArgs)) @@ -1988,3 +1988,128 @@ func StoreListExtents(c *cli.Context, mClient mcli.Client) { fmt.Fprintln(os.Stdout, string(outputStr)) } } + +// CreateDestination creates a destination +func CreateDestination(c *cli.Context, cliHelper common.CliHelper, serviceName string) { + CreateDestinationSecure(c, cliHelper, nil, serviceName) +} + +// CreateDestinationSecure creates a destination with security enabled +func CreateDestinationSecure( + c *cli.Context, + cliHelper common.CliHelper, + authProvider ccli.AuthProvider, + serviceName string, +) { + cClient := GetCClientSecure(c, serviceName, authProvider) + CreateDestinationHelper(c, cClient, cliHelper) +} + +// CreateConsumerGroup creates the CG +func CreateConsumerGroup(c *cli.Context, cliHelper common.CliHelper, serviceName string) { + CreateConsumerGroupSecure(c, cliHelper, nil, serviceName) +} + +// CreateConsumerGroupSecure creates the CG with security enabled +func CreateConsumerGroupSecure( + c *cli.Context, + cliHelper common.CliHelper, + authProvider ccli.AuthProvider, + serviceName string, +) { + mClient := GetMClient(c, serviceName) + cClient := GetCClientSecure(c, serviceName, authProvider) + CreateConsumerGroupHelper(c, cClient, mClient, cliHelper) +} + +// ReadDestination is used to get info about the destination +func ReadDestination(c *cli.Context, serviceName string) { + mClient := GetMClient(c, serviceName) + ReadDestinationHelper(c, mClient) +} + +// ReadConsumerGroup gets info about the CG +func ReadConsumerGroup(c *cli.Context, serviceName string) { + mClient := GetMClient(c, serviceName) + ReadConsumerGroupHelper(c, mClient) +} + +// ReadMessage is used to read a message +func ReadMessage(c *cli.Context, serviceName string) { + mClient := GetMClient(c, serviceName) + ReadMessageHelper(c, mClient) +} + +// ReadDlq is used to get info about the DLQ destination +func ReadDlq(c *cli.Context, serviceName string) { + mClient := GetMClient(c, serviceName) + ReadDlqHelper(c, mClient) +} + +// ReadCgBacklog reads cg backlog +func ReadCgBacklog(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + ReadCgBacklogHelper(c, cClient) +} + +// UpdateDestination updates the destination +func UpdateDestination(c *cli.Context, cliHelper common.CliHelper, serviceName string) { + mClient := GetMClient(c, serviceName) + cClient := GetCClient(c, serviceName) + UpdateDestinationHelper(c, cClient, mClient, cliHelper) +} + +// UpdateConsumerGroup updates the CG +func UpdateConsumerGroup(c *cli.Context, cliHelper common.CliHelper, serviceName string) { + cClient := GetCClient(c, serviceName) + mClient := GetMClient(c, serviceName) + UpdateConsumerGroupHelper(c, cClient, mClient, cliHelper) +} + +// DeleteDestination deletes the given destination +func DeleteDestination(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + DeleteDestinationHelper(c, cClient) +} + +// DeleteConsumerGroup deletes the given CG +func DeleteConsumerGroup(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + DeleteConsumerGroupHelper(c, cClient) +} + +// ListDestinations is used to list all destinations +func ListDestinations(c *cli.Context, serviceName string) { + mClient := GetMClient(c, serviceName) + ListDestinationsHelper(c, mClient) +} + +// ListConsumerGroups is used to list all CGs +func ListConsumerGroups(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + ListConsumerGroupsHelper(c, cClient) +} + +// Publish is used to publish to a given destination +func Publish(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + PublishHelper(c, cClient) +} + +// Consume is used to consume from the given destination for the given CG +func Consume(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + ConsumeHelper(c, cClient) +} + +// MergeDLQForConsumerGroup merges the DLQ for this CG +func MergeDLQForConsumerGroup(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + MergeDLQForConsumerGroupHelper(c, cClient) +} + +// PurgeDLQForConsumerGroup purges the DLQ for this CG +func PurgeDLQForConsumerGroup(c *cli.Context, serviceName string) { + cClient := GetCClient(c, serviceName) + PurgeDLQForConsumerGroupHelper(c, cClient) +}