diff --git a/api/bindings/v1alpha1/bindingconfiguration_types.go b/api/bindings/v1alpha1/bindingconfiguration_types.go index d3c8287e..bab2cbcd 100644 --- a/api/bindings/v1alpha1/bindingconfiguration_types.go +++ b/api/bindings/v1alpha1/bindingconfiguration_types.go @@ -25,7 +25,7 @@ SOFTWARE. package v1alpha1 import ( - "github.com/ngrok/ngrok-api-go/v5" + v6 "github.com/ngrok/ngrok-api-go/v6" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -104,7 +104,7 @@ type BindingConfigurationStatus struct { // BindingEndpoint is a reference to an Endpoint object in the ngrok API that is attached to the kubernetes operator binding type BindingEndpoint struct { // Ref is the ngrok API reference to the Endpoint object (id, uri) - ngrok.Ref `json:",inline"` + v6.Ref `json:",inline"` // +kubebuilder:validation:Required // +kubebuilder:default="unknown" diff --git a/api/bindings/v1alpha1/endpointbinding_types.go b/api/bindings/v1alpha1/endpointbinding_types.go index d332500f..3d02b44b 100644 --- a/api/bindings/v1alpha1/endpointbinding_types.go +++ b/api/bindings/v1alpha1/endpointbinding_types.go @@ -33,6 +33,15 @@ import ( // EndpointBindingSpec defines the desired state of EndpointBinding type EndpointBindingSpec struct { + // EndpointURI is the unique identifier + // representing the EndpointBinding + its Endpoints + // Format: ://.: + // + // +kubebuilder:validation:Required + // See: https://regex101.com/r/9QkXWl/1 + // +kubebuilder:validation:Pattern=`^((?P(tcp|http|https|tls)?)://)?(?P[a-z][a-zA-Z0-9-]{0,62})\.(?P[a-z][a-zA-Z0-9-]{0,62})(:(?P\d+))?$` + EndpointURI string `json:"endpointURI"` + // Scheme is a user-defined field for endpoints that describe how the data packets // are framed by the pod forwarders mTLS connection to the ngrok edge // +kubebuilder:validation:Required diff --git a/helm/ngrok-operator/templates/crds/bindings.k8s.ngrok.com_endpointbindings.yaml b/helm/ngrok-operator/templates/crds/bindings.k8s.ngrok.com_endpointbindings.yaml index 9e992e20..63bd6eac 100644 --- a/helm/ngrok-operator/templates/crds/bindings.k8s.ngrok.com_endpointbindings.yaml +++ b/helm/ngrok-operator/templates/crds/bindings.k8s.ngrok.com_endpointbindings.yaml @@ -52,6 +52,16 @@ spec: spec: description: EndpointBindingSpec defines the desired state of EndpointBinding properties: + endpointURI: + description: |- + EndpointURI is the unique identifier + representing the EndpointBinding + its Endpoints + Format: ://.: + + + See: https://regex101.com/r/9QkXWl/1 + pattern: ^((?P(tcp|http|https|tls)?)://)?(?P[a-z][a-zA-Z0-9-]{0,62})\.(?P[a-z][a-zA-Z0-9-]{0,62})(:(?P\d+))?$ + type: string port: description: Port is the Service port this Endpoint uses internally to communicate with its pod forwarders @@ -121,6 +131,7 @@ spec: - service type: object required: + - endpointURI - port - scheme - target diff --git a/internal/ngrokapi/bindingendpoint_aggregator.go b/internal/ngrokapi/bindingendpoint_aggregator.go new file mode 100644 index 00000000..3ac3c90e --- /dev/null +++ b/internal/ngrokapi/bindingendpoint_aggregator.go @@ -0,0 +1,168 @@ +package ngrokapi + +import ( + "fmt" + "net/url" + "strconv" + "strings" + + v6 "github.com/ngrok/ngrok-api-go/v6" + bindingsv1alpha1 "github.com/ngrok/ngrok-operator/api/bindings/v1alpha1" +) + +var ( + defaultScheme = "https" + defaultPort = map[string]int32{ + "http": 80, + "https": 443, + "tls": 443, + } +) + +// AggregatedEndpoints is a map of hostport to BindingEndpoint (partially filled in) +type AggregatedEndpoints map[string]bindingsv1alpha1.EndpointBinding + +// AggregateBindingEndpoints aggregates the endpoints into a map of hostport to BindingEndpoint +// by parsing the hostport 4-tuple into each piece ([://].[:]) +// and collecting together matching endpoints into a single BindingEndpoint +func AggregateBindingEndpoints(endpoints []v6.Endpoint) (AggregatedEndpoints, error) { + aggregated := make(AggregatedEndpoints) + + for _, endpoint := range endpoints { + parsed, err := parseHostport(endpoint.Proto, endpoint.PublicURL) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint: %s: %w", endpoint.ID, err) + } + + endpointURI := parsed.String() + + // Create a new BindingEndpoint if one doesn't exist + var bindingEndpoint bindingsv1alpha1.EndpointBinding + if val, ok := aggregated[endpointURI]; ok { + bindingEndpoint = val + } else { + // newly found hostport, create a new EndpointBinding + bindingEndpoint = bindingsv1alpha1.EndpointBinding{ + // parsed bits are shared across endpoints with the same hostport + Spec: bindingsv1alpha1.EndpointBindingSpec{ + EndpointURI: endpointURI, + Scheme: parsed.Scheme, + Target: bindingsv1alpha1.EndpointTarget{ + Service: parsed.ServiceName, + Namespace: parsed.Namespace, + Port: parsed.Port, + Protocol: "TCP", // always tcp for now + }, + }, + Status: bindingsv1alpha1.EndpointBindingStatus{ + // empty list of endpoints (to be filled in by this loop) + Endpoints: []bindingsv1alpha1.BindingEndpoint{}, + }, + } + } + + // add the found endpoint to the list of endpoints + bindingEndpoint.Status.Endpoints = append(bindingEndpoint.Status.Endpoints, bindingsv1alpha1.BindingEndpoint{ + Ref: v6.Ref{ + ID: endpoint.ID, + URI: endpoint.URI, + }, + }) + + // update the aggregated map + aggregated[endpointURI] = bindingEndpoint + } + + return aggregated, nil +} + +// parsedHostport is a struct to hold the parsed bits +type parsedHostport struct { + Scheme string + ServiceName string + Namespace string + Port int32 +} + +// String prints the parsed hostport as a EndpointURI in the format: ://.: +func (p *parsedHostport) String() string { + return fmt.Sprintf("%s://%s.%s:%d", p.Scheme, p.ServiceName, p.Namespace, p.Port) +} + +// parseHostport parses the hostport from its 4-tuple into a struct +func parseHostport(proto string, publicURL string) (*parsedHostport, error) { + if publicURL == "" { + return nil, fmt.Errorf("missing publicURL") + } + + // to be parsed and filled in + var scheme string + var serviceName string + var namespace string + var port int32 + + parsedURL, err := url.Parse(publicURL) + if err != nil { + return nil, err + } + + if parsedURL.Scheme == "" { + // default scheme to https if not provided + if proto == "" { + proto = defaultScheme + } + + // add the proto as the scheme to the URL + // then reparse the URL so we get the correct Hostpath() + // this is to handle the case where the URL is missing the scheme + // which is required for the URL to be parsed correctly + fullUrl := fmt.Sprintf("%s://%s", proto, publicURL) + parsedURL, err = url.Parse(fullUrl) + if err != nil { + return nil, fmt.Errorf("unable to parse with given proto: %s", fullUrl) + } + } else { + if proto != "" && parsedURL.Scheme != proto { + return nil, fmt.Errorf("mismatched scheme, expected %s: %s", proto, publicURL) + } + } + + // set the scheme + scheme = parsedURL.Scheme + + // Extract the service name and namespace from the URL's host part. + // Format: . + parts := strings.Split(parsedURL.Hostname(), ".") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid hostname, expected .: %s", parsedURL.Hostname()) + } else { + serviceName = parts[0] + namespace = parts[1] + } + + // Parse the port if available + // default based on the scheme. + urlPort := parsedURL.Port() + + // extra check just in case + if parsedURL.Scheme == "tcp" && urlPort == "" { + return nil, fmt.Errorf("missing port for tcp scheme: %s", publicURL) + } + + if urlPort != "" { + parsedPort, err := strconv.Atoi(urlPort) + if err != nil { + return nil, fmt.Errorf("invalid port value: %s", urlPort) + } + port = int32(parsedPort) + } else { + port = defaultPort[scheme] + } + + return &parsedHostport{ + Scheme: scheme, + ServiceName: serviceName, + Namespace: namespace, + Port: port, + }, nil +} diff --git a/internal/ngrokapi/bindingendpoint_aggregator_test.go b/internal/ngrokapi/bindingendpoint_aggregator_test.go new file mode 100644 index 00000000..969a335e --- /dev/null +++ b/internal/ngrokapi/bindingendpoint_aggregator_test.go @@ -0,0 +1,186 @@ +package ngrokapi + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + v6 "github.com/ngrok/ngrok-api-go/v6" + bindingsv1alpha1 "github.com/ngrok/ngrok-operator/api/bindings/v1alpha1" +) + +func Test_parseHostport(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + tests := []struct { + name string + proto string + url string + want *parsedHostport + wantErr bool + }{ + {"empty", "", "", nil, true}, + {"invalid", "https", "does-not-parse", nil, true}, + // We trust the api to only support specific schemes + // {"invalid-scheme", "scheme", "scheme://test.not-working", nil, true}, + {"mismatched-scheme", "tls", "https://test.not-working", nil, true}, + {"missing-tcp-port", "tcp", "tcp://test.not-working", nil, true}, + // with defaults + {"simple", "", "service.namespace", &parsedHostport{"https", "service", "namespace", 443}, false}, + {"full", "tcp", "tcp://service.namespace:1234", &parsedHostport{"tcp", "service", "namespace", 1234}, false}, + {"http-no-port", "http", "service.namespace", &parsedHostport{"http", "service", "namespace", 80}, false}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + got, err := parseHostport(test.proto, test.url) + if test.wantErr { + assert.Error(err) + return + } + assert.NoError(err) + assert.Equal(test.want, got) + }) + } +} + +func Test_AggregateBindingEndpoints(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + tests := []struct { + name string + endpoints []v6.Endpoint + want AggregatedEndpoints + wantErr bool + }{ + {"empty", []v6.Endpoint{}, AggregatedEndpoints{}, false}, + { + name: "single", + endpoints: []v6.Endpoint{ + {ID: "ep_123", PublicURL: "https://service1.namespace1"}, + }, + want: AggregatedEndpoints{ + "https://service1.namespace1:443": { + Spec: bindingsv1alpha1.EndpointBindingSpec{ + EndpointURI: "https://service1.namespace1:443", + Scheme: "https", + Target: bindingsv1alpha1.EndpointTarget{ + Service: "service1", + Namespace: "namespace1", + Port: 443, + Protocol: "TCP", + }, + }, + Status: bindingsv1alpha1.EndpointBindingStatus{ + Endpoints: []bindingsv1alpha1.BindingEndpoint{ + {Ref: v6.Ref{ID: "ep_123"}}, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "full", + endpoints: []v6.Endpoint{ + {ID: "ep_100", PublicURL: "https://service1.namespace1"}, + {ID: "ep_101", PublicURL: "https://service1.namespace1"}, + {ID: "ep_102", PublicURL: "https://service1.namespace1"}, + {ID: "ep_200", PublicURL: "tcp://service2.namespace2:2020"}, + {ID: "ep_201", PublicURL: "tcp://service2.namespace2:2020"}, + {ID: "ep_300", PublicURL: "service3.namespace3"}, + {ID: "ep_400", PublicURL: "http://service4.namespace4:8080"}, + }, + want: AggregatedEndpoints{ + "https://service1.namespace1:443": { + Spec: bindingsv1alpha1.EndpointBindingSpec{ + EndpointURI: "https://service1.namespace1:443", + Scheme: "https", + Target: bindingsv1alpha1.EndpointTarget{ + Service: "service1", + Namespace: "namespace1", + Port: 443, + Protocol: "TCP", + }, + }, + Status: bindingsv1alpha1.EndpointBindingStatus{ + Endpoints: []bindingsv1alpha1.BindingEndpoint{ + {Ref: v6.Ref{ID: "ep_100"}}, + {Ref: v6.Ref{ID: "ep_101"}}, + {Ref: v6.Ref{ID: "ep_102"}}, + }, + }, + }, + "tcp://service2.namespace2:2020": { + Spec: bindingsv1alpha1.EndpointBindingSpec{ + EndpointURI: "tcp://service2.namespace2:2020", + Scheme: "tcp", + Target: bindingsv1alpha1.EndpointTarget{ + Service: "service2", + Namespace: "namespace2", + Port: 2020, + Protocol: "TCP", + }, + }, + Status: bindingsv1alpha1.EndpointBindingStatus{ + Endpoints: []bindingsv1alpha1.BindingEndpoint{ + {Ref: v6.Ref{ID: "ep_200"}}, + {Ref: v6.Ref{ID: "ep_201"}}, + }, + }, + }, + "https://service3.namespace3:443": { + Spec: bindingsv1alpha1.EndpointBindingSpec{ + EndpointURI: "https://service3.namespace3:443", + Scheme: "https", + Target: bindingsv1alpha1.EndpointTarget{ + Service: "service3", + Namespace: "namespace3", + Port: 443, + Protocol: "TCP", + }, + }, + Status: bindingsv1alpha1.EndpointBindingStatus{ + Endpoints: []bindingsv1alpha1.BindingEndpoint{ + {Ref: v6.Ref{ID: "ep_300"}}, + }, + }, + }, + "http://service4.namespace4:8080": { + Spec: bindingsv1alpha1.EndpointBindingSpec{ + EndpointURI: "http://service4.namespace4:8080", + Scheme: "http", + Target: bindingsv1alpha1.EndpointTarget{ + Service: "service4", + Namespace: "namespace4", + Port: 8080, + Protocol: "TCP", + }, + }, + Status: bindingsv1alpha1.EndpointBindingStatus{ + Endpoints: []bindingsv1alpha1.BindingEndpoint{ + {Ref: v6.Ref{ID: "ep_400"}}, + }, + }, + }, + }, + wantErr: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + got, err := AggregateBindingEndpoints(test.endpoints) + if test.wantErr { + assert.Error(err) + return + } + assert.NoError(err) + assert.Equal(test.want, got) + }) + } +}