Skip to content

Commit

Permalink
Implement package syncing (#76)
Browse files Browse the repository at this point in the history
- This implements "Packages" spec section https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#packages-1
- TODO: increase test coverage to handle many more edge cases.
- TODO: add package usage to the example Agent/Server.
  • Loading branch information
tigrannajaryan authored May 25, 2022
1 parent a3fd1a4 commit 26dad6c
Show file tree
Hide file tree
Showing 14 changed files with 953 additions and 47 deletions.
254 changes: 253 additions & 1 deletion client/clientimpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -397,6 +398,7 @@ func TestIncludesDetailsOnReconnect(t *testing.T) {
eventually(t, func() bool { return atomic.LoadInt64(&receivedDetails) == 1 })

// close the Agent connection. expect it to reconnect and send details again.
require.NotNil(t, client.conn)
err := client.conn.Close()
assert.NoError(t, err)

Expand Down Expand Up @@ -444,7 +446,7 @@ func TestSetEffectiveConfig(t *testing.T) {
settings.OpAMPServerURL = "ws://" + srv.Endpoint
prepareClient(t, &settings, client)

assert.NoError(t, client.Start(context.Background(), settings))
require.NoError(t, client.Start(context.Background(), settings))

// Verify config is delivered.
eventually(
Expand Down Expand Up @@ -972,3 +974,253 @@ func TestRemoteConfigUpdate(t *testing.T) {
})
}
}

type packageTestCase struct {
name string
errorOnCallback bool
available *protobufs.PackagesAvailable
expectedStatus *protobufs.PackageStatuses
expectedFileContent map[string][]byte
}

const packageUpdateErrorMsg = "cannot update packages"

func verifyUpdatePackages(t *testing.T, testCase packageTestCase) {
testClients(t, func(t *testing.T, client OpAMPClient) {

// Start a Server.
srv := internal.StartMockServer(t)
srv.EnableExpectMode()

localPackageState := internal.NewInMemPackagesStore()

var syncerDoneCh <-chan struct{}

// Prepare a callback that returns either success or failure.
onPackagesAvailable := func(ctx context.Context, packages *protobufs.PackagesAvailable, syncer types.PackagesSyncer) error {
if testCase.errorOnCallback {
return errors.New(packageUpdateErrorMsg)
} else {
syncerDoneCh = syncer.Done()
err := syncer.Sync(ctx)
require.NoError(t, err)
return nil
}
}

// Start a client.
settings := types.StartSettings{
OpAMPServerURL: "ws://" + srv.Endpoint,
Callbacks: types.CallbacksStruct{
OnPackagesAvailableFunc: onPackagesAvailable,
},
PackagesStateProvider: localPackageState,
}
prepareClient(t, &settings, client)

// Client --->
assert.NoError(t, client.Start(context.Background(), settings))

// ---> Server
srv.Expect(func(msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
// Send the packages to the Agent.
return &protobufs.ServerToAgent{
InstanceUid: msg.InstanceUid,
PackagesAvailable: testCase.available,
}
})

// The Agent will try to install the packages and will send the status
// report about it back to the Server.

var lastStatusHash []byte

// ---> Server
// Wait for the expected package statuses to be received.
srv.EventuallyExpect("full PackageStatuses",
func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool) {
expectedStatusReceived := false

status := msg.PackageStatuses
require.NotNil(t, status)
assert.EqualValues(t, testCase.expectedStatus.ServerProvidedAllPackagesHash, status.ServerProvidedAllPackagesHash)
lastStatusHash = status.Hash

// Verify individual package statuses.
for name, pkgExpected := range testCase.expectedStatus.Packages {
pkgStatus := status.Packages[name]
if pkgStatus == nil {
// Package status not yet included in the report.
continue
}
switch pkgStatus.Status {
case protobufs.PackageStatus_InstallFailed:
assert.Contains(t, pkgStatus.ErrorMessage, pkgExpected.ErrorMessage)

case protobufs.PackageStatus_Installed:
assert.EqualValues(t, pkgExpected.AgentHasHash, pkgStatus.AgentHasHash)
assert.EqualValues(t, pkgExpected.AgentHasVersion, pkgStatus.AgentHasVersion)
assert.Empty(t, pkgStatus.ErrorMessage)
default:
assert.Empty(t, pkgStatus.ErrorMessage)
}
assert.EqualValues(t, pkgExpected.ServerOfferedHash, pkgStatus.ServerOfferedHash)
assert.EqualValues(t, pkgExpected.ServerOfferedVersion, pkgStatus.ServerOfferedVersion)

if pkgStatus.Status == pkgExpected.Status {
expectedStatusReceived = true
assert.Len(t, status.Packages, len(testCase.available.Packages))
}
}
assert.NotNil(t, status.Hash)

return &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}, expectedStatusReceived
})

if syncerDoneCh != nil {
// Wait until all syncing is done.
<-syncerDoneCh

for pkgName, receivedContent := range localPackageState.GetContent() {
expectedContent := testCase.expectedFileContent[pkgName]
assert.EqualValues(t, expectedContent, receivedContent)
}
}

// Client --->
// Trigger another status report by setting AgentDescription.
_ = client.SetAgentDescription(client.AgentDescription())

// ---> Server
srv.EventuallyExpect("compressed PackageStatuses",
func(msg *protobufs.AgentToServer) (*protobufs.ServerToAgent, bool) {
// Ensure that compressed status is received.
status := msg.PackageStatuses
require.NotNil(t, status)
compressedReceived := status.ServerProvidedAllPackagesHash == nil
if compressedReceived {
assert.Nil(t, status.ServerProvidedAllPackagesHash)
assert.Nil(t, status.Packages)
}
assert.NotNil(t, status.Hash)
assert.Equal(t, lastStatusHash, status.Hash)

response := &protobufs.ServerToAgent{InstanceUid: msg.InstanceUid}

if compressedReceived {
// Ask for full report again.
response.Flags = protobufs.ServerToAgent_ReportPackageStatuses
} else {
// Keep triggering status report by setting AgentDescription
// until the compressed PackageStatuses arrives.
_ = client.SetAgentDescription(client.AgentDescription())
}

return response, compressedReceived
})

// Shutdown the Server.
srv.Close()

// Shutdown the client.
err := client.Stop(context.Background())
assert.NoError(t, err)
})
}

