Skip to content

Commit

Permalink
Implement TapByResource in Tap Service
Browse files Browse the repository at this point in the history
The TapByResource endpoint was previously a stub.

Implement end-to-end tapByResource functionality, with support for
specifying any kubernetes resource(s) as target and destination.

Fixes #803, #49

Signed-off-by: Andrew Seigner <siggy@buoyant.io>
  • Loading branch information
siggy committed Apr 22, 2018
1 parent 79304cd commit c28d1c7
Show file tree
Hide file tree
Showing 9 changed files with 749 additions and 46 deletions.
19 changes: 18 additions & 1 deletion controller/api/public/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,24 @@ func (c *grpcOverHttpClient) Tap(ctx context.Context, req *pb.TapRequest, _ ...g
}

func (c *grpcOverHttpClient) TapByResource(ctx context.Context, req *pb.TapByResourceRequest, _ ...grpc.CallOption) (pb.Api_TapByResourceClient, error) {
return nil, fmt.Errorf("Unimplemented")
url := c.endpointNameToPublicApiUrl("TapByResource")
httpRsp, err := c.post(ctx, url, req)
if err != nil {
return nil, err
}

if err = checkIfResponseHasConduitError(httpRsp); err != nil {
httpRsp.Body.Close()
return nil, err
}

go func() {
<-ctx.Done()
log.Debug("Closing response body after context marked as done")
httpRsp.Body.Close()
}()

return &tapClient{ctx: ctx, reader: bufio.NewReader(httpRsp.Body)}, nil
}

func (c *grpcOverHttpClient) apiRequest(ctx context.Context, endpoint string, req proto.Message, protoResponse proto.Message) error {
Expand Down
1 change: 1 addition & 0 deletions controller/api/public/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ spec:
replicaSetInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
replicationControllerInformer.Informer().HasSynced,
serviceInformer.Informer().HasSynced,
) {
t.Fatalf("timed out wait for caches to sync")
}
Expand Down
35 changes: 30 additions & 5 deletions controller/api/public/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
)

var (
statSummaryPath = fullUrlPathFor("StatSummary")
versionPath = fullUrlPathFor("Version")
listPodsPath = fullUrlPathFor("ListPods")
tapPath = fullUrlPathFor("Tap")
selfCheckPath = fullUrlPathFor("SelfCheck")
statSummaryPath = fullUrlPathFor("StatSummary")
versionPath = fullUrlPathFor("Version")
listPodsPath = fullUrlPathFor("ListPods")
tapPath = fullUrlPathFor("Tap")
tapByResourcePath = fullUrlPathFor("TapByResource")
selfCheckPath = fullUrlPathFor("SelfCheck")
)

type handler struct {
Expand All @@ -50,6 +51,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.handleListPods(w, req)
case tapPath:
h.handleTap(w, req)
case tapByResourcePath:
h.handleTapByResource(w, req)
case selfCheckPath:
h.handleSelfCheck(w, req)
default:
Expand Down Expand Up @@ -164,6 +167,28 @@ func (h *handler) handleTap(w http.ResponseWriter, req *http.Request) {
}
}

func (h *handler) handleTapByResource(w http.ResponseWriter, req *http.Request) {
flushableWriter, err := newStreamingWriter(w)
if err != nil {
writeErrorToHttpResponse(w, err)
return
}

var protoRequest pb.TapByResourceRequest
err = httpRequestToProto(req, &protoRequest)
if err != nil {
writeErrorToHttpResponse(w, err)
return
}

server := tapServer{w: flushableWriter, req: req}
err = h.grpcServer.TapByResource(&protoRequest, server)
if err != nil {
writeErrorToHttpResponse(w, err)
return
}
}

