Skip to content

Commit

Permalink
Implement the connector status endpoint and command
Browse files Browse the repository at this point in the history
  • Loading branch information
ches committed Aug 3, 2016
1 parent 32e4289 commit aed8b9a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 1 deletion.
7 changes: 6 additions & 1 deletion cmd/kafka-connect/kafka-connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ var (
tasksCmd = app.Command("tasks", "Displays tasks currently running for a connector.")
tasksName = tasksCmd.Arg("name", "Name of the connector to look up.").Required().String()

statusCmd = app.Command("status", "Gets current status of a connector.")
statusName = statusCmd.Arg("name", "Name of the connector to look up.").Required().String()

// TODO: New stuff
// status
// pause
// resume
// restart
Expand Down Expand Up @@ -92,6 +94,9 @@ func run() error {

case tasksCmd.FullCommand():
apiResult, _, err = client.GetConnectorTasks(*tasksName)

case statusCmd.FullCommand():
apiResult, _, err = client.GetConnectorStatus(*statusName)
}

if err != nil {
Expand Down
38 changes: 38 additions & 0 deletions connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ type TaskID struct {
ID int `json:"task"`
}

// ConnectorStatus reflects the status of a Connector and state of its Tasks.
//
// Having connector name and a "connector" object at top level is a little
// awkward and produces stuttering, but it's their design, not ours.
type ConnectorStatus struct {
Name string `json:"name"`
Connector ConnectorState `json:"connector"`
Tasks []TaskState `json:"tasks"`
}

// ConnectorState reflects the running state of a Connector and the worker where
// it is running.
type ConnectorState struct {
State string `json:"state"`
WorkerID string `json:"worker_id"`
}

// TaskState reflects the running state of a Task and the worker where it is
// running.
type TaskState struct {
ID int `json:"id"`
State string `json:"state"`
WorkerID string `json:"worker_id"`
Trace string `json:"trace,omitempty"`
}

// TODO: Probably need to URL-encode connector names

// CreateConnector creates a new connector instance. It returns an error if
Expand Down Expand Up @@ -92,6 +118,18 @@ func (c *Client) GetConnectorTasks(name string) ([]Task, *http.Response, error)
return tasks, response, err
}

// GetConnectorStatus gets current status of the connector, including whether it
// is running, failed or paused, which worker it is assigned to, error
// information if it has failed, and the state of all its tasks.
//
// See: http://docs.confluent.io/current/connect/userguide.html#get--connectors-(string-name)-status
func (c *Client) GetConnectorStatus(name string) (*ConnectorStatus, *http.Response, error) {
path := fmt.Sprintf("connectors/%v/status", name)
status := new(ConnectorStatus)
response, err := c.get(path, status)
return status, response, err
}

// UpdateConnectorConfig updates configuration for an existing connector with
// the given name. If the connector does not exist, it will be created.
//
Expand Down
55 changes: 55 additions & 0 deletions connectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,61 @@ var _ = Describe("Connectors", func() {
})
})

Describe("GetConnectorStatus", func() {
var resultStatus *ConnectorStatus
var statusCode int

BeforeEach(func() {
resultStatus = &ConnectorStatus{
Name: "local-file-source",
Connector: ConnectorState{
State: "RUNNING",
WorkerID: "127.0.0.1:8083",
},
Tasks: []TaskState{
TaskState{
ID: 0,
State: "RUNNING",
WorkerID: "127.0.0.1:8083",
},
},
}

server.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/connectors/local-file-source/status"),
ghttp.VerifyHeader(jsonAcceptHeader),
ghttp.RespondWithJSONEncodedPtr(&statusCode, &resultStatus),
),
)
})

Context("when existing connector name is given", func() {
BeforeEach(func() {
statusCode = http.StatusOK
})

It("returns connector status", func() {
status, _, err := client.GetConnectorStatus("local-file-source")
Expect(err).NotTo(HaveOccurred())
Expect(status).To(Equal(resultStatus))
})
})

Context("when nonexisting connector name is given", func() {
BeforeEach(func() {
statusCode = http.StatusNotFound
})

It("returns an error response", func() {
status, resp, err := client.GetConnectorStatus("local-file-source")
Expect(err).To(HaveOccurred())
Expect(*status).To(BeZero())
Expect(resp.StatusCode).To(Equal(http.StatusNotFound))
})
})
})

Describe("UpdateConnectorConfig", func() {
It("returns updated connector when successful", func() {
connector, err := UpdateConnectorConfig("test", ConnectorConfig{})
Expand Down

0 comments on commit aed8b9a

Please sign in to comment.