// Downloadable package file constants.
const packageFileURL = "/validfile.pkg"

var packageFileContent = []byte("Package File Content")

func createDownloadSrv(t *testing.T) *httptest.Server {
m := http.NewServeMux()
m.HandleFunc(packageFileURL,
func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, err := w.Write(packageFileContent)
assert.NoError(t, err)
},
)

srv := httptest.NewServer(m)

u, err := url.Parse(srv.URL)
if err != nil {
t.Fatal(err)
}
endpoint := u.Host
testhelpers.WaitForEndpoint(endpoint)

return srv
}

func createPackageTestCase(name string, downloadSrv *httptest.Server) packageTestCase {
return packageTestCase{
name: name,
errorOnCallback: false,
available: &protobufs.PackagesAvailable{
Packages: map[string]*protobufs.PackageAvailable{
"package1": {
Type: protobufs.PackageAvailable_TopLevelPackage,
Version: "1.0.0",
File: &protobufs.DownloadableFile{
DownloadUrl: downloadSrv.URL + packageFileURL,
ContentHash: []byte{4, 5},
},
Hash: []byte{1, 2, 3},
},
},
AllPackagesHash: []byte{1, 2, 3, 4, 5},
},

expectedStatus: &protobufs.PackageStatuses{
Packages: map[string]*protobufs.PackageStatus{
"package1": {
Name: "package1",
AgentHasVersion: "1.0.0",
AgentHasHash: []byte{1, 2, 3},
ServerOfferedVersion: "1.0.0",
ServerOfferedHash: []byte{1, 2, 3},
Status: protobufs.PackageStatus_Installed,
ErrorMessage: "",
},
},
ServerProvidedAllPackagesHash: []byte{1, 2, 3, 4, 5},
},

expectedFileContent: map[string][]byte{
"package1": packageFileContent,
},
}
}

func TestUpdatePackages(t *testing.T) {

downloadSrv := createDownloadSrv(t)
defer downloadSrv.Close()

// A success case.
var tests []packageTestCase
tests = append(tests, createPackageTestCase("success", downloadSrv))

// A case when downloading the file fails because the URL is incorrect.
notFound := createPackageTestCase("downloadable file not found", downloadSrv)
notFound.available.Packages["package1"].File.DownloadUrl = downloadSrv.URL + "/notfound"
notFound.expectedStatus.Packages["package1"].Status = protobufs.PackageStatus_InstallFailed
notFound.expectedStatus.Packages["package1"].ErrorMessage = "cannot download"
tests = append(tests, notFound)

// A case when OnPackagesAvailable callback returns an error.
errorOnCallback := createPackageTestCase("error on callback", downloadSrv)
errorOnCallback.expectedStatus.Packages["package1"].Status = protobufs.PackageStatus_InstallFailed
errorOnCallback.expectedStatus.Packages["package1"].ErrorMessage = packageUpdateErrorMsg
errorOnCallback.errorOnCallback = true
tests = append(tests, errorOnCallback)

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
verifyUpdatePackages(t, test)
})
}
}
8 changes: 7 additions & 1 deletion client/httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,11 @@ func (c *httpClient) runUntilStopped(ctx context.Context) {
// Start the HTTP sender. This will make request/responses with retries for
// failures and will wait with configured polling interval if there is nothing
// to send.
c.sender.Run(ctx, c.opAMPServerURL, c.common.Callbacks, &c.common.ClientSyncedState)
c.sender.Run(
ctx,
c.opAMPServerURL,
c.common.Callbacks,
&c.common.ClientSyncedState,
c.common.PackagesStateProvider,
)
}
50 changes: 38 additions & 12 deletions client/internal/clientcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (

"github.com/open-telemetry/opamp-go/client/types"
"github.com/open-telemetry/opamp-go/protobufs"
"google.golang.org/protobuf/proto"
)

