Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ check-api-key:
buildx-create:
docker buildx inspect $(BUILDX_BUILDER_NAME) 2>&1 > /dev/null || \
docker buildx create --name $(BUILDX_BUILDER_NAME) --platform linux/amd64,linux/arm64 --driver docker-container --use --driver-opt network=host || true
docker buildx use $(BUILDX_BUILDER_NAME) || true

.PHONY: build-all # for test purpose build all but output to /dev/null
build-all: BUILD_ARGS ?= --progress=plain --builder $(BUILDX_BUILDER_NAME) --platform linux/amd64,linux/arm64 --output type=tar,dest=/dev/null
Expand Down
133 changes: 118 additions & 15 deletions go/internal/controller/translator/agent/adk_api_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"fmt"
"maps"
"net/url"
"os"
"slices"
"strconv"
Expand Down Expand Up @@ -42,6 +43,8 @@ const (

MCPServicePathDefault = "/mcp"
MCPServiceProtocolDefault = v1alpha2.RemoteMCPServerProtocolStreamableHttp

ProxyHostHeader = "x-kagent-host"
)

type ImageConfig struct {
Expand Down Expand Up @@ -73,18 +76,20 @@ type AdkApiTranslator interface {

type TranslatorPlugin = translator.TranslatorPlugin

func NewAdkApiTranslator(kube client.Client, defaultModelConfig types.NamespacedName, plugins []TranslatorPlugin) AdkApiTranslator {
func NewAdkApiTranslator(kube client.Client, defaultModelConfig types.NamespacedName, plugins []TranslatorPlugin, globalProxyURL string) AdkApiTranslator {
return &adkApiTranslator{
kube: kube,
defaultModelConfig: defaultModelConfig,
plugins: plugins,
globalProxyURL: globalProxyURL,
}
}

type adkApiTranslator struct {
kube client.Client
defaultModelConfig types.NamespacedName
plugins []TranslatorPlugin
globalProxyURL string
}

const MAX_DEPTH = 10
Expand Down Expand Up @@ -532,7 +537,8 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al
// Skip tools that are not applicable to the model provider
switch {
case tool.McpServer != nil:
err := a.translateMCPServerTarget(ctx, cfg, agent.Namespace, tool.McpServer, tool.HeadersFrom)
// Use proxy for MCP server/tool communication
err := a.translateMCPServerTarget(ctx, cfg, agent.Namespace, tool.McpServer, tool.HeadersFrom, a.globalProxyURL)
if err != nil {
return nil, nil, nil, err
}
Expand All @@ -555,15 +561,24 @@ func (a *adkApiTranslator) translateInlineAgent(ctx context.Context, agent *v1al

switch toolAgent.Spec.Type {
case v1alpha2.AgentType_BYO, v1alpha2.AgentType_Declarative:
url := fmt.Sprintf("http://%s.%s:8080", toolAgent.Name, toolAgent.Namespace)
originalURL := fmt.Sprintf("http://%s.%s:8080", toolAgent.Name, toolAgent.Namespace)
headers, err := tool.ResolveHeaders(ctx, a.kube, agent.Namespace)
if err != nil {
return nil, nil, nil, err
}

// If proxy is configured, use proxy URL and set header for Gateway API routing
targetURL := originalURL
if a.globalProxyURL != "" {
targetURL, headers, err = applyProxyURL(originalURL, a.globalProxyURL, headers)
if err != nil {
return nil, nil, nil, err
}
}

cfg.RemoteAgents = append(cfg.RemoteAgents, adk.RemoteAgentConfig{
Name: utils.ConvertToPythonIdentifier(utils.GetObjectRef(toolAgent)),
Url: url,
Url: targetURL,
Headers: headers,
Description: toolAgent.Spec.Description,
})
Expand Down Expand Up @@ -921,14 +936,23 @@ func (a *adkApiTranslator) translateModel(ctx context.Context, namespace, modelC
return nil, nil, nil, fmt.Errorf("unknown model provider: %s", model.Spec.Provider)
}

func (a *adkApiTranslator) translateStreamableHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string) (*adk.StreamableHTTPConnectionParams, error) {
func (a *adkApiTranslator) translateStreamableHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string, proxyURL string) (*adk.StreamableHTTPConnectionParams, error) {
headers, err := tool.ResolveHeaders(ctx, a.kube, namespace)
if err != nil {
return nil, err
}

// If proxy is configured, use proxy URL and set header for Gateway API routing
targetURL := tool.URL
if proxyURL != "" {
targetURL, headers, err = applyProxyURL(tool.URL, proxyURL, headers)
if err != nil {
return nil, err
}
}

params := &adk.StreamableHTTPConnectionParams{
Url: tool.URL,
Url: targetURL,
Headers: headers,
}
if tool.Timeout != nil {
Expand All @@ -940,17 +964,27 @@ func (a *adkApiTranslator) translateStreamableHttpTool(ctx context.Context, tool
if tool.TerminateOnClose != nil {
params.TerminateOnClose = tool.TerminateOnClose
}

return params, nil
}

func (a *adkApiTranslator) translateSseHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string) (*adk.SseConnectionParams, error) {
func (a *adkApiTranslator) translateSseHttpTool(ctx context.Context, tool *v1alpha2.RemoteMCPServerSpec, namespace string, proxyURL string) (*adk.SseConnectionParams, error) {
headers, err := tool.ResolveHeaders(ctx, a.kube, namespace)
if err != nil {
return nil, err
}

// If proxy is configured, use proxy URL and set header for Gateway API routing
targetURL := tool.URL
if proxyURL != "" {
targetURL, headers, err = applyProxyURL(tool.URL, proxyURL, headers)
if err != nil {
return nil, err
}
}

params := &adk.SseConnectionParams{
Url: tool.URL,
Url: targetURL,
Headers: headers,
}
if tool.Timeout != nil {
Expand All @@ -962,7 +996,7 @@ func (a *adkApiTranslator) translateSseHttpTool(ctx context.Context, tool *v1alp
return params, nil
}

func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, toolServer *v1alpha2.McpServerTool, toolHeaders []v1alpha2.ValueRef) error {
func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, toolServer *v1alpha2.McpServerTool, toolHeaders []v1alpha2.ValueRef, proxyURL string) error {
gvk := toolServer.GroupKind()

switch gvk {
Expand Down Expand Up @@ -993,7 +1027,7 @@ func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *

spec.HeadersFrom = append(spec.HeadersFrom, toolHeaders...)

return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames)
return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames, proxyURL)
case schema.GroupKind{
Group: "",
Kind: "RemoteMCPServer",
Expand All @@ -1011,7 +1045,13 @@ func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *

remoteMcpServer.Spec.HeadersFrom = append(remoteMcpServer.Spec.HeadersFrom, toolHeaders...)

return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, &remoteMcpServer.Spec, toolServer.ToolNames)
// RemoteMCPServer uses user-supplied URLs, but if the URL points to an internal k8s service,
// apply proxy to route through the gateway
proxyURL := ""
if a.globalProxyURL != "" && a.isInternalK8sURL(ctx, remoteMcpServer.Spec.URL, agentNamespace) {
proxyURL = a.globalProxyURL
}
return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, &remoteMcpServer.Spec, toolServer.ToolNames, proxyURL)
case schema.GroupKind{
Group: "",
Kind: "Service",
Expand All @@ -1034,7 +1074,7 @@ func (a *adkApiTranslator) translateMCPServerTarget(ctx context.Context, agent *

spec.HeadersFrom = append(spec.HeadersFrom, toolHeaders...)

return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames)
return a.translateRemoteMCPServerTarget(ctx, agent, agentNamespace, spec, toolServer.ToolNames, proxyURL)

default:
return fmt.Errorf("unknown tool server type: %s", gvk)
Expand Down Expand Up @@ -1099,10 +1139,10 @@ func ConvertMCPServerToRemoteMCPServer(mcpServer *v1alpha1.MCPServer) (*v1alpha2
}, nil
}

func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, remoteMcpServer *v1alpha2.RemoteMCPServerSpec, toolNames []string) error {
func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, agent *adk.AgentConfig, agentNamespace string, remoteMcpServer *v1alpha2.RemoteMCPServerSpec, toolNames []string, proxyURL string) error {
switch remoteMcpServer.Protocol {
case v1alpha2.RemoteMCPServerProtocolSse:
tool, err := a.translateSseHttpTool(ctx, remoteMcpServer, agentNamespace)
tool, err := a.translateSseHttpTool(ctx, remoteMcpServer, agentNamespace, proxyURL)
if err != nil {
return err
}
Expand All @@ -1111,7 +1151,7 @@ func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, a
Tools: toolNames,
})
default:
tool, err := a.translateStreamableHttpTool(ctx, remoteMcpServer, agentNamespace)
tool, err := a.translateStreamableHttpTool(ctx, remoteMcpServer, agentNamespace, proxyURL)
if err != nil {
return err
}
Expand All @@ -1125,6 +1165,69 @@ func (a *adkApiTranslator) translateRemoteMCPServerTarget(ctx context.Context, a

// Helper functions

// isInternalK8sURL checks if a URL points to an internal Kubernetes service.
// Internal k8s URLs follow the pattern: http://{name}.{namespace}:{port} or
// http://{name}.{namespace}.svc.cluster.local:{port}
// This method checks if the namespace exists in the cluster to determine if it's internal.
func (a *adkApiTranslator) isInternalK8sURL(ctx context.Context, urlStr, namespace string) bool {
parsedURL, err := url.Parse(urlStr)
if err != nil {
return false
}

hostname := parsedURL.Hostname()
if hostname == "" {
return false
}

// Check if it ends with .svc.cluster.local (definitely internal)
if strings.HasSuffix(hostname, ".svc.cluster.local") {
return true
}

// Extract namespace from hostname pattern: {name}.{namespace}
// Examples: test-mcp-server.kagent -> namespace is "kagent"
parts := strings.Split(hostname, ".")
if len(parts) == 2 {
potentialNamespace := parts[1]

// Check if this namespace exists in the cluster
ns := &corev1.Namespace{}
err := a.kube.Get(ctx, types.NamespacedName{Name: potentialNamespace}, ns)
if err == nil {
// Namespace exists, so this is an internal k8s URL
return true
}
// If namespace doesn't exist, it's likely a TLD or external domain
}

return false
}

func applyProxyURL(originalURL, proxyURL string, headers map[string]string) (targetURL string, updatedHeaders map[string]string, err error) {
// Parse original URL to extract path and hostname
originalURLParsed, err := url.Parse(originalURL)
if err != nil {
return "", nil, fmt.Errorf("failed to parse original URL %q: %w", originalURL, err)
}
proxyURLParsed, err := url.Parse(proxyURL)
if err != nil {
return "", nil, fmt.Errorf("failed to parse proxy URL %q: %w", proxyURL, err)
}

// Use proxy URL with original path
targetURL = fmt.Sprintf("%s://%s%s", proxyURLParsed.Scheme, proxyURLParsed.Host, originalURLParsed.Path)

// Set header to original hostname (without port) for Gateway API routing
updatedHeaders = headers
if updatedHeaders == nil {
updatedHeaders = make(map[string]string)
}
updatedHeaders[ProxyHostHeader] = originalURLParsed.Hostname()

return targetURL, updatedHeaders, nil
}

func computeConfigHash(agentCfg, agentCard, secretData []byte) uint64 {
hasher := sha256.New()
hasher.Write(agentCfg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package agent_test
import (
"context"
"encoding/json"
"net/url"
"os"
"path/filepath"
"strings"
Expand All @@ -14,6 +15,9 @@ import (

"github.com/kagent-dev/kagent/go/api/v1alpha2"
translator "github.com/kagent-dev/kagent/go/internal/controller/translator/agent"
"github.com/kagent-dev/kmcp/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -28,6 +32,7 @@ type TestInput struct {
Operation string `yaml:"operation"` // "translateAgent", "translateTeam", "translateToolServer"
TargetObject string `yaml:"targetObject"` // name of the object to translate
Namespace string `yaml:"namespace"`
ProxyURL string `yaml:"proxyURL,omitempty"` // Optional proxy URL for internally-built k8s URLs
}

// TestGoldenAdkTranslator runs golden tests for the ADK API translator
Expand Down Expand Up @@ -74,20 +79,54 @@ func runGoldenTest(t *testing.T, inputFile, outputsDir, testName string, updateG
scheme := schemev1.Scheme
err = v1alpha2.AddToScheme(scheme)
require.NoError(t, err)
err = v1alpha1.AddToScheme(scheme)
require.NoError(t, err)

// Convert map objects to unstructured and then to typed objects
clientBuilder := fake.NewClientBuilder().WithScheme(scheme)

// Track namespaces we've seen to add them to the fake client
namespacesSeen := make(map[string]bool)

for _, objMap := range testInput.Objects {
// Convert map to unstructured
unstrObj := &unstructured.Unstructured{Object: objMap}

// Track namespace from object metadata
if metadata, ok := objMap["metadata"].(map[string]any); ok {
if ns, ok := metadata["namespace"].(string); ok && ns != "" {
namespacesSeen[ns] = true
}
}

// Extract namespace from URLs in RemoteMCPServer specs
// This is needed because isInternalK8sURL checks if the namespace exists
if kind, ok := objMap["kind"].(string); ok && kind == "RemoteMCPServer" {
if spec, ok := objMap["spec"].(map[string]any); ok {
if urlStr, ok := spec["url"].(string); ok {
if ns := extractNamespaceFromURL(urlStr); ns != "" {
namespacesSeen[ns] = true
}
}
}
}

// Convert to typed object
typedObj, err := convertUnstructuredToTyped(unstrObj, scheme)
require.NoError(t, err)
clientBuilder = clientBuilder.WithObjects(typedObj)
}

// Add namespaces to fake client so namespace existence checks work
for nsName := range namespacesSeen {
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: nsName,
},
}
clientBuilder = clientBuilder.WithObjects(ns)
}

kubeClient := clientBuilder.Build()

// Create translator with a default model config that points to the first ModelConfig in the objects
Expand Down Expand Up @@ -119,7 +158,9 @@ func runGoldenTest(t *testing.T, inputFile, outputsDir, testName string, updateG
}, agent)
require.NoError(t, err)

result, err = translator.NewAdkApiTranslator(kubeClient, defaultModel, nil).TranslateAgent(ctx, agent)
// Use proxy URL from test input if provided
proxyURL := testInput.ProxyURL
result, err = translator.NewAdkApiTranslator(kubeClient, defaultModel, nil, proxyURL).TranslateAgent(ctx, agent)
require.NoError(t, err)

default:
Expand Down Expand Up @@ -215,3 +256,27 @@ func removeNonDeterministicFields(obj any) any {
return v
}
}

// extractNamespaceFromURL extracts the namespace from a Kubernetes service URL.
// For example, "http://service.namespace:port/path" returns "namespace".
// Returns empty string if the URL is not a valid Kubernetes service URL.
func extractNamespaceFromURL(urlStr string) string {
parsed, err := url.Parse(urlStr)
if err != nil {
return ""
}

// Split hostname by dots: service.namespace or service.namespace.svc.cluster.local
hostname := parsed.Hostname()
parts := strings.Split(hostname, ".")

// Valid patterns:
// - service.namespace (2 parts)
// - service.namespace.svc (3 parts)
// - service.namespace.svc.cluster.local (5 parts)
if len(parts) >= 2 {
return parts[1] // namespace is always the second part
}

return ""
}
Loading
Loading