diff --git a/cmd/kafkaconnectcli/kafkaconnectcli.go b/cmd/kafkaconnectcli/kafkaconnectcli.go index 66f7864..95ffc17 100644 --- a/cmd/kafkaconnectcli/kafkaconnectcli.go +++ b/cmd/kafkaconnectcli/kafkaconnectcli.go @@ -235,7 +235,7 @@ func main() { case "status": utils.PrintJson(client.Status(conn), *args.pretty) case "delete": - client.Delete(conn) + deleteConnector(client, conn) case "resume": client.Resume(conn) case "pause": @@ -293,9 +293,9 @@ func main() { } name := config["name"] delete(config, "name") - jsonConfig, _ = json.Marshal(connect.Config{Name: name, Config: config}) + jsonConfig, _ = json.Marshal(connect.ConnectorConfig{Name: name, Config: config}) } - var config connect.Config + var config connect.ConnectorConfig fmt.Println(string(jsonConfig)) err := json.Unmarshal(jsonConfig, &config) if err != nil { @@ -321,13 +321,22 @@ func main() { case "delete-all": matchConnectors := findMatchingConnectors(client, func(_ string) bool { return true }) for _, conn := range matchConnectors { - client.Delete(conn) + deleteConnector(client, conn) } } } os.Exit(0) } +func deleteConnector(client connect.ConnectRestClient, connector string) { + fmt.Fprintf(os.Stdin, "\nCurrent configuration for connector %s\n\n", connector) + connectorTasks := client.GetConfig(connector) + config, _ := json.Marshal(connect.ConnectorConfig{Name: connectorTasks.Name, Config: connectorTasks.Config}) + utils.PrintJson(string(config), true) + fmt.Fprint(os.Stdin, "\nSave this to use as the `-config.json` option during rollback connector\n\n") + client.Delete(connector) +} + func findMatchingConnectors(client connect.ConnectRestClient, fn func(string) bool) []string { var matchConnectors []string for _, conn := range client.List() { diff --git a/connect/kafkaconnect.go b/connect/kafkaconnect.go index 0e74154..4c411e1 100644 --- a/connect/kafkaconnect.go +++ b/connect/kafkaconnect.go @@ -24,7 +24,7 @@ import ( ) // ConnectorStatus describes the configuration of a connector and its tasks. -type ConnectorConfig struct { +type ConnectorTasksConfig struct { Name string `json:"name"` Config map[string]string `json:"config"` Tasks []struct { @@ -48,7 +48,7 @@ type ConnectorStatus struct { } `json:"tasks"` } -type Config struct { +type ConnectorConfig struct { Name string `json:"name"` Config map[string]string `json:"config"` } @@ -126,9 +126,9 @@ func (client *ConnectRestClient) Tasks(connector string) string { // GetConfig retrieves the configuration for the specified connector. // Return a new ConnectorConfig struct. -func (client *ConnectRestClient) GetConfig(connector string) ConnectorConfig { +func (client *ConnectRestClient) GetConfig(connector string) ConnectorTasksConfig { response := sendGetResponse("GET", client.connectEndPoint()+connector, "") - var config ConnectorConfig + var config ConnectorTasksConfig err := json.Unmarshal([]byte(response), &config) if err != nil { panic(err) @@ -144,8 +144,10 @@ func (client *ConnectRestClient) Pause(connector string) { // Delete deletes all tasks for the specified connector name. func (client *ConnectRestClient) Delete(connector string) { - fmt.Fprintf(os.Stdin, "Deleting connector %s \n", connector) - send("DELETE", client.connectEndPoint()+connector) + statusCode := send("DELETE", client.connectEndPoint()+connector) + if statusCode == 204 { + fmt.Fprintf(os.Stdin, "Successfully deleted connector %s \n", connector) + } } // Resume resumes all tasks for the specified connector name. @@ -162,7 +164,7 @@ func (client *ConnectRestClient) Restart(connector string, id int) { // Create submit a new connector configuration. // Return a JSON string describing the new connector configuration. -func (client *ConnectRestClient) Create(config Config) string { +func (client *ConnectRestClient) Create(config ConnectorConfig) string { body, _ := json.Marshal(config) return sendGetResponse("POST", client.connectEndPoint(), string(body)) } @@ -186,7 +188,7 @@ func sendGetResponse(method string, url string, content string) string { return string(body) } -func send(method string, url string) { +func send(method string, url string) (statusCode int) { req, err := http.NewRequest(method, url, bytes.NewBuffer(nil)) client := &http.Client{} @@ -195,4 +197,7 @@ func send(method string, url string) { panic(err) } defer resp.Body.Close() + + statusCode = resp.StatusCode + return }