-
Notifications
You must be signed in to change notification settings - Fork 29
/
flink_application.go
158 lines (130 loc) · 4.87 KB
/
flink_application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package aiven
import "context"
type (
// FlinkApplicationHandler is the client which interacts with the Flink Application.
// Its methods issue their HTTP requests through the wrapped client.
FlinkApplicationHandler struct {
client *Client
}
// GenericFlinkApplicationResponse is the generic response for Flink Application requests.
// GET https://api.aiven.io/v1/project/{project}/service/{service_name}/flink/application
//
// It carries the identifying and audit fields of an application. It is the
// element type of FlinkApplicationListResponse.Applications and is embedded
// in DetailedFlinkApplicationResponse.
GenericFlinkApplicationResponse struct {
APIResponse
ID string `json:"id"`
Name string `json:"name"`
// CreatedAt and UpdatedAt are kept as the strings decoded from the JSON
// payload; no parsing into a time value happens in this file.
CreatedAt string `json:"created_at"`
CreatedBy string `json:"created_by"`
UpdatedAt string `json:"updated_at"`
UpdatedBy string `json:"updated_by"`
}
// DetailedFlinkApplicationResponse is the detailed response for Flink Application requests.
// GET /project/{project}/service/{service_name}/flink/application/{application_id}
// POST /project/{project}/service/{service_name}/flink/application
// PUT /project/{project}/service/{service_name}/flink/application/{application_id}
// DELETE /project/{project}/service/{service_name}/flink/application/{application_id}
//
// It extends GenericFlinkApplicationResponse (by embedding) with the
// application's versions and its current deployment. It is the result type
// of the handler's Get, Create, Update and Delete methods.
DetailedFlinkApplicationResponse struct {
GenericFlinkApplicationResponse
ApplicationVersions []FlinkApplicationVersion `json:"application_versions"`
// FlinkApplicationDeployment is declared outside this block.
CurrentDeployment FlinkApplicationDeployment `json:"current_deployment"`
}
// CreateFlinkApplicationRequest is the request to create a Flink Application.
// POST /project/{project}/service/{service_name}/flink/application
CreateFlinkApplicationRequest struct {
Name string `json:"name"`
// ApplicationVersion is optional: a nil pointer is left out of the
// encoded JSON because of the omitempty tag.
ApplicationVersion *FlinkApplicationVersion `json:"application_version,omitempty"`
}
// FlinkApplicationVersionCreateInput describes a single entry of the Sinks or
// Sources list of a FlinkApplicationVersion.
FlinkApplicationVersionCreateInput struct {
// NOTE(review): presumably the CREATE TABLE statement for this sink or
// source - confirm against the API documentation.
CreateTable string `json:"create_table"`
IntegrationID string `json:"integration_id"`
}
// FlinkApplicationVersion groups the sinks, the sources and the statement of
// one application version. It is used both when creating an application
// (CreateFlinkApplicationRequest.ApplicationVersion) and when reading one
// back (DetailedFlinkApplicationResponse.ApplicationVersions).
FlinkApplicationVersion struct {
Sinks []FlinkApplicationVersionCreateInput `json:"sinks"`
Sources []FlinkApplicationVersionCreateInput `json:"sources"`
Statement string `json:"statement"`
}
// UpdateFlinkApplicationRequest is the request to update a Flink Application.
// PUT /project/{project}/service/{service_name}/flink/application/{application_id}
UpdateFlinkApplicationRequest struct {
// Name is left out of the encoded JSON when it is the empty string
// (omitempty).
Name string `json:"name,omitempty"`
}
// FlinkApplicationListResponse is the response for listing Flink Applications.
// GET /project/{project}/service/{service_name}/flink/application
FlinkApplicationListResponse struct {
APIResponse
Applications []GenericFlinkApplicationResponse `json:"applications"`
}
)
// Get retrieves a single Flink Application.
//
// It sends a GET request to the path assembled by buildPath from the segments
// project/{project}/service/{service}/flink/application/{applicationID} and
// hands the raw response body to checkAPIResponse together with a fresh
// DetailedFlinkApplicationResponse.
//
// When the request itself fails, Get returns a nil response and that error.
// Otherwise the returned pointer is always non-nil and is accompanied by
// whatever error checkAPIResponse reports.
func (h *FlinkApplicationHandler) Get(
	ctx context.Context,
	project, service, applicationID string,
) (*DetailedFlinkApplicationResponse, error) {
	endpoint := buildPath("project", project, "service", service, "flink", "application", applicationID)

	raw, err := h.client.doGetRequest(ctx, endpoint, nil)
	if err != nil {
		return nil, err
	}

	out := new(DetailedFlinkApplicationResponse)
	err = checkAPIResponse(raw, out)

	return out, err
}
// Create creates a new Flink Application.
//
// It sends req in a POST request to the path assembled by buildPath from the
// segments project/{project}/service/{service}/flink/application and hands
// the raw response body to checkAPIResponse together with a fresh
// DetailedFlinkApplicationResponse.
//
// When the request itself fails, Create returns a nil response and that
// error. Otherwise the returned pointer is always non-nil and is accompanied
// by whatever error checkAPIResponse reports.
func (h *FlinkApplicationHandler) Create(
	ctx context.Context,
	project, service string,
	req CreateFlinkApplicationRequest,
) (*DetailedFlinkApplicationResponse, error) {
	endpoint := buildPath("project", project, "service", service, "flink", "application")

	raw, err := h.client.doPostRequest(ctx, endpoint, req)
	if err != nil {
		return nil, err
	}

	out := new(DetailedFlinkApplicationResponse)
	err = checkAPIResponse(raw, out)

	return out, err
}
// Update modifies an existing Flink Application.
//
// It sends req in a PUT request to the path assembled by buildPath from the
// segments project/{project}/service/{service}/flink/application/{applicationID}
// and hands the raw response body to checkAPIResponse together with a fresh
// DetailedFlinkApplicationResponse.
//
// When the request itself fails, Update returns a nil response and that
// error. Otherwise the returned pointer is always non-nil and is accompanied
// by whatever error checkAPIResponse reports.
func (h *FlinkApplicationHandler) Update(
	ctx context.Context,
	project, service, applicationID string,
	req UpdateFlinkApplicationRequest,
) (*DetailedFlinkApplicationResponse, error) {
	endpoint := buildPath("project", project, "service", service, "flink", "application", applicationID)

	raw, err := h.client.doPutRequest(ctx, endpoint, req)
	if err != nil {
		return nil, err
	}

	out := new(DetailedFlinkApplicationResponse)
	err = checkAPIResponse(raw, out)

	return out, err
}
// Delete removes a Flink Application.
//
// It sends a DELETE request (with no body) to the path assembled by buildPath
// from the segments
// project/{project}/service/{service}/flink/application/{applicationID} and
// hands the raw response body to checkAPIResponse together with a fresh
// DetailedFlinkApplicationResponse.
//
// When the request itself fails, Delete returns a nil response and that
// error. Otherwise the returned pointer is always non-nil and is accompanied
// by whatever error checkAPIResponse reports.
func (h *FlinkApplicationHandler) Delete(
	ctx context.Context,
	project, service, applicationID string,
) (*DetailedFlinkApplicationResponse, error) {
	endpoint := buildPath("project", project, "service", service, "flink", "application", applicationID)

	raw, err := h.client.doDeleteRequest(ctx, endpoint, nil)
	if err != nil {
		return nil, err
	}

	out := new(DetailedFlinkApplicationResponse)
	err = checkAPIResponse(raw, out)

	return out, err
}
// List retrieves the Flink Applications of a service.
//
// It sends a GET request to the path assembled by buildPath from the segments
// project/{project}/service/{service}/flink/application and hands the raw
// response body to checkAPIResponse together with a fresh
// FlinkApplicationListResponse.
//
// When the request itself fails, List returns a nil response and that error.
// Otherwise the returned pointer is always non-nil and is accompanied by
// whatever error checkAPIResponse reports.
func (h *FlinkApplicationHandler) List(
	ctx context.Context,
	project, service string,
) (*FlinkApplicationListResponse, error) {
	endpoint := buildPath("project", project, "service", service, "flink", "application")

	raw, err := h.client.doGetRequest(ctx, endpoint, nil)
	if err != nil {
		return nil, err
	}

	out := new(FlinkApplicationListResponse)
	err = checkAPIResponse(raw, out)

	return out, err
}