diff --git a/pulsar-client-go/go.mod b/pulsar-client-go/go.mod index 614cf1438766d..2dd946aaa36d1 100644 --- a/pulsar-client-go/go.mod +++ b/pulsar-client-go/go.mod @@ -13,3 +13,5 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v2 v2.2.2 // indirect ) + +go 1.13 diff --git a/pulsar-function-go/conf/conf.go b/pulsar-function-go/conf/conf.go index 18035741873c0..984142cefdd8a 100644 --- a/pulsar-function-go/conf/conf.go +++ b/pulsar-function-go/conf/conf.go @@ -71,6 +71,8 @@ type Conf struct { //retryDetails config MaxMessageRetries int32 `json:"maxMessageRetries" yaml:"maxMessageRetries"` DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"` + ExpectedHealthCheckInterval int32 `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"` + } var ( diff --git a/pulsar-function-go/conf/conf.yaml b/pulsar-function-go/conf/conf.yaml index 86cf8f22fb177..6652014ddd127 100644 --- a/pulsar-function-go/conf/conf.yaml +++ b/pulsar-function-go/conf/conf.yaml @@ -55,3 +55,4 @@ disk: 0 # retryDetails config maxMessageRetries: 0 deadLetterTopic: "" +expectedHealthCheckInterval: 3 \ No newline at end of file diff --git a/pulsar-function-go/go.mod b/pulsar-function-go/go.mod index 75c9af1d83b3c..60d3139614c6a 100644 --- a/pulsar-function-go/go.mod +++ b/pulsar-function-go/go.mod @@ -3,9 +3,10 @@ module github.com/apache/pulsar/pulsar-function-go go 1.13 require ( - github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098 + github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed github.com/golang/protobuf v1.3.2 - github.com/sirupsen/logrus v1.4.2 - github.com/stretchr/testify v1.4.0 - gopkg.in/yaml.v2 v2.2.7 + github.com/sirupsen/logrus v1.4.1 + github.com/stretchr/testify v1.3.0 + google.golang.org/grpc v1.26.0 + gopkg.in/yaml.v2 v2.2.2 ) diff --git a/pulsar-function-go/go.sum b/pulsar-function-go/go.sum index 86a10c4c0c40f..6289b0f579d94 100644 --- a/pulsar-function-go/go.sum +++ b/pulsar-function-go/go.sum @@ -1,13 +1,27 @@ -github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098 h1:L8k/8FYXV1+IWYAzovGg4RW0qElvyyrDn/2vP7+i+Uk= -github.com/apache/pulsar-client-go v0.0.0-20200113085434-9b739cf9d098/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4= +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed h1:Lp7eU5ym84jPmIXoonoaJWVN6psyB90Olookp61LCeA= +github.com/apache/pulsar-client-go v0.0.0-20200116214305-4d788d9935ed/go.mod h1:G+CQVHnh2EPfNEQXOuisIDAyPMiKnzz4Vim/kjtj4U4= github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -21,9 +35,9 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= -github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= @@ -31,14 +45,47 @@ github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= +google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= -gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pulsar-function-go/pb/Function.pb.go b/pulsar-function-go/pb/Function.pb.go index 9766ea6726005..34d9b9a7cdc03 100644 --- a/pulsar-function-go/pb/Function.pb.go +++ b/pulsar-function-go/pb/Function.pb.go @@ -17,14 +17,13 @@ // under the License. // -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: Function.proto +package api -package pb - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -35,7 +34,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type ProcessingGuarantees int32 @@ -50,6 +49,7 @@ var ProcessingGuarantees_name = map[int32]string{ 1: "ATMOST_ONCE", 2: "EFFECTIVELY_ONCE", } + var ProcessingGuarantees_value = map[string]int32{ "ATLEAST_ONCE": 0, "ATMOST_ONCE": 1, @@ -59,8 +59,9 @@ var ProcessingGuarantees_value = map[string]int32{ func (x ProcessingGuarantees) String() string { return proto.EnumName(ProcessingGuarantees_name, int32(x)) } + func (ProcessingGuarantees) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{0} + return fileDescriptor_225cf355fcfb169c, []int{0} } type SubscriptionType int32 @@ -74,6 +75,7 @@ var SubscriptionType_name = map[int32]string{ 0: "SHARED", 1: "FAILOVER", } + var SubscriptionType_value = map[string]int32{ "SHARED": 0, "FAILOVER": 1, @@ -82,8 +84,34 @@ var SubscriptionType_value = map[string]int32{ func (x SubscriptionType) String() string { return proto.EnumName(SubscriptionType_name, int32(x)) } + func (SubscriptionType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{1} + return fileDescriptor_225cf355fcfb169c, []int{1} +} + +type SubscriptionPosition int32 + +const ( + SubscriptionPosition_LATEST SubscriptionPosition = 0 + SubscriptionPosition_EARLIEST SubscriptionPosition = 1 +) + +var SubscriptionPosition_name = map[int32]string{ + 0: "LATEST", + 1: "EARLIEST", +} + +var SubscriptionPosition_value = map[string]int32{ + "LATEST": 0, + "EARLIEST": 1, +} + +func (x SubscriptionPosition) String() string { + return proto.EnumName(SubscriptionPosition_name, int32(x)) +} + +func (SubscriptionPosition) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_225cf355fcfb169c, []int{2} } type FunctionState int32 @@ -97,6 +125,7 @@ var FunctionState_name = map[int32]string{ 0: "RUNNING", 1: "STOPPED", } + var FunctionState_value = map[string]int32{ "RUNNING": 0, "STOPPED": 1, @@ -105,8 +134,9 @@ var FunctionState_value = map[string]int32{ func (x FunctionState) String() string { return proto.EnumName(FunctionState_name, int32(x)) } + func (FunctionState) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{2} + return fileDescriptor_225cf355fcfb169c, []int{3} } type FunctionDetails_Runtime int32 @@ -122,6 +152,7 @@ var FunctionDetails_Runtime_name = map[int32]string{ 1: "PYTHON", 3: "GO", } + var FunctionDetails_Runtime_value = map[string]int32{ "JAVA": 0, "PYTHON": 1, @@ -131,8 +162,40 @@ var FunctionDetails_Runtime_value = map[string]int32{ func (x FunctionDetails_Runtime) String() string { return proto.EnumName(FunctionDetails_Runtime_name, int32(x)) } + func (FunctionDetails_Runtime) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{2, 0} + return fileDescriptor_225cf355fcfb169c, []int{2, 0} +} + +type FunctionDetails_ComponentType int32 + +const ( + FunctionDetails_UNKNOWN FunctionDetails_ComponentType = 0 + FunctionDetails_FUNCTION FunctionDetails_ComponentType = 1 + FunctionDetails_SOURCE FunctionDetails_ComponentType = 2 + FunctionDetails_SINK FunctionDetails_ComponentType = 3 +) + +var FunctionDetails_ComponentType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "FUNCTION", + 2: "SOURCE", + 3: "SINK", +} + +var FunctionDetails_ComponentType_value = map[string]int32{ + "UNKNOWN": 0, + "FUNCTION": 1, + "SOURCE": 2, + "SINK": 3, +} + +func (x FunctionDetails_ComponentType) String() string { + return proto.EnumName(FunctionDetails_ComponentType_name, int32(x)) +} + +func (FunctionDetails_ComponentType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_225cf355fcfb169c, []int{2, 1} } type Resources struct { @@ -148,16 +211,17 @@ func (m *Resources) Reset() { *m = Resources{} } func (m *Resources) String() string { return proto.CompactTextString(m) } func (*Resources) ProtoMessage() {} func (*Resources) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{0} + return fileDescriptor_225cf355fcfb169c, []int{0} } + func (m *Resources) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Resources.Unmarshal(m, b) } func (m *Resources) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Resources.Marshal(b, m, deterministic) } -func (dst *Resources) XXX_Merge(src proto.Message) { - xxx_messageInfo_Resources.Merge(dst, src) +func (m *Resources) XXX_Merge(src proto.Message) { + xxx_messageInfo_Resources.Merge(m, src) } func (m *Resources) XXX_Size() int { return xxx_messageInfo_Resources.Size(m) @@ -201,16 +265,17 @@ func (m *RetryDetails) Reset() { *m = RetryDetails{} } func (m *RetryDetails) String() string { return proto.CompactTextString(m) } func (*RetryDetails) ProtoMessage() {} func (*RetryDetails) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{1} + return fileDescriptor_225cf355fcfb169c, []int{1} } + func (m *RetryDetails) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RetryDetails.Unmarshal(m, b) } func (m *RetryDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_RetryDetails.Marshal(b, m, deterministic) } -func (dst *RetryDetails) XXX_Merge(src proto.Message) { - xxx_messageInfo_RetryDetails.Merge(dst, src) +func (m *RetryDetails) XXX_Merge(src proto.Message) { + xxx_messageInfo_RetryDetails.Merge(m, src) } func (m *RetryDetails) XXX_Size() int { return xxx_messageInfo_RetryDetails.Size(m) @@ -236,41 +301,45 @@ func (m *RetryDetails) GetDeadLetterTopic() string { } type FunctionDetails struct { - Tenant string `protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"` - Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` - Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` - ClassName string `protobuf:"bytes,4,opt,name=className,proto3" json:"className,omitempty"` - LogTopic string `protobuf:"bytes,5,opt,name=logTopic,proto3" json:"logTopic,omitempty"` - ProcessingGuarantees ProcessingGuarantees `protobuf:"varint,6,opt,name=processingGuarantees,proto3,enum=proto.ProcessingGuarantees" json:"processingGuarantees,omitempty"` - UserConfig string `protobuf:"bytes,7,opt,name=userConfig,proto3" json:"userConfig,omitempty"` - SecretsMap string `protobuf:"bytes,16,opt,name=secretsMap,proto3" json:"secretsMap,omitempty"` - Runtime FunctionDetails_Runtime `protobuf:"varint,8,opt,name=runtime,proto3,enum=proto.FunctionDetails_Runtime" json:"runtime,omitempty"` - AutoAck bool `protobuf:"varint,9,opt,name=autoAck,proto3" json:"autoAck,omitempty"` - Parallelism int32 `protobuf:"varint,10,opt,name=parallelism,proto3" json:"parallelism,omitempty"` - Source *SourceSpec `protobuf:"bytes,11,opt,name=source,proto3" json:"source,omitempty"` - Sink *SinkSpec `protobuf:"bytes,12,opt,name=sink,proto3" json:"sink,omitempty"` - Resources *Resources `protobuf:"bytes,13,opt,name=resources,proto3" json:"resources,omitempty"` - PackageUrl string `protobuf:"bytes,14,opt,name=packageUrl,proto3" json:"packageUrl,omitempty"` - RetryDetails *RetryDetails `protobuf:"bytes,15,opt,name=retryDetails,proto3" json:"retryDetails,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Tenant string `protobuf:"bytes,1,opt,name=tenant,proto3" json:"tenant,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Name string `protobuf:"bytes,3,opt,name=name,proto3" json:"name,omitempty"` + ClassName string `protobuf:"bytes,4,opt,name=className,proto3" json:"className,omitempty"` + LogTopic string `protobuf:"bytes,5,opt,name=logTopic,proto3" json:"logTopic,omitempty"` + ProcessingGuarantees ProcessingGuarantees `protobuf:"varint,6,opt,name=processingGuarantees,proto3,enum=proto.ProcessingGuarantees" json:"processingGuarantees,omitempty"` + UserConfig string `protobuf:"bytes,7,opt,name=userConfig,proto3" json:"userConfig,omitempty"` + SecretsMap string `protobuf:"bytes,16,opt,name=secretsMap,proto3" json:"secretsMap,omitempty"` + Runtime FunctionDetails_Runtime `protobuf:"varint,8,opt,name=runtime,proto3,enum=proto.FunctionDetails_Runtime" json:"runtime,omitempty"` + AutoAck bool `protobuf:"varint,9,opt,name=autoAck,proto3" json:"autoAck,omitempty"` + Parallelism int32 `protobuf:"varint,10,opt,name=parallelism,proto3" json:"parallelism,omitempty"` + Source *SourceSpec `protobuf:"bytes,11,opt,name=source,proto3" json:"source,omitempty"` + Sink *SinkSpec `protobuf:"bytes,12,opt,name=sink,proto3" json:"sink,omitempty"` + Resources *Resources `protobuf:"bytes,13,opt,name=resources,proto3" json:"resources,omitempty"` + PackageUrl string `protobuf:"bytes,14,opt,name=packageUrl,proto3" json:"packageUrl,omitempty"` + RetryDetails *RetryDetails `protobuf:"bytes,15,opt,name=retryDetails,proto3" json:"retryDetails,omitempty"` + RuntimeFlags string `protobuf:"bytes,17,opt,name=runtimeFlags,proto3" json:"runtimeFlags,omitempty"` + ComponentType FunctionDetails_ComponentType `protobuf:"varint,18,opt,name=componentType,proto3,enum=proto.FunctionDetails_ComponentType" json:"componentType,omitempty"` + CustomRuntimeOptions string `protobuf:"bytes,19,opt,name=customRuntimeOptions,proto3" json:"customRuntimeOptions,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *FunctionDetails) Reset() { *m = FunctionDetails{} } func (m *FunctionDetails) String() string { return proto.CompactTextString(m) } func (*FunctionDetails) ProtoMessage() {} func (*FunctionDetails) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{2} + return fileDescriptor_225cf355fcfb169c, []int{2} } + func (m *FunctionDetails) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FunctionDetails.Unmarshal(m, b) } func (m *FunctionDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_FunctionDetails.Marshal(b, m, deterministic) } -func (dst *FunctionDetails) XXX_Merge(src proto.Message) { - xxx_messageInfo_FunctionDetails.Merge(dst, src) +func (m *FunctionDetails) XXX_Merge(src proto.Message) { + xxx_messageInfo_FunctionDetails.Merge(m, src) } func (m *FunctionDetails) XXX_Size() int { return xxx_messageInfo_FunctionDetails.Size(m) @@ -393,6 +462,27 @@ func (m *FunctionDetails) GetRetryDetails() *RetryDetails { return nil } +func (m *FunctionDetails) GetRuntimeFlags() string { + if m != nil { + return m.RuntimeFlags + } + return "" +} + +func (m *FunctionDetails) GetComponentType() FunctionDetails_ComponentType { + if m != nil { + return m.ComponentType + } + return FunctionDetails_UNKNOWN +} + +func (m *FunctionDetails) GetCustomRuntimeOptions() string { + if m != nil { + return m.CustomRuntimeOptions + } + return "" +} + type ConsumerSpec struct { SchemaType string `protobuf:"bytes,1,opt,name=schemaType,proto3" json:"schemaType,omitempty"` SerdeClassName string `protobuf:"bytes,2,opt,name=serdeClassName,proto3" json:"serdeClassName,omitempty"` @@ -407,16 +497,17 @@ func (m *ConsumerSpec) Reset() { *m = ConsumerSpec{} } func (m *ConsumerSpec) String() string { return proto.CompactTextString(m) } func (*ConsumerSpec) ProtoMessage() {} func (*ConsumerSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{3} + return fileDescriptor_225cf355fcfb169c, []int{3} } + func (m *ConsumerSpec) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ConsumerSpec.Unmarshal(m, b) } func (m *ConsumerSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_ConsumerSpec.Marshal(b, m, deterministic) } -func (dst *ConsumerSpec) XXX_Merge(src proto.Message) { - xxx_messageInfo_ConsumerSpec.Merge(dst, src) +func (m *ConsumerSpec) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConsumerSpec.Merge(m, src) } func (m *ConsumerSpec) XXX_Size() int { return xxx_messageInfo_ConsumerSpec.Size(m) @@ -466,16 +557,17 @@ func (m *ConsumerSpec_ReceiverQueueSize) Reset() { *m = ConsumerSpec_Rec func (m *ConsumerSpec_ReceiverQueueSize) String() string { return proto.CompactTextString(m) } func (*ConsumerSpec_ReceiverQueueSize) ProtoMessage() {} func (*ConsumerSpec_ReceiverQueueSize) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{3, 0} + return fileDescriptor_225cf355fcfb169c, []int{3, 0} } + func (m *ConsumerSpec_ReceiverQueueSize) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Unmarshal(m, b) } func (m *ConsumerSpec_ReceiverQueueSize) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Marshal(b, m, deterministic) } -func (dst *ConsumerSpec_ReceiverQueueSize) XXX_Merge(src proto.Message) { - xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Merge(dst, src) +func (m *ConsumerSpec_ReceiverQueueSize) XXX_Merge(src proto.Message) { + xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Merge(m, src) } func (m *ConsumerSpec_ReceiverQueueSize) XXX_Size() int { return xxx_messageInfo_ConsumerSpec_ReceiverQueueSize.Size(m) @@ -502,35 +594,37 @@ type SourceSpec struct { SubscriptionType SubscriptionType `protobuf:"varint,3,opt,name=subscriptionType,proto3,enum=proto.SubscriptionType" json:"subscriptionType,omitempty"` // @deprecated -- use topicsToSchema TopicsToSerDeClassName map[string]string `protobuf:"bytes,4,rep,name=topicsToSerDeClassName,proto3" json:"topicsToSerDeClassName,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` // Deprecated: Do not use. - // * + //* // InputSpecs map[string]*ConsumerSpec `protobuf:"bytes,10,rep,name=inputSpecs,proto3" json:"inputSpecs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` TimeoutMs uint64 `protobuf:"varint,6,opt,name=timeoutMs,proto3" json:"timeoutMs,omitempty"` TopicsPattern string `protobuf:"bytes,7,opt,name=topicsPattern,proto3" json:"topicsPattern,omitempty"` // Deprecated: Do not use. // If specified, this will refer to an archive that is // already present in the server - Builtin string `protobuf:"bytes,8,opt,name=builtin,proto3" json:"builtin,omitempty"` - SubscriptionName string `protobuf:"bytes,9,opt,name=subscriptionName,proto3" json:"subscriptionName,omitempty"` - CleanupSubscription bool `protobuf:"varint,11,opt,name=cleanupSubscription,proto3" json:"cleanupSubscription,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Builtin string `protobuf:"bytes,8,opt,name=builtin,proto3" json:"builtin,omitempty"` + SubscriptionName string `protobuf:"bytes,9,opt,name=subscriptionName,proto3" json:"subscriptionName,omitempty"` + CleanupSubscription bool `protobuf:"varint,11,opt,name=cleanupSubscription,proto3" json:"cleanupSubscription,omitempty"` + SubscriptionPosition SubscriptionPosition `protobuf:"varint,12,opt,name=subscriptionPosition,proto3,enum=proto.SubscriptionPosition" json:"subscriptionPosition,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SourceSpec) Reset() { *m = SourceSpec{} } func (m *SourceSpec) String() string { return proto.CompactTextString(m) } func (*SourceSpec) ProtoMessage() {} func (*SourceSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{4} + return fileDescriptor_225cf355fcfb169c, []int{4} } + func (m *SourceSpec) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SourceSpec.Unmarshal(m, b) } func (m *SourceSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_SourceSpec.Marshal(b, m, deterministic) } -func (dst *SourceSpec) XXX_Merge(src proto.Message) { - xxx_messageInfo_SourceSpec.Merge(dst, src) +func (m *SourceSpec) XXX_Merge(src proto.Message) { + xxx_messageInfo_SourceSpec.Merge(m, src) } func (m *SourceSpec) XXX_Size() int { return xxx_messageInfo_SourceSpec.Size(m) @@ -620,6 +714,13 @@ func (m *SourceSpec) GetCleanupSubscription() bool { return false } +func (m *SourceSpec) GetSubscriptionPosition() SubscriptionPosition { + if m != nil { + return m.SubscriptionPosition + } + return SubscriptionPosition_LATEST +} + type SinkSpec struct { ClassName string `protobuf:"bytes,1,opt,name=className,proto3" json:"className,omitempty"` // map in json format @@ -631,7 +732,7 @@ type SinkSpec struct { // If specified, this will refer to an archive that is // already present in the server Builtin string `protobuf:"bytes,6,opt,name=builtin,proto3" json:"builtin,omitempty"` - // * + //* // Builtin schema type or custom schema class name SchemaType string `protobuf:"bytes,7,opt,name=schemaType,proto3" json:"schemaType,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -643,16 +744,17 @@ func (m *SinkSpec) Reset() { *m = SinkSpec{} } func (m *SinkSpec) String() string { return proto.CompactTextString(m) } func (*SinkSpec) ProtoMessage() {} func (*SinkSpec) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{5} + return fileDescriptor_225cf355fcfb169c, []int{5} } + func (m *SinkSpec) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SinkSpec.Unmarshal(m, b) } func (m *SinkSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_SinkSpec.Marshal(b, m, deterministic) } -func (dst *SinkSpec) XXX_Merge(src proto.Message) { - xxx_messageInfo_SinkSpec.Merge(dst, src) +func (m *SinkSpec) XXX_Merge(src proto.Message) { + xxx_messageInfo_SinkSpec.Merge(m, src) } func (m *SinkSpec) XXX_Size() int { return xxx_messageInfo_SinkSpec.Size(m) @@ -724,16 +826,17 @@ func (m *PackageLocationMetaData) Reset() { *m = PackageLocationMetaData func (m *PackageLocationMetaData) String() string { return proto.CompactTextString(m) } func (*PackageLocationMetaData) ProtoMessage() {} func (*PackageLocationMetaData) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{6} + return fileDescriptor_225cf355fcfb169c, []int{6} } + func (m *PackageLocationMetaData) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_PackageLocationMetaData.Unmarshal(m, b) } func (m *PackageLocationMetaData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_PackageLocationMetaData.Marshal(b, m, deterministic) } -func (dst *PackageLocationMetaData) XXX_Merge(src proto.Message) { - xxx_messageInfo_PackageLocationMetaData.Merge(dst, src) +func (m *PackageLocationMetaData) XXX_Merge(src proto.Message) { + xxx_messageInfo_PackageLocationMetaData.Merge(m, src) } func (m *PackageLocationMetaData) XXX_Size() int { return xxx_messageInfo_PackageLocationMetaData.Size(m) @@ -759,30 +862,32 @@ func (m *PackageLocationMetaData) GetOriginalFileName() string { } type FunctionMetaData struct { - FunctionDetails *FunctionDetails `protobuf:"bytes,1,opt,name=functionDetails,proto3" json:"functionDetails,omitempty"` - PackageLocation *PackageLocationMetaData `protobuf:"bytes,2,opt,name=packageLocation,proto3" json:"packageLocation,omitempty"` - Version uint64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` - CreateTime uint64 `protobuf:"varint,4,opt,name=createTime,proto3" json:"createTime,omitempty"` - InstanceStates map[int32]FunctionState `protobuf:"bytes,5,rep,name=instanceStates,proto3" json:"instanceStates,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3,enum=proto.FunctionState"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + FunctionDetails *FunctionDetails `protobuf:"bytes,1,opt,name=functionDetails,proto3" json:"functionDetails,omitempty"` + PackageLocation *PackageLocationMetaData `protobuf:"bytes,2,opt,name=packageLocation,proto3" json:"packageLocation,omitempty"` + Version uint64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` + CreateTime uint64 `protobuf:"varint,4,opt,name=createTime,proto3" json:"createTime,omitempty"` + InstanceStates map[int32]FunctionState `protobuf:"bytes,5,rep,name=instanceStates,proto3" json:"instanceStates,omitempty" protobuf_key:"varint,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3,enum=proto.FunctionState"` + FunctionAuthSpec *FunctionAuthenticationSpec `protobuf:"bytes,6,opt,name=functionAuthSpec,proto3" json:"functionAuthSpec,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *FunctionMetaData) Reset() { *m = FunctionMetaData{} } func (m *FunctionMetaData) String() string { return proto.CompactTextString(m) } func (*FunctionMetaData) ProtoMessage() {} func (*FunctionMetaData) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{7} + return fileDescriptor_225cf355fcfb169c, []int{7} } + func (m *FunctionMetaData) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FunctionMetaData.Unmarshal(m, b) } func (m *FunctionMetaData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_FunctionMetaData.Marshal(b, m, deterministic) } -func (dst *FunctionMetaData) XXX_Merge(src proto.Message) { - xxx_messageInfo_FunctionMetaData.Merge(dst, src) +func (m *FunctionMetaData) XXX_Merge(src proto.Message) { + xxx_messageInfo_FunctionMetaData.Merge(m, src) } func (m *FunctionMetaData) XXX_Size() int { return xxx_messageInfo_FunctionMetaData.Size(m) @@ -828,6 +933,67 @@ func (m *FunctionMetaData) GetInstanceStates() map[int32]FunctionState { return nil } +func (m *FunctionMetaData) GetFunctionAuthSpec() *FunctionAuthenticationSpec { + if m != nil { + return m.FunctionAuthSpec + } + return nil +} + +type FunctionAuthenticationSpec struct { + //* + // function authentication related data that the function authentication provider + // needs to cache/distribute to all workers support function authentication. + // Depending on the function authentication provider implementation, this can be the actual auth credentials + // or a pointer to the auth credentials that this function should use + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + //* + // classname of the function auth provicer this data is relevant to + Provider string `protobuf:"bytes,2,opt,name=provider,proto3" json:"provider,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FunctionAuthenticationSpec) Reset() { *m = FunctionAuthenticationSpec{} } +func (m *FunctionAuthenticationSpec) String() string { return proto.CompactTextString(m) } +func (*FunctionAuthenticationSpec) ProtoMessage() {} +func (*FunctionAuthenticationSpec) Descriptor() ([]byte, []int) { + return fileDescriptor_225cf355fcfb169c, []int{8} +} + +func (m *FunctionAuthenticationSpec) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FunctionAuthenticationSpec.Unmarshal(m, b) +} +func (m *FunctionAuthenticationSpec) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FunctionAuthenticationSpec.Marshal(b, m, deterministic) +} +func (m *FunctionAuthenticationSpec) XXX_Merge(src proto.Message) { + xxx_messageInfo_FunctionAuthenticationSpec.Merge(m, src) +} +func (m *FunctionAuthenticationSpec) XXX_Size() int { + return xxx_messageInfo_FunctionAuthenticationSpec.Size(m) +} +func (m *FunctionAuthenticationSpec) XXX_DiscardUnknown() { + xxx_messageInfo_FunctionAuthenticationSpec.DiscardUnknown(m) +} + +var xxx_messageInfo_FunctionAuthenticationSpec proto.InternalMessageInfo + +func (m *FunctionAuthenticationSpec) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *FunctionAuthenticationSpec) GetProvider() string { + if m != nil { + return m.Provider + } + return "" +} + type Instance struct { FunctionMetaData *FunctionMetaData `protobuf:"bytes,1,opt,name=functionMetaData,proto3" json:"functionMetaData,omitempty"` InstanceId int32 `protobuf:"varint,2,opt,name=instanceId,proto3" json:"instanceId,omitempty"` @@ -840,16 +1006,17 @@ func (m *Instance) Reset() { *m = Instance{} } func (m *Instance) String() string { return proto.CompactTextString(m) } func (*Instance) ProtoMessage() {} func (*Instance) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{8} + return fileDescriptor_225cf355fcfb169c, []int{9} } + func (m *Instance) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Instance.Unmarshal(m, b) } func (m *Instance) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Instance.Marshal(b, m, deterministic) } -func (dst *Instance) XXX_Merge(src proto.Message) { - xxx_messageInfo_Instance.Merge(dst, src) +func (m *Instance) XXX_Merge(src proto.Message) { + xxx_messageInfo_Instance.Merge(m, src) } func (m *Instance) XXX_Size() int { return xxx_messageInfo_Instance.Size(m) @@ -886,16 +1053,17 @@ func (m *Assignment) Reset() { *m = Assignment{} } func (m *Assignment) String() string { return proto.CompactTextString(m) } func (*Assignment) ProtoMessage() {} func (*Assignment) Descriptor() ([]byte, []int) { - return fileDescriptor_Function_33c6e1841d2624f0, []int{9} + return fileDescriptor_225cf355fcfb169c, []int{10} } + func (m *Assignment) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Assignment.Unmarshal(m, b) } func (m *Assignment) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Assignment.Marshal(b, m, deterministic) } -func (dst *Assignment) XXX_Merge(src proto.Message) { - xxx_messageInfo_Assignment.Merge(dst, src) +func (m *Assignment) XXX_Merge(src proto.Message) { + xxx_messageInfo_Assignment.Merge(m, src) } func (m *Assignment) XXX_Size() int { return xxx_messageInfo_Assignment.Size(m) @@ -921,6 +1089,12 @@ func (m *Assignment) GetWorkerId() string { } func init() { + proto.RegisterEnum("proto.ProcessingGuarantees", ProcessingGuarantees_name, ProcessingGuarantees_value) + proto.RegisterEnum("proto.SubscriptionType", SubscriptionType_name, SubscriptionType_value) + proto.RegisterEnum("proto.SubscriptionPosition", SubscriptionPosition_name, SubscriptionPosition_value) + proto.RegisterEnum("proto.FunctionState", FunctionState_name, FunctionState_value) + proto.RegisterEnum("proto.FunctionDetails_Runtime", FunctionDetails_Runtime_name, FunctionDetails_Runtime_value) + proto.RegisterEnum("proto.FunctionDetails_ComponentType", FunctionDetails_ComponentType_name, FunctionDetails_ComponentType_value) proto.RegisterType((*Resources)(nil), "proto.Resources") proto.RegisterType((*RetryDetails)(nil), "proto.RetryDetails") proto.RegisterType((*FunctionDetails)(nil), "proto.FunctionDetails") @@ -933,93 +1107,102 @@ func init() { proto.RegisterType((*PackageLocationMetaData)(nil), "proto.PackageLocationMetaData") proto.RegisterType((*FunctionMetaData)(nil), "proto.FunctionMetaData") proto.RegisterMapType((map[int32]FunctionState)(nil), "proto.FunctionMetaData.InstanceStatesEntry") + proto.RegisterType((*FunctionAuthenticationSpec)(nil), "proto.FunctionAuthenticationSpec") proto.RegisterType((*Instance)(nil), "proto.Instance") proto.RegisterType((*Assignment)(nil), "proto.Assignment") - proto.RegisterEnum("proto.ProcessingGuarantees", ProcessingGuarantees_name, ProcessingGuarantees_value) - proto.RegisterEnum("proto.SubscriptionType", SubscriptionType_name, SubscriptionType_value) - proto.RegisterEnum("proto.FunctionState", FunctionState_name, FunctionState_value) - proto.RegisterEnum("proto.FunctionDetails_Runtime", FunctionDetails_Runtime_name, FunctionDetails_Runtime_value) } -func init() { proto.RegisterFile("Function.proto", fileDescriptor_Function_33c6e1841d2624f0) } - -var fileDescriptor_Function_33c6e1841d2624f0 = []byte{ - // 1220 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdb, 0x6e, 0xdb, 0x46, - 0x13, 0x36, 0xad, 0xf3, 0xc8, 0xb6, 0xe8, 0x8d, 0x91, 0x10, 0xce, 0x8f, 0x40, 0xd1, 0xdf, 0x83, - 0xec, 0x24, 0x42, 0xe1, 0x5e, 0x34, 0xe8, 0x55, 0x15, 0x59, 0x4e, 0x54, 0xd8, 0x96, 0xba, 0x52, - 0x52, 0xe4, 0xaa, 0xd8, 0xd0, 0x63, 0x65, 0x21, 0x8a, 0x24, 0x76, 0x97, 0x69, 0xdc, 0x07, 0xe8, - 0x63, 0xf4, 0x49, 0xfa, 0x30, 0x7d, 0x92, 0xa2, 0xd8, 0x25, 0x29, 0x91, 0x94, 0xdc, 0xbb, 0x5e, - 0x89, 0xf3, 0xcd, 0x71, 0x67, 0xbf, 0x99, 0x15, 0x1c, 0x5c, 0x44, 0xbe, 0xab, 0x78, 0xe0, 0xf7, - 0x42, 0x11, 0xa8, 0x80, 0x54, 0xcc, 0x4f, 0x67, 0x00, 0x0d, 0x8a, 0x32, 0x88, 0x84, 0x8b, 0x92, - 0xd8, 0x50, 0x72, 0xc3, 0xc8, 0xb1, 0xda, 0x56, 0xd7, 0xa2, 0xfa, 0x53, 0x23, 0x82, 0x2d, 0x9d, - 0xdd, 0xb6, 0xd5, 0x2d, 0x51, 0xfd, 0x49, 0x08, 0x94, 0x6f, 0xb8, 0x5c, 0x38, 0x25, 0x03, 0x99, - 0xef, 0xce, 0x2d, 0xec, 0x51, 0x54, 0xe2, 0xee, 0x1c, 0x15, 0xe3, 0x9e, 0x24, 0xcf, 0xe1, 0x70, - 0xc9, 0x3e, 0x5f, 0xa1, 0x94, 0x6c, 0x8e, 0x5a, 0xc3, 0x51, 0x9a, 0xa8, 0x15, 0xba, 0xa9, 0x20, - 0x5d, 0x68, 0xdd, 0x20, 0xbb, 0xb9, 0x44, 0xa5, 0x50, 0xcc, 0x82, 0x90, 0xbb, 0x26, 0x5f, 0x83, - 0x16, 0xe1, 0xce, 0x1f, 0x15, 0x68, 0xa5, 0xc7, 0x48, 0x73, 0x3d, 0x84, 0xaa, 0x42, 0x9f, 0xf9, - 0xca, 0x24, 0x68, 0xd0, 0x44, 0x22, 0xff, 0x83, 0x86, 0xcf, 0x96, 0x28, 0x43, 0xe6, 0x62, 0x12, - 0x6f, 0x0d, 0xe8, 0x53, 0x68, 0xc1, 0x9c, 0xa2, 0x41, 0xcd, 0xb7, 0xf6, 0x70, 0x3d, 0x26, 0xe5, - 0xb5, 0x56, 0x94, 0x63, 0x8f, 0x15, 0x40, 0x8e, 0xa1, 0xee, 0x05, 0xf3, 0xb8, 0xbc, 0x8a, 0x51, - 0xae, 0x64, 0x32, 0x86, 0xa3, 0x50, 0x04, 0x2e, 0x4a, 0xc9, 0xfd, 0xf9, 0xeb, 0x88, 0x09, 0xe6, - 0x2b, 0x44, 0xe9, 0x54, 0xdb, 0x56, 0xf7, 0xe0, 0xec, 0x71, 0xdc, 0xf1, 0xde, 0x64, 0x8b, 0x09, - 0xdd, 0xea, 0x48, 0x9e, 0x00, 0x44, 0x12, 0xc5, 0x20, 0xf0, 0x6f, 0xf9, 0xdc, 0xa9, 0x99, 0x74, - 0x19, 0x44, 0xeb, 0x25, 0xba, 0x02, 0x95, 0xbc, 0x62, 0xa1, 0x63, 0xc7, 0xfa, 0x35, 0x42, 0x5e, - 0x42, 0x4d, 0x44, 0xbe, 0xe2, 0x4b, 0x74, 0xea, 0xa6, 0x86, 0x27, 0x49, 0x0d, 0x85, 0xee, 0xf5, - 0x68, 0x6c, 0x45, 0x53, 0x73, 0xe2, 0x40, 0x8d, 0x45, 0x2a, 0xe8, 0xbb, 0x0b, 0xa7, 0xd1, 0xb6, - 0xba, 0x75, 0x9a, 0x8a, 0xa4, 0x0d, 0xcd, 0x90, 0x09, 0xe6, 0x79, 0xe8, 0x71, 0xb9, 0x74, 0xc0, - 0x5c, 0x67, 0x16, 0x22, 0x27, 0x50, 0x8d, 0x99, 0xe4, 0x34, 0xdb, 0x56, 0xb7, 0x79, 0x76, 0x98, - 0x24, 0x9d, 0x1a, 0x70, 0x1a, 0xa2, 0x4b, 0x13, 0x03, 0xf2, 0x7f, 0x28, 0x4b, 0xee, 0x2f, 0x9c, - 0x3d, 0x63, 0xd8, 0x4a, 0x0d, 0xb9, 0xbf, 0x30, 0x66, 0x46, 0x49, 0x7a, 0xd0, 0x10, 0x29, 0x37, - 0x9d, 0x7d, 0x63, 0x69, 0x27, 0x96, 0x2b, 0xce, 0xd2, 0xb5, 0x89, 0xee, 0x4a, 0xc8, 0xdc, 0x05, - 0x9b, 0xe3, 0x5b, 0xe1, 0x39, 0x07, 0x71, 0x57, 0xd6, 0x08, 0xf9, 0x0e, 0xf6, 0x44, 0x86, 0xa6, - 0x4e, 0xcb, 0x84, 0x7c, 0xb0, 0x0a, 0xb9, 0x56, 0xd1, 0x9c, 0x61, 0xe7, 0x6b, 0xa8, 0x25, 0x8d, - 0x22, 0x75, 0x28, 0xff, 0xd8, 0x7f, 0xd7, 0xb7, 0x77, 0x08, 0x40, 0x75, 0xf2, 0x7e, 0xf6, 0x66, - 0x7c, 0x6d, 0x5b, 0xa4, 0x0a, 0xbb, 0xaf, 0xc7, 0x76, 0xa9, 0xf3, 0xb7, 0x05, 0x7b, 0x83, 0xc0, - 0x97, 0xd1, 0x12, 0x85, 0x3e, 0x88, 0xb9, 0x28, 0xf7, 0x23, 0x2e, 0xd9, 0xec, 0x2e, 0xc4, 0x84, - 0xa1, 0x19, 0x84, 0x7c, 0x05, 0x07, 0x12, 0xc5, 0x0d, 0x0e, 0x56, 0xc4, 0x8b, 0xa9, 0x5a, 0x40, - 0xb5, 0x1d, 0x97, 0x14, 0xe7, 0xf8, 0x79, 0xc2, 0xf4, 0x3c, 0xf8, 0x86, 0xb9, 0x75, 0x5a, 0x40, - 0xc9, 0x14, 0x0e, 0x05, 0xba, 0xc8, 0x3f, 0xa1, 0xf8, 0x29, 0xc2, 0x08, 0xa7, 0xfc, 0xb7, 0x98, - 0xcb, 0xcd, 0xb3, 0x2f, 0x93, 0x73, 0x66, 0xeb, 0xeb, 0xd1, 0xa2, 0x31, 0xdd, 0xf4, 0x3f, 0x3e, - 0x81, 0xc3, 0x0d, 0x3b, 0x72, 0x04, 0x95, 0x4f, 0xcc, 0x8b, 0x30, 0x99, 0xeb, 0x58, 0xe8, 0xfc, - 0x59, 0x01, 0x58, 0x5f, 0x77, 0x7e, 0xa4, 0xac, 0xe2, 0x48, 0x39, 0x50, 0x73, 0x0d, 0x9f, 0x65, - 0x72, 0xea, 0x54, 0x24, 0x5f, 0xc0, 0xbe, 0xba, 0x0b, 0x33, 0x5d, 0x89, 0x27, 0x2e, 0x0f, 0x92, - 0x01, 0xd8, 0x32, 0xfa, 0x20, 0x5d, 0xc1, 0x43, 0xcd, 0x69, 0xd3, 0xe2, 0x92, 0xa1, 0xfb, 0xa3, - 0x94, 0x50, 0x05, 0x35, 0xdd, 0x70, 0x20, 0x1c, 0x1e, 0x2a, 0x3d, 0xc4, 0x72, 0x16, 0x4c, 0x51, - 0x9c, 0x67, 0x72, 0x96, 0xdb, 0xa5, 0x6e, 0xf3, 0xec, 0xc5, 0x06, 0x89, 0x7b, 0xb3, 0xad, 0xf6, - 0x43, 0x5f, 0x89, 0xbb, 0x57, 0xbb, 0x8e, 0x45, 0xef, 0x09, 0x48, 0xfa, 0x00, 0xdc, 0x0f, 0x23, - 0xa5, 0x83, 0x48, 0x07, 0x4c, 0xf8, 0xa7, 0x9b, 0xe1, 0x47, 0x2b, 0x1b, 0x13, 0x92, 0x66, 0x9c, - 0x74, 0x43, 0x35, 0x0d, 0x83, 0x48, 0x5d, 0xc5, 0xeb, 0xa5, 0x4c, 0xd7, 0x00, 0xe9, 0xc2, 0x7e, - 0x9c, 0x3a, 0x25, 0x89, 0xd9, 0x1c, 0xa6, 0xa6, 0xbc, 0x42, 0xb7, 0xfe, 0x43, 0xc4, 0x3d, 0xc5, - 0x7d, 0xb3, 0x20, 0x1a, 0x34, 0x15, 0xc9, 0x69, 0xbe, 0xa9, 0xa6, 0x13, 0x0d, 0x63, 0xb2, 0x81, - 0x93, 0x6f, 0xe0, 0x81, 0xeb, 0x21, 0xf3, 0xa3, 0x30, 0xdb, 0x68, 0x33, 0xfd, 0x75, 0xba, 0x4d, - 0x75, 0x3c, 0x82, 0xc7, 0xff, 0xd2, 0x3d, 0xfd, 0xdc, 0x2c, 0xf0, 0x2e, 0x61, 0x8a, 0xfe, 0x5c, - 0xd3, 0x2c, 0x66, 0x48, 0x2c, 0x7c, 0xbf, 0xfb, 0xd2, 0x3a, 0xa6, 0xd0, 0x2a, 0x74, 0x6a, 0x8b, - 0xfb, 0x49, 0xd6, 0x7d, 0x3d, 0xeb, 0xd9, 0x19, 0xc8, 0xc4, 0xec, 0xfc, 0x65, 0x41, 0x3d, 0x5d, - 0x42, 0xff, 0x31, 0x79, 0x8f, 0xa0, 0x62, 0xae, 0x24, 0x79, 0x82, 0x62, 0x21, 0xd9, 0x07, 0x79, - 0x16, 0xa6, 0xfb, 0x20, 0x4b, 0xa5, 0xcc, 0xfd, 0x55, 0xf3, 0xf7, 0x97, 0xdf, 0x38, 0xb5, 0xe2, - 0xc6, 0xe9, 0xcc, 0xe1, 0xd1, 0x24, 0x5e, 0x89, 0x97, 0x81, 0xcb, 0xf4, 0xa5, 0x5c, 0xa1, 0x62, - 0xe7, 0x4c, 0xb1, 0x78, 0xc3, 0x1b, 0xd5, 0x84, 0xa9, 0x8f, 0xc9, 0x91, 0xb3, 0x90, 0x26, 0x47, - 0x20, 0xf8, 0x9c, 0xfb, 0xcc, 0xbb, 0xe0, 0x1e, 0x66, 0x16, 0xd6, 0x06, 0xde, 0xf9, 0xbd, 0x04, - 0x76, 0xfa, 0xdc, 0xac, 0x52, 0xfc, 0x00, 0xad, 0xdb, 0xfc, 0x13, 0x64, 0xd2, 0x34, 0xcf, 0x1e, - 0x6e, 0x7f, 0xa0, 0x68, 0xd1, 0x9c, 0xbc, 0x81, 0x56, 0x98, 0xaf, 0x3f, 0xb9, 0xdb, 0xf4, 0x89, - 0xbb, 0xe7, 0x74, 0xb4, 0xe8, 0xa6, 0x7b, 0xf8, 0x09, 0x85, 0xd4, 0x11, 0x4a, 0x66, 0x92, 0x52, - 0x51, 0xf7, 0xd0, 0x15, 0xc8, 0x14, 0xce, 0x78, 0x72, 0x03, 0x65, 0x9a, 0x41, 0xc8, 0x14, 0x0e, - 0xb8, 0x2f, 0x15, 0xf3, 0x5d, 0x9c, 0x2a, 0xa6, 0x50, 0x3a, 0x15, 0x33, 0xcc, 0xcf, 0x0a, 0x87, - 0x48, 0x73, 0xf7, 0x46, 0x39, 0xeb, 0x78, 0xac, 0x0b, 0x21, 0x8e, 0x7f, 0x86, 0x07, 0x5b, 0xcc, - 0xb2, 0x9c, 0xae, 0xc4, 0x9c, 0x3e, 0xcd, 0x72, 0xfa, 0xe0, 0xec, 0xa8, 0x90, 0xd4, 0x38, 0x67, - 0x49, 0x1d, 0x40, 0x3d, 0x0d, 0xac, 0x57, 0xe6, 0x6d, 0xa1, 0xb8, 0xe4, 0x02, 0x1e, 0xdd, 0x53, - 0x3b, 0xdd, 0x70, 0xd0, 0xed, 0x49, 0x6b, 0x1f, 0xdd, 0x98, 0x2a, 0x2a, 0x34, 0x83, 0x74, 0xde, - 0x02, 0xf4, 0xa5, 0xe4, 0x73, 0x7f, 0x89, 0xbe, 0x22, 0xcf, 0xa0, 0x9e, 0xea, 0x92, 0x54, 0xe9, - 0x73, 0x9f, 0x56, 0x45, 0x57, 0x06, 0xfa, 0x5f, 0xd6, 0xaf, 0x81, 0x58, 0xa0, 0x48, 0x02, 0x37, - 0xe8, 0x4a, 0x3e, 0x1d, 0xc3, 0xd1, 0xb6, 0xbf, 0x50, 0xc4, 0x86, 0xbd, 0xfe, 0xec, 0x72, 0xd8, - 0x9f, 0xce, 0x7e, 0x19, 0x5f, 0x0f, 0x86, 0xf6, 0x0e, 0x69, 0x41, 0xb3, 0x3f, 0xbb, 0x1a, 0xa7, - 0x80, 0x45, 0x8e, 0xc0, 0x1e, 0x5e, 0x5c, 0x0c, 0x07, 0xb3, 0xd1, 0xbb, 0xe1, 0xe5, 0xfb, 0x18, - 0xdd, 0x3d, 0x7d, 0x0e, 0x76, 0xf1, 0x81, 0xd0, 0xaf, 0xfa, 0xf4, 0x4d, 0x9f, 0x0e, 0xcf, 0xed, - 0x1d, 0xb2, 0x07, 0xf5, 0x8b, 0xfe, 0xe8, 0x72, 0xfc, 0x6e, 0x48, 0x6d, 0xeb, 0xf4, 0x04, 0xf6, - 0x73, 0x2d, 0x26, 0x4d, 0xa8, 0xd1, 0xb7, 0xd7, 0xd7, 0xa3, 0xeb, 0xd7, 0xf6, 0x8e, 0x16, 0xa6, - 0xb3, 0xf1, 0x64, 0x32, 0x3c, 0xb7, 0xad, 0x57, 0x2f, 0xe0, 0x69, 0x20, 0xe6, 0x3d, 0x16, 0x32, - 0xf7, 0x23, 0xf6, 0xc2, 0xc8, 0x93, 0x4c, 0xf4, 0xd2, 0x36, 0xca, 0xf8, 0xf4, 0xaf, 0xea, 0x69, - 0xb4, 0x0f, 0x55, 0x03, 0x7c, 0xfb, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xfe, 0xa8, 0xa5, 0x5f, - 0xa3, 0x0b, 0x00, 0x00, +func init() { proto.RegisterFile("Function.proto", fileDescriptor_225cf355fcfb169c) } + +var fileDescriptor_225cf355fcfb169c = []byte{ + // 1410 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0xdb, 0x36, + 0x14, 0x8f, 0xe2, 0xf8, 0xdf, 0xb3, 0x13, 0x2b, 0x8c, 0xd1, 0x0a, 0xe9, 0x50, 0xa4, 0x5e, 0xb7, + 0x39, 0x69, 0x6b, 0x14, 0xd9, 0x61, 0xc5, 0x4e, 0x75, 0x1d, 0xa7, 0x75, 0xeb, 0xd8, 0x1e, 0xad, + 0xb4, 0xe8, 0x69, 0x60, 0x15, 0xc6, 0x21, 0x2c, 0x4b, 0x02, 0x49, 0x65, 0xcd, 0xce, 0xbb, 0xee, + 0xe3, 0xed, 0xbe, 0x4f, 0x32, 0x0c, 0xa4, 0x24, 0x5b, 0x92, 0x9d, 0xdd, 0x76, 0x92, 0xde, 0xef, + 0xfd, 0x23, 0x7f, 0x7c, 0xef, 0x91, 0xb0, 0x77, 0x1e, 0x7a, 0x8e, 0x64, 0xbe, 0xd7, 0x09, 0xb8, + 0x2f, 0x7d, 0x54, 0xd4, 0x9f, 0x56, 0x0f, 0xaa, 0x98, 0x0a, 0x3f, 0xe4, 0x0e, 0x15, 0xc8, 0x84, + 0x82, 0x13, 0x84, 0x96, 0x71, 0x64, 0xb4, 0x0d, 0xac, 0x7e, 0x15, 0xc2, 0xc9, 0xc2, 0xda, 0x3e, + 0x32, 0xda, 0x05, 0xac, 0x7e, 0x11, 0x82, 0x9d, 0x2b, 0x26, 0xe6, 0x56, 0x41, 0x43, 0xfa, 0xbf, + 0x75, 0x0d, 0x75, 0x4c, 0x25, 0xbf, 0x3b, 0xa3, 0x92, 0x30, 0x57, 0xa0, 0xe7, 0xb0, 0xbf, 0x20, + 0x5f, 0x2f, 0xa8, 0x10, 0x64, 0x46, 0x95, 0x86, 0x51, 0xa1, 0xa3, 0x16, 0xf1, 0xba, 0x02, 0xb5, + 0xa1, 0x71, 0x45, 0xc9, 0xd5, 0x90, 0x4a, 0x49, 0xb9, 0xed, 0x07, 0xcc, 0xd1, 0xf9, 0xaa, 0x38, + 0x0f, 0xb7, 0xfe, 0x28, 0x43, 0x23, 0xd9, 0x46, 0x92, 0xeb, 0x01, 0x94, 0x24, 0xf5, 0x88, 0x27, + 0x75, 0x82, 0x2a, 0x8e, 0x25, 0xf4, 0x0d, 0x54, 0x3d, 0xb2, 0xa0, 0x22, 0x20, 0x0e, 0x8d, 0xe3, + 0xad, 0x00, 0xb5, 0x0b, 0x25, 0xe8, 0x5d, 0x54, 0xb1, 0xfe, 0x57, 0x1e, 0x8e, 0x4b, 0x84, 0x18, + 0x29, 0xc5, 0x4e, 0xe4, 0xb1, 0x04, 0xd0, 0x21, 0x54, 0x5c, 0x7f, 0x16, 0x2d, 0xaf, 0xa8, 0x95, + 0x4b, 0x19, 0x8d, 0xa1, 0x19, 0x70, 0xdf, 0xa1, 0x42, 0x30, 0x6f, 0xf6, 0x36, 0x24, 0x9c, 0x78, + 0x92, 0x52, 0x61, 0x95, 0x8e, 0x8c, 0xf6, 0xde, 0xe9, 0xa3, 0x88, 0xf1, 0xce, 0x64, 0x83, 0x09, + 0xde, 0xe8, 0x88, 0x1e, 0x03, 0x84, 0x82, 0xf2, 0x9e, 0xef, 0x5d, 0xb3, 0x99, 0x55, 0xd6, 0xe9, + 0x52, 0x88, 0xd2, 0x0b, 0xea, 0x70, 0x2a, 0xc5, 0x05, 0x09, 0x2c, 0x33, 0xd2, 0xaf, 0x10, 0xf4, + 0x0a, 0xca, 0x3c, 0xf4, 0x24, 0x5b, 0x50, 0xab, 0xa2, 0xd7, 0xf0, 0x38, 0x5e, 0x43, 0x8e, 0xbd, + 0x0e, 0x8e, 0xac, 0x70, 0x62, 0x8e, 0x2c, 0x28, 0x93, 0x50, 0xfa, 0x5d, 0x67, 0x6e, 0x55, 0x8f, + 0x8c, 0x76, 0x05, 0x27, 0x22, 0x3a, 0x82, 0x5a, 0x40, 0x38, 0x71, 0x5d, 0xea, 0x32, 0xb1, 0xb0, + 0x40, 0x1f, 0x67, 0x1a, 0x42, 0xc7, 0x50, 0x8a, 0x2a, 0xc9, 0xaa, 0x1d, 0x19, 0xed, 0xda, 0xe9, + 0x7e, 0x9c, 0x74, 0xaa, 0xc1, 0x69, 0x40, 0x1d, 0x1c, 0x1b, 0xa0, 0x6f, 0x61, 0x47, 0x30, 0x6f, + 0x6e, 0xd5, 0xb5, 0x61, 0x23, 0x31, 0x64, 0xde, 0x5c, 0x9b, 0x69, 0x25, 0xea, 0x40, 0x95, 0x27, + 0xb5, 0x69, 0xed, 0x6a, 0x4b, 0x33, 0xb6, 0x5c, 0xd6, 0x2c, 0x5e, 0x99, 0x28, 0x56, 0x02, 0xe2, + 0xcc, 0xc9, 0x8c, 0x5e, 0x72, 0xd7, 0xda, 0x8b, 0x58, 0x59, 0x21, 0xe8, 0x27, 0xa8, 0xf3, 0x54, + 0x99, 0x5a, 0x0d, 0x1d, 0xf2, 0x60, 0x19, 0x72, 0xa5, 0xc2, 0x19, 0x43, 0xd4, 0x82, 0x7a, 0xcc, + 0xcf, 0xb9, 0x4b, 0x66, 0xc2, 0xda, 0xd7, 0xa1, 0x33, 0x18, 0x7a, 0x0f, 0xbb, 0x8e, 0xbf, 0x08, + 0x7c, 0x8f, 0x7a, 0xd2, 0xbe, 0x0b, 0xa8, 0x85, 0x34, 0xf1, 0x4f, 0xef, 0x21, 0xbe, 0x97, 0xb6, + 0xc5, 0x59, 0x57, 0x74, 0x0a, 0x4d, 0x27, 0x14, 0xd2, 0x5f, 0xc4, 0xc7, 0x33, 0x0e, 0x94, 0xab, + 0xb0, 0x0e, 0x74, 0xde, 0x8d, 0xba, 0xd6, 0x0f, 0x50, 0x8e, 0x11, 0x54, 0x81, 0x9d, 0xf7, 0xdd, + 0x8f, 0x5d, 0x73, 0x0b, 0x01, 0x94, 0x26, 0x9f, 0xed, 0x77, 0xe3, 0x91, 0x69, 0xa0, 0x12, 0x6c, + 0xbf, 0x1d, 0x9b, 0x85, 0xd6, 0x6b, 0xd8, 0xcd, 0x24, 0x47, 0x35, 0x28, 0x5f, 0x8e, 0x3e, 0x8c, + 0xc6, 0x9f, 0x46, 0xe6, 0x16, 0xaa, 0x43, 0xe5, 0xfc, 0x72, 0xd4, 0xb3, 0x07, 0xda, 0x07, 0xa0, + 0x34, 0x1d, 0x5f, 0xe2, 0x5e, 0xdf, 0xdc, 0x56, 0x51, 0xa7, 0x83, 0xd1, 0x07, 0xb3, 0xd0, 0xfa, + 0xc7, 0x80, 0x7a, 0xcf, 0xf7, 0x44, 0xb8, 0xa0, 0x5c, 0x1d, 0x97, 0x2e, 0x47, 0xe7, 0x86, 0x2e, + 0x88, 0xde, 0xb8, 0x11, 0x97, 0xe3, 0x12, 0x41, 0xdf, 0xc3, 0x9e, 0xa0, 0xfc, 0x8a, 0xf6, 0x96, + 0xed, 0x15, 0x35, 0x64, 0x0e, 0x55, 0x76, 0x4c, 0x60, 0x3a, 0xa3, 0x5f, 0x27, 0x44, 0x75, 0xbd, + 0xa7, 0xfb, 0xb3, 0x82, 0x73, 0x28, 0x9a, 0xc2, 0x3e, 0xa7, 0x0e, 0x65, 0xb7, 0x94, 0xff, 0x12, + 0xd2, 0x90, 0x4e, 0xd9, 0xef, 0x51, 0xc7, 0xd6, 0x4e, 0xbf, 0x8b, 0xf9, 0x4e, 0xaf, 0xaf, 0x83, + 0xf3, 0xc6, 0x78, 0xdd, 0xff, 0xf0, 0x18, 0xf6, 0xd7, 0xec, 0x50, 0x13, 0x8a, 0xb7, 0xc4, 0x0d, + 0x69, 0x3c, 0xbd, 0x22, 0xa1, 0xf5, 0x67, 0x09, 0x60, 0x55, 0xd4, 0xd9, 0xc1, 0x61, 0xe4, 0x07, + 0x87, 0x05, 0x65, 0x47, 0x77, 0xad, 0x88, 0x77, 0x9d, 0x88, 0xe8, 0x29, 0xec, 0xca, 0xbb, 0x20, + 0xc5, 0x4a, 0x34, 0x57, 0xb2, 0x20, 0xea, 0x81, 0x29, 0xc2, 0x2f, 0xc2, 0xe1, 0x4c, 0x9f, 0xb4, + 0xa6, 0xb8, 0xa0, 0x6b, 0xeb, 0x61, 0xd2, 0x36, 0x39, 0x35, 0x5e, 0x73, 0x40, 0x0c, 0x1e, 0x48, + 0x35, 0xaa, 0x84, 0xed, 0x4f, 0x29, 0x3f, 0x4b, 0xe5, 0xdc, 0x39, 0x2a, 0xb4, 0x6b, 0xa7, 0x2f, + 0xd6, 0x5a, 0xb5, 0x63, 0x6f, 0xb4, 0xef, 0x7b, 0x92, 0xdf, 0xbd, 0xd9, 0xb6, 0x0c, 0x7c, 0x4f, + 0x40, 0xd4, 0x05, 0x60, 0x5e, 0x10, 0x4a, 0x15, 0x44, 0x58, 0xa0, 0xc3, 0x3f, 0x59, 0x0f, 0x3f, + 0x58, 0xda, 0xe8, 0x90, 0x38, 0xe5, 0xa4, 0x08, 0x55, 0x85, 0xec, 0x87, 0xf2, 0x22, 0x1a, 0xa2, + 0x3b, 0x78, 0x05, 0xa0, 0x36, 0xec, 0x46, 0xa9, 0x93, 0x22, 0xd1, 0xf3, 0x51, 0xaf, 0x29, 0xab, + 0x50, 0xd4, 0x7f, 0x09, 0x99, 0x2b, 0x99, 0xa7, 0xc7, 0x60, 0x15, 0x27, 0x22, 0x3a, 0xc9, 0x92, + 0xaa, 0x99, 0xa8, 0x6a, 0x93, 0x35, 0x1c, 0xbd, 0x84, 0x03, 0xc7, 0xa5, 0xc4, 0x0b, 0x83, 0x34, + 0xd1, 0x7a, 0xc6, 0x55, 0xf0, 0x26, 0x95, 0xba, 0x0f, 0xd2, 0x51, 0x26, 0xbe, 0x60, 0xda, 0xa5, + 0x9e, 0xb9, 0x0f, 0xa6, 0x1b, 0x4c, 0xf0, 0x46, 0xc7, 0xc3, 0x01, 0x3c, 0xfa, 0x8f, 0xe3, 0x50, + 0xb7, 0xf4, 0x9c, 0xde, 0xc5, 0xa5, 0xa7, 0x7e, 0x57, 0x75, 0x1b, 0x95, 0x5c, 0x24, 0xfc, 0xbc, + 0xfd, 0xca, 0x38, 0xc4, 0xd0, 0xc8, 0x51, 0xbf, 0xc1, 0xfd, 0x38, 0xed, 0xbe, 0x1a, 0x91, 0xe9, + 0xa6, 0x4a, 0xc5, 0x6c, 0xfd, 0x6d, 0x40, 0x25, 0x99, 0xdd, 0xff, 0x73, 0x37, 0x34, 0xa1, 0xa8, + 0xcf, 0x38, 0xbe, 0xb9, 0x23, 0x21, 0x1e, 0x30, 0xd9, 0xb2, 0x4e, 0x06, 0x4c, 0xba, 0x36, 0x53, + 0x05, 0x51, 0xca, 0x16, 0x44, 0x76, 0x84, 0x95, 0xf3, 0x23, 0xac, 0x35, 0x83, 0x87, 0x93, 0xe8, + 0x26, 0x19, 0xfa, 0x0e, 0x51, 0x87, 0x72, 0x41, 0x25, 0x39, 0x23, 0x92, 0x44, 0x17, 0xa3, 0x56, + 0x4d, 0x88, 0xbc, 0x89, 0xb7, 0x9c, 0x86, 0x54, 0xb5, 0xf9, 0x9c, 0xcd, 0x98, 0x47, 0xdc, 0x73, + 0xe6, 0xd2, 0xd4, 0x04, 0x5c, 0xc3, 0x5b, 0x7f, 0x15, 0xc0, 0x4c, 0x2e, 0x8b, 0x65, 0x8a, 0xd7, + 0xd0, 0xb8, 0xce, 0x5e, 0x20, 0x3a, 0x4d, 0xed, 0xf4, 0xc1, 0xe6, 0xeb, 0x05, 0xe7, 0xcd, 0xd1, + 0x3b, 0x68, 0x04, 0xd9, 0xf5, 0xc7, 0x67, 0x9b, 0xbc, 0x0c, 0xee, 0xd9, 0x1d, 0xce, 0xbb, 0x29, + 0x0e, 0x6f, 0x29, 0x17, 0x2a, 0x42, 0x41, 0xb7, 0x66, 0x22, 0x2a, 0x0e, 0x1d, 0x4e, 0x89, 0xa4, + 0x36, 0x8b, 0x4f, 0x60, 0x07, 0xa7, 0x10, 0x34, 0x85, 0x3d, 0xe6, 0x09, 0x49, 0x3c, 0x87, 0x4e, + 0x25, 0x91, 0x54, 0x58, 0x45, 0x3d, 0x1d, 0x9e, 0xe5, 0x36, 0x91, 0xe4, 0xee, 0x0c, 0x32, 0xd6, + 0xd1, 0x9c, 0xc8, 0x85, 0x40, 0x17, 0x60, 0x26, 0x7b, 0xed, 0x86, 0xf2, 0x46, 0x95, 0xa0, 0x3e, + 0xdb, 0xd5, 0xd0, 0x39, 0x4f, 0xa9, 0xa9, 0x27, 0x59, 0xb4, 0x0f, 0x5d, 0xc3, 0x6b, 0xae, 0x87, + 0x9f, 0xe0, 0x60, 0x43, 0xd6, 0x74, 0x8b, 0x14, 0xa3, 0x16, 0x39, 0x49, 0xb7, 0xc8, 0xde, 0x69, + 0x33, 0x97, 0x4c, 0x3b, 0xa7, 0x7b, 0x64, 0x08, 0x87, 0xf7, 0x2f, 0x44, 0xbf, 0xaa, 0x89, 0x24, + 0x3a, 0x41, 0x1d, 0xeb, 0x7f, 0xf5, 0xe2, 0x0c, 0xb8, 0x7f, 0xcb, 0xae, 0x28, 0x8f, 0xab, 0x65, + 0x29, 0xb7, 0x7c, 0xa8, 0x24, 0xcb, 0x54, 0x17, 0xc4, 0x75, 0x8e, 0xb9, 0xb8, 0x3a, 0x1e, 0xde, + 0x43, 0x2c, 0x5e, 0x73, 0x50, 0x67, 0x97, 0x10, 0x3b, 0xb8, 0xd2, 0xe9, 0x8a, 0x38, 0x85, 0xb4, + 0x2e, 0x01, 0xba, 0x42, 0xb0, 0x99, 0xb7, 0xa0, 0x9e, 0x44, 0xcf, 0xa0, 0x92, 0xe8, 0xe2, 0x54, + 0xc9, 0x13, 0x2e, 0x59, 0x15, 0x5e, 0x1a, 0xa8, 0x7d, 0xfc, 0xe6, 0xf3, 0x39, 0xe5, 0x71, 0xe0, + 0x2a, 0x5e, 0xca, 0x27, 0x63, 0x68, 0x6e, 0x7a, 0x16, 0x23, 0x13, 0xea, 0x5d, 0x7b, 0xd8, 0xef, + 0x4e, 0xed, 0x5f, 0xc7, 0xa3, 0x5e, 0xdf, 0xdc, 0x42, 0x0d, 0xa8, 0x75, 0xed, 0x8b, 0x71, 0x02, + 0x18, 0xa8, 0x09, 0x66, 0xff, 0xfc, 0xbc, 0xdf, 0xb3, 0x07, 0x1f, 0xfb, 0xc3, 0xcf, 0x11, 0xba, + 0x7d, 0xf2, 0x1c, 0xcc, 0xfc, 0x75, 0xa8, 0x5f, 0x31, 0xef, 0xba, 0xb8, 0x7f, 0x16, 0xbf, 0x6f, + 0xba, 0x83, 0xe1, 0xf8, 0x63, 0x1f, 0x9b, 0xc6, 0xc9, 0x4b, 0x68, 0x6e, 0x9a, 0xc2, 0xca, 0x63, + 0xd8, 0xb5, 0xfb, 0x53, 0x3b, 0xf2, 0xe8, 0x77, 0xf1, 0x70, 0xa0, 0x24, 0xe3, 0xe4, 0x18, 0x76, + 0x33, 0x47, 0xac, 0x5e, 0x4f, 0xf8, 0x72, 0x34, 0x1a, 0x8c, 0xde, 0x9a, 0x5b, 0x4a, 0x98, 0xda, + 0xe3, 0xc9, 0xa4, 0x7f, 0x66, 0x1a, 0x6f, 0x5e, 0xc0, 0x13, 0x9f, 0xcf, 0x3a, 0x24, 0x20, 0xce, + 0x0d, 0xed, 0x04, 0xa1, 0x2b, 0x08, 0xef, 0x24, 0xc4, 0x8b, 0x88, 0xaf, 0x37, 0x95, 0x24, 0xda, + 0x97, 0x92, 0x06, 0x7e, 0xfc, 0x37, 0x00, 0x00, 0xff, 0xff, 0xb1, 0xfd, 0x7e, 0x68, 0xa9, 0x0d, + 0x00, 0x00, } diff --git a/pulsar-function-go/pb/InstanceCommunication.pb.go b/pulsar-function-go/pb/InstanceCommunication.pb.go index a0c36e07050f7..e52ac1e138cda 100644 --- a/pulsar-function-go/pb/InstanceCommunication.pb.go +++ b/pulsar-function-go/pb/InstanceCommunication.pb.go @@ -17,15 +17,18 @@ // under the License. // -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: InstanceCommunication.proto - -package pb - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" -import _ "github.com/golang/protobuf/ptypes/empty" +package api + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + empty "github.com/golang/protobuf/ptypes/empty" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -36,7 +39,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type FunctionStatus struct { Running bool `protobuf:"varint,1,opt,name=running,proto3" json:"running,omitempty"` @@ -75,16 +78,17 @@ func (m *FunctionStatus) Reset() { *m = FunctionStatus{} } func (m *FunctionStatus) String() string { return proto.CompactTextString(m) } func (*FunctionStatus) ProtoMessage() {} func (*FunctionStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{0} + return fileDescriptor_93ee26d3627c79da, []int{0} } + func (m *FunctionStatus) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FunctionStatus.Unmarshal(m, b) } func (m *FunctionStatus) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_FunctionStatus.Marshal(b, m, deterministic) } -func (dst *FunctionStatus) XXX_Merge(src proto.Message) { - xxx_messageInfo_FunctionStatus.Merge(dst, src) +func (m *FunctionStatus) XXX_Merge(src proto.Message) { + xxx_messageInfo_FunctionStatus.Merge(m, src) } func (m *FunctionStatus) XXX_Size() int { return xxx_messageInfo_FunctionStatus.Size(m) @@ -226,16 +230,17 @@ func (m *FunctionStatus_ExceptionInformation) Reset() { *m = FunctionSta func (m *FunctionStatus_ExceptionInformation) String() string { return proto.CompactTextString(m) } func (*FunctionStatus_ExceptionInformation) ProtoMessage() {} func (*FunctionStatus_ExceptionInformation) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{0, 0} + return fileDescriptor_93ee26d3627c79da, []int{0, 0} } + func (m *FunctionStatus_ExceptionInformation) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FunctionStatus_ExceptionInformation.Unmarshal(m, b) } func (m *FunctionStatus_ExceptionInformation) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_FunctionStatus_ExceptionInformation.Marshal(b, m, deterministic) } -func (dst *FunctionStatus_ExceptionInformation) XXX_Merge(src proto.Message) { - xxx_messageInfo_FunctionStatus_ExceptionInformation.Merge(dst, src) +func (m *FunctionStatus_ExceptionInformation) XXX_Merge(src proto.Message) { + xxx_messageInfo_FunctionStatus_ExceptionInformation.Merge(m, src) } func (m *FunctionStatus_ExceptionInformation) XXX_Size() int { return xxx_messageInfo_FunctionStatus_ExceptionInformation.Size(m) @@ -273,16 +278,17 @@ func (m *FunctionStatusList) Reset() { *m = FunctionStatusList{} } func (m *FunctionStatusList) String() string { return proto.CompactTextString(m) } func (*FunctionStatusList) ProtoMessage() {} func (*FunctionStatusList) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{1} + return fileDescriptor_93ee26d3627c79da, []int{1} } + func (m *FunctionStatusList) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_FunctionStatusList.Unmarshal(m, b) } func (m *FunctionStatusList) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_FunctionStatusList.Marshal(b, m, deterministic) } -func (dst *FunctionStatusList) XXX_Merge(src proto.Message) { - xxx_messageInfo_FunctionStatusList.Merge(dst, src) +func (m *FunctionStatusList) XXX_Merge(src proto.Message) { + xxx_messageInfo_FunctionStatusList.Merge(m, src) } func (m *FunctionStatusList) XXX_Size() int { return xxx_messageInfo_FunctionStatusList.Size(m) @@ -336,16 +342,17 @@ func (m *MetricsData) Reset() { *m = MetricsData{} } func (m *MetricsData) String() string { return proto.CompactTextString(m) } func (*MetricsData) ProtoMessage() {} func (*MetricsData) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{2} + return fileDescriptor_93ee26d3627c79da, []int{2} } + func (m *MetricsData) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MetricsData.Unmarshal(m, b) } func (m *MetricsData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_MetricsData.Marshal(b, m, deterministic) } -func (dst *MetricsData) XXX_Merge(src proto.Message) { - xxx_messageInfo_MetricsData.Merge(dst, src) +func (m *MetricsData) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetricsData.Merge(m, src) } func (m *MetricsData) XXX_Size() int { return xxx_messageInfo_MetricsData.Size(m) @@ -451,16 +458,17 @@ func (m *HealthCheckResult) Reset() { *m = HealthCheckResult{} } func (m *HealthCheckResult) String() string { return proto.CompactTextString(m) } func (*HealthCheckResult) ProtoMessage() {} func (*HealthCheckResult) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{3} + return fileDescriptor_93ee26d3627c79da, []int{3} } + func (m *HealthCheckResult) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_HealthCheckResult.Unmarshal(m, b) } func (m *HealthCheckResult) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_HealthCheckResult.Marshal(b, m, deterministic) } -func (dst *HealthCheckResult) XXX_Merge(src proto.Message) { - xxx_messageInfo_HealthCheckResult.Merge(dst, src) +func (m *HealthCheckResult) XXX_Merge(src proto.Message) { + xxx_messageInfo_HealthCheckResult.Merge(m, src) } func (m *HealthCheckResult) XXX_Size() int { return xxx_messageInfo_HealthCheckResult.Size(m) @@ -489,16 +497,17 @@ func (m *Metrics) Reset() { *m = Metrics{} } func (m *Metrics) String() string { return proto.CompactTextString(m) } func (*Metrics) ProtoMessage() {} func (*Metrics) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{4} + return fileDescriptor_93ee26d3627c79da, []int{4} } + func (m *Metrics) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Metrics.Unmarshal(m, b) } func (m *Metrics) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Metrics.Marshal(b, m, deterministic) } -func (dst *Metrics) XXX_Merge(src proto.Message) { - xxx_messageInfo_Metrics.Merge(dst, src) +func (m *Metrics) XXX_Merge(src proto.Message) { + xxx_messageInfo_Metrics.Merge(m, src) } func (m *Metrics) XXX_Size() int { return xxx_messageInfo_Metrics.Size(m) @@ -529,16 +538,17 @@ func (m *Metrics_InstanceMetrics) Reset() { *m = Metrics_InstanceMetrics func (m *Metrics_InstanceMetrics) String() string { return proto.CompactTextString(m) } func (*Metrics_InstanceMetrics) ProtoMessage() {} func (*Metrics_InstanceMetrics) Descriptor() ([]byte, []int) { - return fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3, []int{4, 0} + return fileDescriptor_93ee26d3627c79da, []int{4, 0} } + func (m *Metrics_InstanceMetrics) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Metrics_InstanceMetrics.Unmarshal(m, b) } func (m *Metrics_InstanceMetrics) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Metrics_InstanceMetrics.Marshal(b, m, deterministic) } -func (dst *Metrics_InstanceMetrics) XXX_Merge(src proto.Message) { - xxx_messageInfo_Metrics_InstanceMetrics.Merge(dst, src) +func (m *Metrics_InstanceMetrics) XXX_Merge(src proto.Message) { + xxx_messageInfo_Metrics_InstanceMetrics.Merge(m, src) } func (m *Metrics_InstanceMetrics) XXX_Size() int { return xxx_messageInfo_Metrics_InstanceMetrics.Size(m) @@ -581,11 +591,9 @@ func init() { proto.RegisterType((*Metrics_InstanceMetrics)(nil), "proto.Metrics.InstanceMetrics") } -func init() { - proto.RegisterFile("InstanceCommunication.proto", fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3) -} +func init() { proto.RegisterFile("InstanceCommunication.proto", fileDescriptor_93ee26d3627c79da) } -var fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3 = []byte{ +var fileDescriptor_93ee26d3627c79da = []byte{ // 917 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0xef, 0x6e, 0x1b, 0x45, 0x10, 0xcf, 0xd5, 0x75, 0x9d, 0x8c, 0x53, 0x27, 0x9e, 0xc4, 0xe1, 0x70, 0xa5, 0x60, 0x0e, 0x54, @@ -646,3 +654,227 @@ var fileDescriptor_InstanceCommunication_5d8f1fc97439e9d3 = []byte{ 0x0f, 0xc5, 0xeb, 0x07, 0x1a, 0xfd, 0xf2, 0x9f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x52, 0xef, 0x2d, 0x97, 0x48, 0x0a, 0x00, 0x00, } + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// InstanceControlClient is the client API for InstanceControl service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type InstanceControlClient interface { + GetFunctionStatus(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*FunctionStatus, error) + GetAndResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error) + ResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) + GetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error) + HealthCheck(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*HealthCheckResult, error) +} + +type instanceControlClient struct { + cc *grpc.ClientConn +} + +func NewInstanceControlClient(cc *grpc.ClientConn) InstanceControlClient { + return &instanceControlClient{cc} +} + +func (c *instanceControlClient) GetFunctionStatus(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*FunctionStatus, error) { + out := new(FunctionStatus) + err := c.cc.Invoke(ctx, "/proto.InstanceControl/GetFunctionStatus", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *instanceControlClient) GetAndResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error) { + out := new(MetricsData) + err := c.cc.Invoke(ctx, "/proto.InstanceControl/GetAndResetMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *instanceControlClient) ResetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/proto.InstanceControl/ResetMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *instanceControlClient) GetMetrics(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*MetricsData, error) { + out := new(MetricsData) + err := c.cc.Invoke(ctx, "/proto.InstanceControl/GetMetrics", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *instanceControlClient) HealthCheck(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*HealthCheckResult, error) { + out := new(HealthCheckResult) + err := c.cc.Invoke(ctx, "/proto.InstanceControl/HealthCheck", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// InstanceControlServer is the server API for InstanceControl service. +type InstanceControlServer interface { + GetFunctionStatus(context.Context, *empty.Empty) (*FunctionStatus, error) + GetAndResetMetrics(context.Context, *empty.Empty) (*MetricsData, error) + ResetMetrics(context.Context, *empty.Empty) (*empty.Empty, error) + GetMetrics(context.Context, *empty.Empty) (*MetricsData, error) + HealthCheck(context.Context, *empty.Empty) (*HealthCheckResult, error) +} + +// UnimplementedInstanceControlServer can be embedded to have forward compatible implementations. +type UnimplementedInstanceControlServer struct { +} + +func (*UnimplementedInstanceControlServer) GetFunctionStatus(ctx context.Context, req *empty.Empty) (*FunctionStatus, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetFunctionStatus not implemented") +} +func (*UnimplementedInstanceControlServer) GetAndResetMetrics(ctx context.Context, req *empty.Empty) (*MetricsData, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetAndResetMetrics not implemented") +} +func (*UnimplementedInstanceControlServer) ResetMetrics(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method ResetMetrics not implemented") +} +func (*UnimplementedInstanceControlServer) GetMetrics(ctx context.Context, req *empty.Empty) (*MetricsData, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMetrics not implemented") +} +func (*UnimplementedInstanceControlServer) HealthCheck(ctx context.Context, req *empty.Empty) (*HealthCheckResult, error) { + return nil, status.Errorf(codes.Unimplemented, "method HealthCheck not implemented") +} + +func RegisterInstanceControlServer(s *grpc.Server, srv InstanceControlServer) { + s.RegisterService(&_InstanceControl_serviceDesc, srv) +} + +func _InstanceControl_GetFunctionStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(empty.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InstanceControlServer).GetFunctionStatus(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.InstanceControl/GetFunctionStatus", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InstanceControlServer).GetFunctionStatus(ctx, req.(*empty.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _InstanceControl_GetAndResetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(empty.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InstanceControlServer).GetAndResetMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.InstanceControl/GetAndResetMetrics", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InstanceControlServer).GetAndResetMetrics(ctx, req.(*empty.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _InstanceControl_ResetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(empty.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InstanceControlServer).ResetMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.InstanceControl/ResetMetrics", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InstanceControlServer).ResetMetrics(ctx, req.(*empty.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _InstanceControl_GetMetrics_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(empty.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InstanceControlServer).GetMetrics(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.InstanceControl/GetMetrics", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InstanceControlServer).GetMetrics(ctx, req.(*empty.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _InstanceControl_HealthCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(empty.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(InstanceControlServer).HealthCheck(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.InstanceControl/HealthCheck", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(InstanceControlServer).HealthCheck(ctx, req.(*empty.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +var _InstanceControl_serviceDesc = grpc.ServiceDesc{ + ServiceName: "proto.InstanceControl", + HandlerType: (*InstanceControlServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetFunctionStatus", + Handler: _InstanceControl_GetFunctionStatus_Handler, + }, + { + MethodName: "GetAndResetMetrics", + Handler: _InstanceControl_GetAndResetMetrics_Handler, + }, + { + MethodName: "ResetMetrics", + Handler: _InstanceControl_ResetMetrics_Handler, + }, + { + MethodName: "GetMetrics", + Handler: _InstanceControl_GetMetrics_Handler, + }, + { + MethodName: "HealthCheck", + Handler: _InstanceControl_HealthCheck_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "InstanceCommunication.proto", +} diff --git a/pulsar-function-go/pb/Request.pb.go b/pulsar-function-go/pb/Request.pb.go index 733d0b06b6b8c..827994fccca01 100644 --- a/pulsar-function-go/pb/Request.pb.go +++ b/pulsar-function-go/pb/Request.pb.go @@ -17,14 +17,13 @@ // under the License. // -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: Request.proto +package api -package pb - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal @@ -35,7 +34,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type ServiceRequest_ServiceRequestType int32 @@ -50,6 +49,7 @@ var ServiceRequest_ServiceRequestType_name = map[int32]string{ 1: "DELETE", 2: "INITIALIZE", } + var ServiceRequest_ServiceRequestType_value = map[string]int32{ "UPDATE": 0, "DELETE": 1, @@ -59,8 +59,9 @@ var ServiceRequest_ServiceRequestType_value = map[string]int32{ func (x ServiceRequest_ServiceRequestType) String() string { return proto.EnumName(ServiceRequest_ServiceRequestType_name, int32(x)) } + func (ServiceRequest_ServiceRequestType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_Request_db06a8eacb0614b9, []int{0, 0} + return fileDescriptor_ac56d74daff02b5a, []int{0, 0} } type ServiceRequest struct { @@ -77,16 +78,17 @@ func (m *ServiceRequest) Reset() { *m = ServiceRequest{} } func (m *ServiceRequest) String() string { return proto.CompactTextString(m) } func (*ServiceRequest) ProtoMessage() {} func (*ServiceRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_Request_db06a8eacb0614b9, []int{0} + return fileDescriptor_ac56d74daff02b5a, []int{0} } + func (m *ServiceRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ServiceRequest.Unmarshal(m, b) } func (m *ServiceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_ServiceRequest.Marshal(b, m, deterministic) } -func (dst *ServiceRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ServiceRequest.Merge(dst, src) +func (m *ServiceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ServiceRequest.Merge(m, src) } func (m *ServiceRequest) XXX_Size() int { return xxx_messageInfo_ServiceRequest.Size(m) @@ -126,13 +128,13 @@ func (m *ServiceRequest) GetWorkerId() string { } func init() { - proto.RegisterType((*ServiceRequest)(nil), "proto.ServiceRequest") proto.RegisterEnum("proto.ServiceRequest_ServiceRequestType", ServiceRequest_ServiceRequestType_name, ServiceRequest_ServiceRequestType_value) + proto.RegisterType((*ServiceRequest)(nil), "proto.ServiceRequest") } -func init() { proto.RegisterFile("Request.proto", fileDescriptor_Request_db06a8eacb0614b9) } +func init() { proto.RegisterFile("Request.proto", fileDescriptor_ac56d74daff02b5a) } -var fileDescriptor_Request_db06a8eacb0614b9 = []byte{ +var fileDescriptor_ac56d74daff02b5a = []byte{ // 247 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x52, 0x7c, 0x6e, diff --git a/pulsar-function-go/pb/doc.go b/pulsar-function-go/pb/doc.go index aafd43ad51a60..e888179578e11 100644 --- a/pulsar-function-go/pb/doc.go +++ b/pulsar-function-go/pb/doc.go @@ -31,4 +31,4 @@ // revision: 548c726b8e7f0e163b1132c9ada6ba83d6bec572 // // Files generated by the protoc-gen-go program should not be modified. -package pb +package api diff --git a/pulsar-function-go/pb/generate.sh b/pulsar-function-go/pb/generate.sh index ab20949616c69..5e302e8f7ac73 100755 --- a/pulsar-function-go/pb/generate.sh +++ b/pulsar-function-go/pb/generate.sh @@ -54,7 +54,7 @@ fi protoFiles="${protoDefinitions}/*.proto" protoc \ - --go_out=import_path=${pkg}:. \ + --go_out=import_path=${pkg},plugins=grpc:. \ --proto_path="${protoDefinitions}" ${protoFiles} pulsarGitRev=$(git -C ${pulsarSrc} rev-parse HEAD) diff --git a/pulsar-function-go/pf/context.go b/pulsar-function-go/pf/context.go index 9681ae7f36a2f..d8e7dbf3232bf 100644 --- a/pulsar-function-go/pf/context.go +++ b/pulsar-function-go/pf/context.go @@ -21,12 +21,12 @@ package pf import ( "context" + "time" ) type FunctionContext struct { instanceConf *instanceConf userConfigs map[string]interface{} - inputTopics []string logAppender *LogAppender } @@ -43,7 +43,14 @@ func (c *FunctionContext) GetInstanceID() int { } func (c *FunctionContext) GetInputTopics() []string { - return c.inputTopics + inputMap := c.instanceConf.funcDetails.GetSource().InputSpecs + inputTopics := make([]string, len(inputMap)) + i := 0 + for k := range inputMap { + inputTopics[i] = k + i++ + } + return inputTopics } func (c *FunctionContext) GetOutputTopic() string { @@ -66,6 +73,25 @@ func (c *FunctionContext) GetFuncID() string { return c.instanceConf.funcID } +func (c *FunctionContext) GetPort() int { + return c.instanceConf.port +} + +func (c *FunctionContext) GetClusterName() string { + return c.instanceConf.clusterName +} + +func (c *FunctionContext) GetExpectedHealthCheckInterval() int32 { + return c.instanceConf.expectedHealthCheckInterval +} +func (c *FunctionContext) GetExpectedHealthCheckIntervalAsDuration() time.Duration { + return time.Duration(c.instanceConf.expectedHealthCheckInterval) +} + +func (c *FunctionContext) GetMaxIdleTime() int64 { + return int64(c.GetExpectedHealthCheckIntervalAsDuration() * 3 * time.Second) +} + func (c *FunctionContext) GetFuncVersion() string { return c.instanceConf.funcVersion } diff --git a/pulsar-function-go/pf/context_test.go b/pulsar-function-go/pf/context_test.go index ce95738b9a783..1357994240e6a 100644 --- a/pulsar-function-go/pf/context_test.go +++ b/pulsar-function-go/pf/context_test.go @@ -32,8 +32,10 @@ func TestContext(t *testing.T) { fc := NewFuncContext() ctx = NewContext(ctx, fc) if resfc, ok := FromContext(ctx); ok { + assert.Equal(t, []string{"persistent://public/default/topic-01"}, resfc.GetInputTopics()) assert.Equal(t, "1.0.0", resfc.GetFuncVersion()) assert.Equal(t, "pulsar-function", resfc.GetFuncID()) assert.Equal(t, "go-function", resfc.GetFuncName()) + assert.Equal(t, "persistent://public/default/topic-02", resfc.GetOutputTopic()) } -} +} \ No newline at end of file diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go index abd60fd104848..634b3793448b6 100644 --- a/pulsar-function-go/pf/instance.go +++ b/pulsar-function-go/pf/instance.go @@ -21,12 +21,13 @@ package pf import ( "context" + "github.com/golang/protobuf/ptypes/empty" "math" "time" "github.com/apache/pulsar-client-go/pulsar" log "github.com/apache/pulsar/pulsar-function-go/logutil" - "github.com/apache/pulsar/pulsar-function-go/pb" + pb "github.com/apache/pulsar/pulsar-function-go/pb" ) type goInstance struct { @@ -35,19 +36,57 @@ type goInstance struct { producer pulsar.Producer consumers map[string]pulsar.Consumer client pulsar.Client + lastHealthCheckTs int64 } + // newGoInstance init goInstance and init function context func newGoInstance() *goInstance { goInstance := &goInstance{ context: NewFuncContext(), consumers: make(map[string]pulsar.Consumer), } + now := time.Now() + goInstance.lastHealthCheckTs = now.UnixNano() return goInstance } +func (gi *goInstance) processSpawnerHealthCheckTimer(tkr *time.Ticker){ + log.Info("Starting processSpawnerHealthCheckTimer") + now := time.Now() + maxIdleTime := gi.context.GetMaxIdleTime() + timeSinceLastCheck := now.UnixNano() - gi.lastHealthCheckTs + if (timeSinceLastCheck) > (maxIdleTime) { + log.Error("Haven't received health check from spawner in a while. Stopping instance...") + gi.close() + // os.Exit(1) + tkr.Stop() + } +} + +func (gi *goInstance) startScheduler(){ + if gi.context.instanceConf.expectedHealthCheckInterval > 0 { + log.Info("Starting Scheduler") + go func() { + log.Info("Started Scheduler") + tkr := time.NewTicker(time.Millisecond * 1000 * gi.context.GetExpectedHealthCheckIntervalAsDuration()) + for range tkr.C { + log.Info("Starting Timer") + go gi.processSpawnerHealthCheckTimer(tkr) + } + }() + } +} + func (gi *goInstance) startFunction(function function) error { gi.function = function + + // start proccess spawner health check timer + now := time.Now(); + gi.lastHealthCheckTs = now.UnixNano() + + gi.startScheduler() + err := gi.setupClient() if err != nil { log.Errorf("setup client failed, error is:%v", err) @@ -69,10 +108,13 @@ func (gi *goInstance) startFunction(function function) error { return err } - idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdle) + idleDuration := getIdleTimeout(time.Millisecond * gi.context.instanceConf.killAfterIdleMs) idleTimer := time.NewTimer(idleDuration) defer idleTimer.Stop() + servicer := InstanceControlServicer{goInstance:gi} + servicer.serve(gi) + CLOSE: for { idleTimer.Reset(idleDuration) @@ -215,7 +257,6 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) { return nil, err } gi.consumers[topic] = consumer - gi.context.inputTopics = append(gi.context.inputTopics, topic) } return channel, nil } @@ -323,3 +364,27 @@ func (gi *goInstance) close() { gi.client.Close() } } + + +func (gi *goInstance) healthCheck() *pb.HealthCheckResult { + now := time.Now() + gi.lastHealthCheckTs = now.UnixNano() + healthCheckResult := pb.HealthCheckResult{Success: true} + return &healthCheckResult +} + +func (gi *goInstance) getFunctionStatus() *pb.FunctionStatus { + return nil // Not implemented until we add the statistics features +} + +func (gi *goInstance) getAndResetMetrics() *pb.MetricsData { + return nil // Not implemented until we add the statistics features +} + +func (gi *goInstance) resetMetrics() *empty.Empty { + return nil // Not implemented until we add the statistics features +} + +func (gi *goInstance) getMetrics() *pb.MetricsData { + return nil // Not implemented until we add the statistics features +} diff --git a/pulsar-function-go/pf/instanceConf.go b/pulsar-function-go/pf/instanceConf.go index 449b22e65bbeb..0f5259113c844 100644 --- a/pulsar-function-go/pf/instanceConf.go +++ b/pulsar-function-go/pf/instanceConf.go @@ -24,7 +24,7 @@ import ( "time" "github.com/apache/pulsar/pulsar-function-go/conf" - "github.com/apache/pulsar/pulsar-function-go/pb" + pb "github.com/apache/pulsar/pulsar-function-go/pb" ) // This is the config passed to the Golang Instance. Contains all the information @@ -38,7 +38,8 @@ type instanceConf struct { port int clusterName string pulsarServiceURL string - killAfterIdle time.Duration + killAfterIdleMs time.Duration + expectedHealthCheckInterval int32 } func newInstanceConf() *instanceConf { @@ -55,7 +56,8 @@ func newInstanceConf() *instanceConf { port: cfg.Port, clusterName: cfg.ClusterName, pulsarServiceURL: cfg.PulsarServiceURL, - killAfterIdle: cfg.KillAfterIdleMs, + killAfterIdleMs: cfg.KillAfterIdleMs, + expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval, funcDetails: pb.FunctionDetails{ Tenant: cfg.Tenant, Namespace: cfg.NameSpace, diff --git a/pulsar-function-go/pf/instanceConf_test.go b/pulsar-function-go/pf/instanceConf_test.go index 7b71b119e8a72..351014e327443 100644 --- a/pulsar-function-go/pf/instanceConf_test.go +++ b/pulsar-function-go/pf/instanceConf_test.go @@ -29,4 +29,4 @@ func TestInstanceConf_GetInstanceName(t *testing.T) { instanceConf := newInstanceConf() str := instanceConf.getInstanceName() assert.Equal(t, "101", str) -} +} \ No newline at end of file diff --git a/pulsar-function-go/pf/instanceControlServicer.go b/pulsar-function-go/pf/instanceControlServicer.go new file mode 100644 index 0000000000000..4f032a65d4cab --- /dev/null +++ b/pulsar-function-go/pf/instanceControlServicer.go @@ -0,0 +1,71 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package pf + +import ( + "context" + "fmt" + "github.com/golang/protobuf/ptypes/empty" + "net" + "google.golang.org/grpc" + log "github.com/apache/pulsar/pulsar-function-go/logutil" + pb "github.com/apache/pulsar/pulsar-function-go/pb" +) + + +type InstanceControlServicer struct { + goInstance *goInstance +} +func (icServicer *InstanceControlServicer) GetFunctionStatus(ctx context.Context, req *empty.Empty) (*pb.FunctionStatus, error) { + return icServicer.goInstance.getFunctionStatus(), nil + //return nil, status.Errorf(codes.Unimplemented, "method GetFunctionStatus not implemented") +} +func (icServicer *InstanceControlServicer) GetAndResetMetrics(ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { + return icServicer.goInstance.getAndResetMetrics(), nil +} +func (icServicer *InstanceControlServicer) ResetMetrics(ctx context.Context, req *empty.Empty) (*empty.Empty, error) { + return icServicer.goInstance.resetMetrics(), nil +} +func (icServicer *InstanceControlServicer) GetMetrics(ctx context.Context, req *empty.Empty) (*pb.MetricsData, error) { + return icServicer.goInstance.getMetrics(), nil +} +func (icServicer *InstanceControlServicer) HealthCheck(ctx context.Context, req *empty.Empty) (*pb.HealthCheckResult, error) { + return icServicer.goInstance.healthCheck(), nil +} + +func (icServicer *InstanceControlServicer) serve(goInstance *goInstance) *grpc.Server { + // create a listener on TCP port + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", goInstance.context.GetPort())) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + // create a gRPC server object + grpcServer := grpc.NewServer() + // must register before we start the service. + pb.RegisterInstanceControlServer(grpcServer, icServicer) + // start the server + log.Infof("Serving InstanceCommunication on port %d", goInstance.context.GetPort()) + go func() { + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + return grpcServer +} \ No newline at end of file diff --git a/pulsar-function-go/pf/instanceControlServicer_test.go b/pulsar-function-go/pf/instanceControlServicer_test.go new file mode 100644 index 0000000000000..24c1f69f0019b --- /dev/null +++ b/pulsar-function-go/pf/instanceControlServicer_test.go @@ -0,0 +1,77 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package pf + +import ( + "context" + pb "github.com/apache/pulsar/pulsar-function-go/pb" + "github.com/golang/protobuf/ptypes/empty" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/test/bufconn" + "log" + "net" + "testing" + "time" +) +const bufSize = 1024 * 1024 + +var lis *bufconn.Listener + + + +func bufDialer(string, time.Duration) (net.Conn, error) { + return lis.Dial() +} +func TestInstanceControlServicer_serve_creates_valid_instance(t *testing.T) { + lis = bufconn.Listen(bufSize) + // create a gRPC server object + grpcServer := grpc.NewServer() + instance := newGoInstance() + servicer := InstanceControlServicer{instance} + // must register before we start the service. + pb.RegisterInstanceControlServer(grpcServer, &servicer) + // start the server + log.Printf("Serving InstanceCommunication on port %d", instance.context.GetPort()) + + go func() { + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + + // Now we can setup the client: + + ctx := context.Background() + conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithDialer(bufDialer), grpc.WithInsecure()) + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + client := pb.NewInstanceControlClient(conn) + resp, err := client.HealthCheck(ctx, &empty.Empty{}) + if err != nil { + t.Fatalf("SayHello failed: %v", err) + } + + // Test for output. + log.Printf("Response: %+v", resp.Success) + assert.Equal(t, resp.Success, true) +} \ No newline at end of file diff --git a/pulsar-function-go/pf/instance_test.go b/pulsar-function-go/pf/instance_test.go new file mode 100644 index 0000000000000..ed7b81cc029e7 --- /dev/null +++ b/pulsar-function-go/pf/instance_test.go @@ -0,0 +1,94 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +package pf + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "strconv" + "testing" + "time" +) + + +func testProcessSpawnerHealthCheckTimer(tkr *time.Ticker, lastHealthCheckTs int64, expectedHealthCheckInterval int32, counter *int ){ + fmt.Println("Starting processSpawnerHealthCheckTimer") + now := time.Now() + maxIdleTime := int64(time.Duration(expectedHealthCheckInterval) * 3 * time.Second) + fmt.Println("maxIdleTime is: " + strconv.FormatInt(maxIdleTime, 10)) + timeSinceLastCheck := now.UnixNano() - lastHealthCheckTs + fmt.Println("timeSinceLastCheck is: " + strconv.FormatInt(timeSinceLastCheck, 10)) + if (timeSinceLastCheck) > (maxIdleTime) { + fmt.Println("Haven't received health check from spawner in a while. Stopping instance...") + // os.Exit(1) + tkr.Stop() + } else { + fmt.Println("Continuing to check") + *counter++ + } +} + +func testStartScheduler(counter *int){ + now := time.Now() + lastHealthCheckTs := now.UnixNano() + + var expectedHealthCheckInterval int32 = 1 + if expectedHealthCheckInterval > 0 { + fmt.Println("Starting Scheduler") + go func() { + fmt.Println("Started Scheduler") + period := time.Second * time.Duration(expectedHealthCheckInterval) + fmt.Println("period is: " + period.String()) + tkr := time.NewTicker(period) + for range tkr.C { + fmt.Println("Starting Timer") + testProcessSpawnerHealthCheckTimer(tkr, lastHealthCheckTs, expectedHealthCheckInterval, counter) + } + }() + } +} + + +func TestInstance_HeartbeatTimer(t *testing.T) { + counter := 0 + testStartScheduler(&counter) + time.Sleep(time.Second * 10) + assert.Equal(t, 2, counter) +} + +func TestTime_EqualsThreeSecondsFixed(t *testing.T) { + var expectedHealthCheckInterval int32 = 3 + timeAmount := time.Millisecond * 1000 * time.Duration(expectedHealthCheckInterval) + assert.Equal(t, time.Second * 3, timeAmount) +} +func TestTime_EqualsThreeSecondsTimed(t *testing.T) { + start := time.Now() + startTime := start.UnixNano() + + time.Sleep(time.Second * 3) + + end := time.Now() + endTime := end.UnixNano() + + diff := endTime - startTime + + assert.True(t, time.Duration(diff) > time.Second * 3) + assert.True(t, time.Duration(diff) < time.Millisecond * 3100) +} \ No newline at end of file