type tapServer struct {
w flushableResponseWriter
req *http.Request
Expand Down
2 changes: 1 addition & 1 deletion controller/api/public/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestServer(t *testing.T) {
mockGrpcServer.TapStreamsToReturn = expectedTapResponses
mockGrpcServer.ErrorToReturn = nil

tapClient, err := client.Tap(context.TODO(), &pb.TapRequest{})
tapClient, err := client.TapByResource(context.TODO(), &pb.TapByResourceRequest{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
96 changes: 95 additions & 1 deletion controller/cmd/tap/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/tap"
"github.com/runconduit/conduit/controller/util"
"github.com/runconduit/conduit/pkg/version"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

func main() {
Expand All @@ -33,11 +40,98 @@ func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

server, lis, err := tap.NewServer(*addr, *tapPort, *kubeConfigPath)
clientSet, err := k8s.NewClientSet(*kubeConfigPath)
if err != nil {
log.Fatalf("failed to create Kubernetes client: %s", err)
}

replicaSets, err := k8s.NewReplicaSetStore(clientSet)
if err != nil {
log.Fatalf("NewReplicaSetStore failed: %s", err)
}
err = replicaSets.Run()
if err != nil {
log.Fatalf("replicaSets.Run() failed: %s", err)
}

// index pods by deployment
deploymentIndex := func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a Pod")
}
deployment, err := replicaSets.GetDeploymentForPod(pod)
if err != nil {
log.Debugf("Cannot get deployment for pod %s: %s", pod.Name, err)
return []string{}, nil
}
return []string{deployment}, nil
}

pods, err := k8s.NewPodIndex(clientSet, deploymentIndex)
if err != nil {
log.Fatalf("NewPodIndex failed: %s", err)
}
err = pods.Run()
if err != nil {
log.Fatalf("pods.Run() failed: %s", err)
}

// TODO: factor out with public-api
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)

namespaceInformer := sharedInformers.Core().V1().Namespaces()
namespaceInformerSynced := namespaceInformer.Informer().HasSynced

deployInformer := sharedInformers.Apps().V1beta2().Deployments()
deployInformerSynced := deployInformer.Informer().HasSynced

replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
replicaSetInformerSynced := replicaSetInformer.Informer().HasSynced

podInformer := sharedInformers.Core().V1().Pods()
podInformerSynced := podInformer.Informer().HasSynced

replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
replicationControllerInformerSynced := replicationControllerInformer.Informer().HasSynced

serviceInformer := sharedInformers.Core().V1().Services()
serviceInformerSynced := serviceInformer.Informer().HasSynced

sharedInformers.Start(nil)

server, lis, err := tap.NewServer(
*addr, *tapPort, replicaSets, pods,
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
)
if err != nil {
log.Fatal(err.Error())
}

go func() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

log.Infof("waiting for caches to sync")
if !cache.WaitForCacheSync(
ctx.Done(),
namespaceInformerSynced,
deployInformerSynced,
replicaSetInformerSynced,
podInformerSynced,
replicationControllerInformerSynced,
serviceInformerSynced,
) {
log.Fatalf("timed out wait for caches to sync")
}
log.Infof("caches synced")
}()

go func() {
log.Println("starting gRPC server on", *addr)
server.Serve(lis)
Expand Down
2 changes: 1 addition & 1 deletion controller/k8s/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type podIndex struct {
stopCh chan struct{}
}

func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (PodIndex, error) {
func NewPodIndex(clientset kubernetes.Interface, index cache.IndexFunc) (PodIndex, error) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"index": index})

podListWatcher := cache.NewListWatchFromClient(
Expand Down
2 changes: 1 addition & 1 deletion controller/k8s/replicasets.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ReplicaSetStore struct {
stopCh chan struct{}
}

func NewReplicaSetStore(clientset *kubernetes.Clientset) (*ReplicaSetStore, error) {
func NewReplicaSetStore(clientset kubernetes.Interface) (*ReplicaSetStore, error) {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)

replicatSetListWatcher := cache.NewListWatchFromClient(
Expand Down
Loading

0 comments on commit c28d1c7

Please sign in to comment.