diff --git a/cmd/kafka-connect/kafka-connect.go b/cmd/kafka-connect/kafka-connect.go index 0992ab5..334e437 100644 --- a/cmd/kafka-connect/kafka-connect.go +++ b/cmd/kafka-connect/kafka-connect.go @@ -46,8 +46,10 @@ var ( tasksCmd = app.Command("tasks", "Displays tasks currently running for a connector.") tasksName = tasksCmd.Arg("name", "Name of the connector to look up.").Required().String() + statusCmd = app.Command("status", "Gets current status of a connector.") + statusName = statusCmd.Arg("name", "Name of the connector to look up.").Required().String() + // TODO: New stuff - // status // pause // resume // restart @@ -92,6 +94,9 @@ func run() error { case tasksCmd.FullCommand(): apiResult, _, err = client.GetConnectorTasks(*tasksName) + + case statusCmd.FullCommand(): + apiResult, _, err = client.GetConnectorStatus(*statusName) } if err != nil { diff --git a/connectors.go b/connectors.go index e216af3..24f9b8e 100644 --- a/connectors.go +++ b/connectors.go @@ -37,6 +37,32 @@ type TaskID struct { ID int `json:"task"` } +// ConnectorStatus reflects the status of a Connector and state of its Tasks. +// +// Having connector name and a "connector" object at top level is a little +// awkward and produces stuttering, but it's their design, not ours. +type ConnectorStatus struct { + Name string `json:"name"` + Connector ConnectorState `json:"connector"` + Tasks []TaskState `json:"tasks"` +} + +// ConnectorState reflects the running state of a Connector and the worker where +// it is running. +type ConnectorState struct { + State string `json:"state"` + WorkerID string `json:"worker_id"` +} + +// TaskState reflects the running state of a Task and the worker where it is +// running. +type TaskState struct { + ID int `json:"id"` + State string `json:"state"` + WorkerID string `json:"worker_id"` + Trace string `json:"trace,omitempty"` +} + // TODO: Probably need to URL-encode connector names // CreateConnector creates a new connector instance. It returns an error if @@ -92,6 +118,18 @@ func (c *Client) GetConnectorTasks(name string) ([]Task, *http.Response, error) return tasks, response, err } +// GetConnectorStatus gets current status of the connector, including whether it +// is running, failed or paused, which worker it is assigned to, error +// information if it has failed, and the state of all its tasks. +// +// See: http://docs.confluent.io/current/connect/userguide.html#get--connectors-(string-name)-status +func (c *Client) GetConnectorStatus(name string) (*ConnectorStatus, *http.Response, error) { + path := fmt.Sprintf("connectors/%v/status", name) + status := new(ConnectorStatus) + response, err := c.get(path, status) + return status, response, err +} + // UpdateConnectorConfig updates configuration for an existing connector with // the given name. If the connector does not exist, it will be created. // diff --git a/connectors_test.go b/connectors_test.go index 87e0423..83c07b3 100644 --- a/connectors_test.go +++ b/connectors_test.go @@ -201,6 +201,61 @@ var _ = Describe("Connectors", func() { }) }) + Describe("GetConnectorStatus", func() { + var resultStatus *ConnectorStatus + var statusCode int + + BeforeEach(func() { + resultStatus = &ConnectorStatus{ + Name: "local-file-source", + Connector: ConnectorState{ + State: "RUNNING", + WorkerID: "127.0.0.1:8083", + }, + Tasks: []TaskState{ + TaskState{ + ID: 0, + State: "RUNNING", + WorkerID: "127.0.0.1:8083", + }, + }, + } + + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/connectors/local-file-source/status"), + ghttp.VerifyHeader(jsonAcceptHeader), + ghttp.RespondWithJSONEncodedPtr(&statusCode, &resultStatus), + ), + ) + }) + + Context("when existing connector name is given", func() { + BeforeEach(func() { + statusCode = http.StatusOK + }) + + It("returns connector status", func() { + status, _, err := client.GetConnectorStatus("local-file-source") + Expect(err).NotTo(HaveOccurred()) + Expect(status).To(Equal(resultStatus)) + }) + }) + + Context("when nonexisting connector name is given", func() { + BeforeEach(func() { + statusCode = http.StatusNotFound + }) + + It("returns an error response", func() { + status, resp, err := client.GetConnectorStatus("local-file-source") + Expect(err).To(HaveOccurred()) + Expect(*status).To(BeZero()) + Expect(resp.StatusCode).To(Equal(http.StatusNotFound)) + }) + }) + }) + Describe("UpdateConnectorConfig", func() { It("returns updated connector when successful", func() { connector, err := UpdateConnectorConfig("test", ConnectorConfig{})