var (
ErrAgentDescriptionMissing = errors.New("AgentDescription is nil")
ErrAgentDescriptionNoAttributes = errors.New("AgentDescription has no attributes defined")
errRemoteConfigStatusMissing = errors.New("RemoteConfigStatus is not set")
errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")

errAlreadyStarted = errors.New("already started")
errCannotStopNotStarted = errors.New("cannot stop because not started")
)

// ClientCommon contains the OpAMP logic that is common between WebSocket and
Expand All @@ -29,6 +30,9 @@ type ClientCommon struct {
// Client state storage. This is needed if the Server asks to report the state.
ClientSyncedState ClientSyncedState

// PackagesStateProvider provides access to the local state of packages.
PackagesStateProvider types.PackagesStateProvider

// The transport-specific sender.
sender Sender

Expand Down Expand Up @@ -59,6 +63,7 @@ func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSetti
return ErrAgentDescriptionMissing
}

// Prepare remote config status.
if settings.RemoteConfigStatus == nil {
// RemoteConfigStatus is not provided. Start with empty.
settings.RemoteConfigStatus = &protobufs.RemoteConfigStatus{
Expand All @@ -70,6 +75,27 @@ func (c *ClientCommon) PrepareStart(_ context.Context, settings types.StartSetti
return err
}

// Prepare package statuses.
c.PackagesStateProvider = settings.PackagesStateProvider
var packageStatuses *protobufs.PackageStatuses
if settings.PackagesStateProvider != nil {
// Set package status from the value previously saved in the PackagesStateProvider.
var err error
packageStatuses, err = settings.PackagesStateProvider.LastReportedStatuses()
if err != nil {
return err
}
}

if packageStatuses == nil {
// PackageStatuses is not provided. Start with empty.
packageStatuses = &protobufs.PackageStatuses{}
}
if err := c.ClientSyncedState.SetPackageStatuses(packageStatuses); err != nil {
return err
}

// Prepare callbacks.
c.Callbacks = settings.Callbacks
if c.Callbacks == nil {
// Make sure it is always safe to call Callbacks.
Expand Down Expand Up @@ -154,24 +180,24 @@ func (c *ClientCommon) PrepareFirstMessage(ctx context.Context) error {
msg.StatusReport = &protobufs.StatusReport{}
}
msg.StatusReport.AgentDescription = c.ClientSyncedState.AgentDescription()

msg.StatusReport.EffectiveConfig = cfg

msg.StatusReport.RemoteConfigStatus = c.ClientSyncedState.RemoteConfigStatus()
msg.PackageStatuses = c.ClientSyncedState.PackageStatuses()

if msg.PackageStatuses == nil {
msg.PackageStatuses = &protobufs.PackageStatuses{}
if c.PackagesStateProvider != nil {
// We have a state provider, so package related capabilities can work.
msg.StatusReport.Capabilities |= protobufs.AgentCapabilities_AcceptsPackages
msg.StatusReport.Capabilities |= protobufs.AgentCapabilities_ReportsPackageStatuses
}

// TODO: set PackageStatuses.ServerProvidedAllPackagesHash field and
// handle the Hashes for PackageStatuses properly.
},
)
return nil
}

// AgentDescription returns the current state of the AgentDescription.
func (c *ClientCommon) AgentDescription() *protobufs.AgentDescription {
return c.ClientSyncedState.AgentDescription()
// Return a cloned copy to allow caller to do whatever they want with the result.
return proto.Clone(c.ClientSyncedState.AgentDescription()).(*protobufs.AgentDescription)
}

// SetAgentDescription sends a status update to the Server with the new AgentDescription
Expand All @@ -183,7 +209,7 @@ func (c *ClientCommon) SetAgentDescription(descr *protobufs.AgentDescription) er
return err
}
c.sender.NextMessage().UpdateStatus(func(statusReport *protobufs.StatusReport) {
statusReport.AgentDescription = descr
statusReport.AgentDescription = c.ClientSyncedState.AgentDescription()
})
c.sender.ScheduleSend()
return nil
Expand Down
Loading

0 comments on commit 26dad6c

Please sign in to comment.