diff --git a/sdk/messaging/eventgrid/aznamespaces/CHANGELOG.md b/sdk/messaging/eventgrid/aznamespaces/CHANGELOG.md new file mode 100644 index 000000000000..49286b8872b2 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/CHANGELOG.md @@ -0,0 +1,51 @@ +# Release History + +## 0.4.1 (Unreleased) + +### Features Added + +### Breaking Changes + +- This module has been moved from its previous location in `azeventgrid` to this location (`github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces`). + +### Bugs Fixed + +### Other Changes + +## 0.4.0 (2023-11-27) + +### Features Added + +- New functionality for Event Grid namespaces: + - Client.PublishCloudEvent can be used to publish a single `messaging.CloudEvent`. + - Client.RenewCloudEventLocks can extend the lock time for a set of events. + - Client.ReleaseCloudEvents (via ReleaseCloudEventsOptions.ReleaseDelayInSeconds) can release an event with a + server-side delay, allowing the message to remain unavailable for a configured period of time. + +### Breaking Changes + +- FailedLockToken, included in the response for settlement functions, has an `Error` field, which contains the data previously + in `ErrorDescription` and `ErrorCode`. +- Settlement functions (AcknowledgeCloudEvents, ReleaseCloudEvents, RejectCloudEvents) take lock tokens as a parameter. + +## 0.3.0 (2023-10-17) + +### Breaking Changes + +- Client constructors that take a `key string` parameter for a credential now require an `*azcore.KeyCredential` or `*azcore.SASCredential`. + +## 0.2.0 (2023-09-12) + +### Features Added + +- The publisher client for Event Grid topics has been added as a sub-package under `publisher`. + +### Other Changes + +- Documentation and examples added for Event Grid namespace client. + +## 0.1.0 (2023-07-11) + +### Features Added + +- Initial preview for the Event Grid package for Event Grid Namespaces diff --git a/sdk/messaging/eventgrid/aznamespaces/LICENSE.txt b/sdk/messaging/eventgrid/aznamespaces/LICENSE.txt new file mode 100644 index 000000000000..22aed37e650b --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/LICENSE.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Microsoft Corporation. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/sdk/messaging/eventgrid/aznamespaces/README.md b/sdk/messaging/eventgrid/aznamespaces/README.md new file mode 100644 index 000000000000..bd9aaae97e6a --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/README.md @@ -0,0 +1,119 @@ +# Azure Event Grid Namespaces Client Module for Go + +[Azure Event Grid](https://learn.microsoft.com/azure/event-grid/overview) is a highly scalable, fully managed Pub Sub message distribution service that offers flexible message consumption patterns. For more information about Event Grid see: [link](https://learn.microsoft.com/azure/event-grid/overview). + +This client module allows you to publish events and receive events using the [Pull delivery](https://learn.microsoft.com/azure/event-grid/pull-delivery-overview) API. + +> NOTE: This client does not work with Event Grid Basic. Use the [publisher.Client][godoc_publisher_client] in the `publisher` sub-package instead. + +Key links: +- [Source code][source] +- [API Reference Documentation][godoc] +- [Product documentation](https://azure.microsoft.com/services/event-grid/) +- [Samples][godoc_examples] + +## Getting started + +### Install the package + +Install the Azure Event Grid Namespaces client module for Go with `go get`: + +```bash +go get github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces +``` + +### Prerequisites + +- Go, version 1.18 or higher +- An [Azure subscription](https://azure.microsoft.com/free/) +- An [Event Grid namespace][ms_namespace]. You can create an Event Grid namespace using the [Azure Portal][ms_create_namespace]. +- An [Event Grid namespace topic][ms_topic]. You can create an Event Grid namespace topic using the [Azure Portal][ms_create_topic]. + +### Authenticate the client + +Event Grid namespace clients authenticate using a shared key credential. An example of that can be viewed here: [ExampleNewClientWithSharedKeyCredential][godoc_example_newclient]. + +# Key concepts + +An Event Grid namespace is a container for multiple types of resources, including [**namespace topics**][ms_topic]: +- A [**namespace topic**][ms_topic] contains CloudEvents that you publish, via [Client.PublishCloudEvents][godoc_client_publish]. +- A [**topic subscription**][ms_subscription], associated with a single topic, can be used to receive events via [Client.ReceiveEvents][godoc_client_receive]. + +Namespaces also offer access using MQTT, although that is not covered in this package. + +# Examples + +Examples for various scenarios can be found on [pkg.go.dev][godoc_examples] or in the example*_test.go files in our GitHub repo for [aznamespaces][source]. + +# Troubleshooting + +### Logging + +This module uses the classification-based logging implementation in `azcore`. To enable console logging for all SDK modules, set the environment variable `AZURE_SDK_GO_LOGGING` to `all`. + +Use the `azcore/log` package to control log event output. + +```go +import ( + "fmt" + azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log" +) + +// print log output to stdout +azlog.SetListener(func(event azlog.Event, s string) { + fmt.Printf("[%s] %s\n", event, s) +}) +``` + +# Next steps + +More sample code should go here, along with links out to the appropriate example tests. + +## Contributing +For details on contributing to this repository, see the [contributing guide][azure_sdk_for_go_contributing]. + +This project welcomes contributions and suggestions. Most contributions require you to agree to a +Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us +the rights to use your contribution. For details, visit https://cla.microsoft.com. + +When you submit a pull request, a CLA-bot will automatically determine whether you need to provide +a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions +provided by the bot. You will only need to do this once across all repos using our CLA. + +This project has adopted the [Microsoft Open Source Code of Conduct](https://opensource.microsoft.com/codeofconduct/). +For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or +contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. + +### Additional Helpful Links for Contributors +Many people all over the world have helped make this project better. You'll want to check out: + +* [What are some good first issues for new contributors to the repo?](https://github.com/azure/azure-sdk-for-go/issues?q=is%3Aopen+is%3Aissue+label%3A%22up+for+grabs%22) +* [How to build and test your change][azure_sdk_for_go_contributing_developer_guide] +* [How you can make a change happen!][azure_sdk_for_go_contributing_pull_requests] +* Frequently Asked Questions (FAQ) and Conceptual Topics in the detailed [Azure SDK for Go wiki](https://github.com/azure/azure-sdk-for-go/wiki). + + +### Reporting security issues and security bugs + +Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) . You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the [Security TechCenter](https://www.microsoft.com/msrc/faqs-report-an-issue). + +### License + +Azure SDK for Go is licensed under the [MIT](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/template/aztemplate/LICENSE.txt) license. + + +[azure_sdk_for_go_contributing]: https://github.com/Azure/azure-sdk-for-go/blob/main/CONTRIBUTING.md +[azure_sdk_for_go_contributing_developer_guide]: https://github.com/Azure/azure-sdk-for-go/blob/main/CONTRIBUTING.md#developer-guide +[azure_sdk_for_go_contributing_pull_requests]: https://github.com/Azure/azure-sdk-for-go/blob/main/CONTRIBUTING.md#pull-requests +[source]: https://aka.ms/azsdk/go/namespaces/src +[godoc]: https://aka.ms/azsdk/go/namespaces/pkg +[godoc_client_publish]: https://aka.ms/azsdk/go/namespaces/pkg#Client.PublishCloudEvents +[godoc_client_receive]: https://aka.ms/azsdk/go/namespaces/pkg#Client.ReceiveCloudEvents +[godoc_examples]: https://aka.ms/azsdk/go/namespaces/pkg#pkg-examples +[godoc_example_newclient]: https://aka.ms/azsdk/go/namespaces/pkg#example-NewClientWithSharedKeyCredential +[godoc_publisher_client]: https://aka.ms/azsdk/go/eventgrid/pkg/#Client +[ms_namespace]: https://learn.microsoft.com/azure/event-grid/concepts-pull-delivery#namespaces +[ms_topic]: https://learn.microsoft.com/azure/event-grid/concepts-pull-delivery#namespace-topics +[ms_subscription]: https://learn.microsoft.com/azure/event-grid/concepts-pull-delivery#event-subscriptions +[ms_create_namespace]: https://learn.microsoft.com/azure/event-grid/create-view-manage-namespaces +[ms_create_topic]: https://learn.microsoft.com/azure/event-grid/create-view-manage-namespace-topics diff --git a/sdk/messaging/eventgrid/aznamespaces/autorest.md b/sdk/messaging/eventgrid/aznamespaces/autorest.md new file mode 100644 index 000000000000..de57bef43076 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/autorest.md @@ -0,0 +1,207 @@ +## Go + +``` yaml +title: EventGridClient +description: Azure Event Grid client +generated-metadata: false +clear-output-folder: false +go: true +input-file: + - https://raw.githubusercontent.com/Azure/azure-rest-api-specs/2264262e0c7575a794cc395609d2342c7e598149/specification/eventgrid/data-plane/Microsoft.EventGrid/preview/2023-10-01-preview/EventGrid.json +license-header: MICROSOFT_MIT_NO_VERSION +module: github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces +openapi-type: "data-plane" +output-folder: ../aznamespaces +override-client-name: Client +security: "AADToken" +use: "@autorest/go@4.0.0-preview.63" +version: "^3.0.0" +slice-elements-byval: true +remove-non-reference-schema: true +``` + +Make sure the content type is setup properly for publishing single and multiple events. + +```yaml +directive: + - from: + - client.go + where: $ + transform: | + return $.replace( + /(func \(client \*Client\) publishCloudEventsCreateRequest.+?)return req, nil/s, + '$1\nreq.Raw().Header.Set("Content-type", "application/cloudevents-batch+json; charset=utf-8")\nreturn req, nil'); + - from: + - client.go + where: $ + transform: | + return $.replace( + /(func \(client \*Client\) publishCloudEventCreateRequest.+?)return req, nil/s, + '$1\nreq.Raw().Header.Set("Content-type", "application/cloudevents+json; charset=utf-8")\nreturn req, nil'); +``` + +Fix the error type so it's a bit more presentable, and looks like an error for this package. + +```yaml +directive: + - from: swagger-document + where: $.definitions["Azure.Core.Foundations.Error"] + debug: true + transform: | + $.properties = { + code: $.properties["code"], + message: { + ...$.properties["message"], + "x-ms-client-name": "InternalErrorMessageRename" + }, + }; + $["x-ms-client-name"] = "Error"; + + - from: swagger-document + where: $.definitions + transform: delete $["Azure.Core.Foundations.InnerError"]; + + - from: + - models.go + - models_serde.go + where: $ + transform: return $.replace(/InternalErrorMessageRename/g, "message"); + + - from: + - models.go + - models_serde.go + where: $ + transform: | + return $ + .replace(/\/\/ AzureCoreFoundationsErrorResponse.+?\n}/gs, "") + .replace(/\/\/ MarshalJSON implements the json\.Marshaller interface for type AzureCoreFoundationsErrorResponse\..+?\n}/gs, "") + .replace(/\/\/ UnmarshalJSON implements the json\.Unmarshaller interface for type AzureCoreFoundationsErrorResponse\..+?\n}/gs, ""); +``` + +Trim out the 'Interface any' for types that are empty. + +```yaml +directive: + - from: responses.go + where: $ + transform: $.replace(/\s+\/\/ Anything\s+Interface any/sg, "$1"); +``` + +For functions that have empty responses (ie, PublishCloudEvent +and PublishCloudEvents) we can remove the schema attribute, which cleans +up the PublishCloudEventResponse/PublishCloudEventsResponse +so they don't have a vestigial `Interface any` field. + +```yaml +directive: + # remove the 'Interface any' that's generated for an empty response object. + - from: + - swagger-document + where: $["x-ms-paths"]["/topics/{topicName}:publish?_overload=publishCloudEvents"].post.responses["200"] + transform: delete $["schema"]; + - from: + - swagger-document + where: $["paths"]["/topics/{topicName}:publish"].post.responses["200"] + transform: delete $["schema"]; +``` + +Use azcore's CloudEvent type instead of a locally generated version. + +```yaml +directive: + # replace references to the "generated" CloudEvent to the actual version in azcore/messaging + - from: + - client.go + - models.go + - responses.go + - options.go + where: $ + transform: | + return $.replace(/\[\]CloudEvent/g, "[]messaging.CloudEvent") + .replace(/\*CloudEvent/g, "messaging.CloudEvent") + .replace(/event CloudEvent/g, "event messaging.CloudEvent") + - from: swagger-document + where: $.definitions.CloudEvent + transform: $["x-ms-external"] = true + # make the endpoint a parameter of the client constructor + - from: swagger-document + where: $["x-ms-parameterized-host"] + transform: $.parameters[0]["x-ms-parameter-location"] = "client" + # delete client name prefix from method options and response types + - from: + - client.go + - models.go + - responses.go + - options.go + where: $ + transform: return $.replace(/Client(\w+)((?:Options|Response))/g, "$1$2"); +``` + +Fix incorrect string formatting for the "release with delay" + +```yaml +directive: + - from: swagger-document + where: $.paths["/topics/{topicName}/eventsubscriptions/{eventSubscriptionName}:release"].post.parameters[3] + transform: $.type = "integer"; + - from: client.go + where: $ + transform: return $.replace(/string\(\*options.ReleaseDelayInSeconds\)/g, "fmt.Sprintf(\"%d\", *options.ReleaseDelayInSeconds)") +``` + +Add doc for ReleaseDelay enum + +```yaml +directive: + - from: constants.go + where: $ + transform: return $.replace(/type ReleaseDelay int32/, "// ReleaseDelay indicates how long the service should delay before releasing an event.\ntype ReleaseDelay int32") +``` + +We want to flatten out the settlement arg functions so we'll internalize +them and do the flattening in custom code. + +```yaml +directive: + # Rename the functions so they're internal + + - from: client.go + where: $ + transform: return $.replace(/func \(client \*Client\) RejectCloudEvents\(/, "func \(client \*Client\) internalRejectCloudEvents(") + - from: client.go + where: $ + transform: return $.replace(/func \(client \*Client\) AcknowledgeCloudEvents\(/, "func \(client \*Client\) internalAcknowledgeCloudEvents(") + - from: client.go + where: $ + transform: return $.replace(/func \(client \*Client\) ReleaseCloudEvents\(/, "func \(client \*Client\) internalReleaseCloudEvents(") + - from: client.go + where: $ + transform: return $.replace(/func \(client \*Client\) RenewCloudEventLocks\(/, "func \(client \*Client\) internalRenewCloudEventLocks(") + + # Rename the old param bags to be internal as well + + - from: + - client.go + - models.go + - models_serde.go + where: $ + transform: return $.replace(/\bReleaseOptions\b/g, "releaseOptions") + - from: + - client.go + - models.go + - models_serde.go + where: $ + transform: return $.replace(/\bRejectOptions\b/g, "rejectOptions") + - from: + - client.go + - models.go + - models_serde.go + where: $ + transform: return $.replace(/\bAcknowledgeOptions\b/g, "acknowledgeOptions") + - from: + - client.go + - models.go + - models_serde.go + where: $ + transform: return $.replace(/\RenewLockOptions\b/g, "renewLockOptions") +``` diff --git a/sdk/messaging/eventgrid/aznamespaces/build.go b/sdk/messaging/eventgrid/aznamespaces/build.go new file mode 100644 index 000000000000..cb6532d1fe2d --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/build.go @@ -0,0 +1,7 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +//go:generate autorest ./autorest.md +//go:generate goimports -w . + +package aznamespaces diff --git a/sdk/messaging/eventgrid/aznamespaces/ci.yml b/sdk/messaging/eventgrid/aznamespaces/ci.yml new file mode 100644 index 000000000000..ba9ad5952034 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/ci.yml @@ -0,0 +1,35 @@ +# NOTE: Please refer to https://aka.ms/azsdk/engsys/ci-yaml before editing this file. +trigger: + branches: + include: + - main + - feature/* + - hotfix/* + - release/* + paths: + include: + - sdk/messaging/eventgrid/aznamespaces + +pr: + branches: + include: + - main + - feature/* + - hotfix/* + - release/* + paths: + include: + - sdk/messaging/eventgrid/aznamespaces + +stages: +- template: /eng/pipelines/templates/jobs/archetype-sdk-client.yml + parameters: + ServiceDirectory: "messaging/eventgrid/aznamespaces" + RunLiveTests: true + UsePipelineProxy: false + Location: westus2 + EnvVars: + AZURE_CLIENT_ID: $(AZNAMESPACES_CLIENT_ID) + AZURE_TENANT_ID: $(AZNAMESPACES_TENANT_ID) + AZURE_CLIENT_SECRET: $(AZNAMESPACES_CLIENT_SECRET) + AZURE_SUBSCRIPTION_ID: $(AZNAMESPACES_SUBSCRIPTION_ID) diff --git a/sdk/messaging/eventgrid/aznamespaces/client.go b/sdk/messaging/eventgrid/aznamespaces/client.go new file mode 100644 index 000000000000..255c9a97eab4 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/client.go @@ -0,0 +1,447 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" +) + +// Client contains the methods for the Client group. +// Don't use this type directly, use a constructor function instead. +type Client struct { + internal *azcore.Client + endpoint string +} + +// AcknowledgeCloudEvents - Acknowledge batch of Cloud Events. The server responds with an HTTP 200 status code if the request +// is successfully accepted. The response body will include the set of successfully acknowledged +// lockTokens, along with other failed lockTokens with their corresponding error information. Successfully acknowledged events +// will no longer be available to any consumer. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - acknowledgeOptions - acknowledgeOptions. +// - options - AcknowledgeCloudEventsOptions contains the optional parameters for the Client.AcknowledgeCloudEvents method. +func (client *Client) internalAcknowledgeCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, acknowledgeOptions acknowledgeOptions, options *AcknowledgeCloudEventsOptions) (AcknowledgeCloudEventsResponse, error) { + var err error + req, err := client.acknowledgeCloudEventsCreateRequest(ctx, topicName, eventSubscriptionName, acknowledgeOptions, options) + if err != nil { + return AcknowledgeCloudEventsResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return AcknowledgeCloudEventsResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return AcknowledgeCloudEventsResponse{}, err + } + resp, err := client.acknowledgeCloudEventsHandleResponse(httpResp) + return resp, err +} + +// acknowledgeCloudEventsCreateRequest creates the AcknowledgeCloudEvents request. +func (client *Client) acknowledgeCloudEventsCreateRequest(ctx context.Context, topicName string, eventSubscriptionName string, acknowledgeOptions acknowledgeOptions, options *AcknowledgeCloudEventsOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}/eventsubscriptions/{eventSubscriptionName}:acknowledge" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + if eventSubscriptionName == "" { + return nil, errors.New("parameter eventSubscriptionName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{eventSubscriptionName}", url.PathEscape(eventSubscriptionName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + if err := runtime.MarshalAsJSON(req, acknowledgeOptions); err != nil { + return nil, err + } + return req, nil +} + +// acknowledgeCloudEventsHandleResponse handles the AcknowledgeCloudEvents response. +func (client *Client) acknowledgeCloudEventsHandleResponse(resp *http.Response) (AcknowledgeCloudEventsResponse, error) { + result := AcknowledgeCloudEventsResponse{} + if err := runtime.UnmarshalAsJSON(resp, &result.AcknowledgeResult); err != nil { + return AcknowledgeCloudEventsResponse{}, err + } + return result, nil +} + +// PublishCloudEvent - Publish Single Cloud Event to namespace topic. In case of success, the server responds with an HTTP +// 200 status code with an empty JSON object in response. Otherwise, the server can return various +// error codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is +// too large, 410: which indicates that specific topic is not found, 400: for bad +// request, and 500: for internal server error. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - event - Single Cloud Event being published. +// - options - PublishCloudEventOptions contains the optional parameters for the Client.PublishCloudEvent method. +func (client *Client) PublishCloudEvent(ctx context.Context, topicName string, event messaging.CloudEvent, options *PublishCloudEventOptions) (PublishCloudEventResponse, error) { + var err error + req, err := client.publishCloudEventCreateRequest(ctx, topicName, event, options) + if err != nil { + return PublishCloudEventResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return PublishCloudEventResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return PublishCloudEventResponse{}, err + } + return PublishCloudEventResponse{}, nil +} + +// publishCloudEventCreateRequest creates the PublishCloudEvent request. +func (client *Client) publishCloudEventCreateRequest(ctx context.Context, topicName string, event messaging.CloudEvent, options *PublishCloudEventOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}:publish" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + if err := runtime.MarshalAsJSON(req, event); err != nil { + return nil, err + } + + req.Raw().Header.Set("Content-type", "application/cloudevents+json; charset=utf-8") + return req, nil +} + +// PublishCloudEvents - Publish Batch Cloud Event to namespace topic. In case of success, the server responds with an HTTP +// 200 status code with an empty JSON object in response. Otherwise, the server can return various error +// codes. For example, 401: which indicates authorization failure, 403: which indicates quota exceeded or message is too large, +// 410: which indicates that specific topic is not found, 400: for bad +// request, and 500: for internal server error. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - events - Array of Cloud Events being published. +// - options - PublishCloudEventsOptions contains the optional parameters for the Client.PublishCloudEvents method. +func (client *Client) PublishCloudEvents(ctx context.Context, topicName string, events []messaging.CloudEvent, options *PublishCloudEventsOptions) (PublishCloudEventsResponse, error) { + var err error + req, err := client.publishCloudEventsCreateRequest(ctx, topicName, events, options) + if err != nil { + return PublishCloudEventsResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return PublishCloudEventsResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return PublishCloudEventsResponse{}, err + } + return PublishCloudEventsResponse{}, nil +} + +// publishCloudEventsCreateRequest creates the PublishCloudEvents request. +func (client *Client) publishCloudEventsCreateRequest(ctx context.Context, topicName string, events []messaging.CloudEvent, options *PublishCloudEventsOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}:publish" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + if err := runtime.MarshalAsJSON(req, events); err != nil { + return nil, err + } + + req.Raw().Header.Set("Content-type", "application/cloudevents-batch+json; charset=utf-8") + return req, nil +} + +// ReceiveCloudEvents - Receive Batch of Cloud Events from the Event Subscription. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - options - ReceiveCloudEventsOptions contains the optional parameters for the Client.ReceiveCloudEvents method. +func (client *Client) ReceiveCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, options *ReceiveCloudEventsOptions) (ReceiveCloudEventsResponse, error) { + var err error + req, err := client.receiveCloudEventsCreateRequest(ctx, topicName, eventSubscriptionName, options) + if err != nil { + return ReceiveCloudEventsResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return ReceiveCloudEventsResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return ReceiveCloudEventsResponse{}, err + } + resp, err := client.receiveCloudEventsHandleResponse(httpResp) + return resp, err +} + +// receiveCloudEventsCreateRequest creates the ReceiveCloudEvents request. +func (client *Client) receiveCloudEventsCreateRequest(ctx context.Context, topicName string, eventSubscriptionName string, options *ReceiveCloudEventsOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}/eventsubscriptions/{eventSubscriptionName}:receive" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + if eventSubscriptionName == "" { + return nil, errors.New("parameter eventSubscriptionName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{eventSubscriptionName}", url.PathEscape(eventSubscriptionName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + if options != nil && options.MaxEvents != nil { + reqQP.Set("maxEvents", strconv.FormatInt(int64(*options.MaxEvents), 10)) + } + if options != nil && options.MaxWaitTime != nil { + reqQP.Set("maxWaitTime", strconv.FormatInt(int64(*options.MaxWaitTime), 10)) + } + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + return req, nil +} + +// receiveCloudEventsHandleResponse handles the ReceiveCloudEvents response. +func (client *Client) receiveCloudEventsHandleResponse(resp *http.Response) (ReceiveCloudEventsResponse, error) { + result := ReceiveCloudEventsResponse{} + if err := runtime.UnmarshalAsJSON(resp, &result.ReceiveResult); err != nil { + return ReceiveCloudEventsResponse{}, err + } + return result, nil +} + +// RejectCloudEvents - Reject batch of Cloud Events. The server responds with an HTTP 200 status code if the request is successfully +// accepted. The response body will include the set of successfully rejected lockTokens, +// along with other failed lockTokens with their corresponding error information. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - rejectOptions - rejectOptions +// - options - RejectCloudEventsOptions contains the optional parameters for the Client.RejectCloudEvents method. +func (client *Client) internalRejectCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, rejectOptions rejectOptions, options *RejectCloudEventsOptions) (RejectCloudEventsResponse, error) { + var err error + req, err := client.rejectCloudEventsCreateRequest(ctx, topicName, eventSubscriptionName, rejectOptions, options) + if err != nil { + return RejectCloudEventsResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return RejectCloudEventsResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return RejectCloudEventsResponse{}, err + } + resp, err := client.rejectCloudEventsHandleResponse(httpResp) + return resp, err +} + +// rejectCloudEventsCreateRequest creates the RejectCloudEvents request. +func (client *Client) rejectCloudEventsCreateRequest(ctx context.Context, topicName string, eventSubscriptionName string, rejectOptions rejectOptions, options *RejectCloudEventsOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}/eventsubscriptions/{eventSubscriptionName}:reject" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + if eventSubscriptionName == "" { + return nil, errors.New("parameter eventSubscriptionName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{eventSubscriptionName}", url.PathEscape(eventSubscriptionName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + if err := runtime.MarshalAsJSON(req, rejectOptions); err != nil { + return nil, err + } + return req, nil +} + +// rejectCloudEventsHandleResponse handles the RejectCloudEvents response. +func (client *Client) rejectCloudEventsHandleResponse(resp *http.Response) (RejectCloudEventsResponse, error) { + result := RejectCloudEventsResponse{} + if err := runtime.UnmarshalAsJSON(resp, &result.RejectResult); err != nil { + return RejectCloudEventsResponse{}, err + } + return result, nil +} + +// ReleaseCloudEvents - Release batch of Cloud Events. The server responds with an HTTP 200 status code if the request is +// successfully accepted. The response body will include the set of successfully released lockTokens, +// along with other failed lockTokens with their corresponding error information. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - releaseOptions - releaseOptions +// - options - ReleaseCloudEventsOptions contains the optional parameters for the Client.ReleaseCloudEvents method. +func (client *Client) internalReleaseCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, releaseOptions releaseOptions, options *ReleaseCloudEventsOptions) (ReleaseCloudEventsResponse, error) { + var err error + req, err := client.releaseCloudEventsCreateRequest(ctx, topicName, eventSubscriptionName, releaseOptions, options) + if err != nil { + return ReleaseCloudEventsResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return ReleaseCloudEventsResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return ReleaseCloudEventsResponse{}, err + } + resp, err := client.releaseCloudEventsHandleResponse(httpResp) + return resp, err +} + +// releaseCloudEventsCreateRequest creates the ReleaseCloudEvents request. +func (client *Client) releaseCloudEventsCreateRequest(ctx context.Context, topicName string, eventSubscriptionName string, releaseOptions releaseOptions, options *ReleaseCloudEventsOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}/eventsubscriptions/{eventSubscriptionName}:release" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + if eventSubscriptionName == "" { + return nil, errors.New("parameter eventSubscriptionName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{eventSubscriptionName}", url.PathEscape(eventSubscriptionName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + if options != nil && options.ReleaseDelayInSeconds != nil { + reqQP.Set("releaseDelayInSeconds", fmt.Sprintf("%v", *options.ReleaseDelayInSeconds)) + } + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + if err := runtime.MarshalAsJSON(req, releaseOptions); err != nil { + return nil, err + } + return req, nil +} + +// releaseCloudEventsHandleResponse handles the ReleaseCloudEvents response. +func (client *Client) releaseCloudEventsHandleResponse(resp *http.Response) (ReleaseCloudEventsResponse, error) { + result := ReleaseCloudEventsResponse{} + if err := runtime.UnmarshalAsJSON(resp, &result.ReleaseResult); err != nil { + return ReleaseCloudEventsResponse{}, err + } + return result, nil +} + +// RenewCloudEventLocks - Renew lock for batch of Cloud Events. The server responds with an HTTP 200 status code if the request +// is successfully accepted. The response body will include the set of successfully renewed +// lockTokens, along with other failed lockTokens with their corresponding error information. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - renewLockOptions - renewLockOptions +// - options - RenewCloudEventLocksOptions contains the optional parameters for the Client.RenewCloudEventLocks method. +func (client *Client) internalRenewCloudEventLocks(ctx context.Context, topicName string, eventSubscriptionName string, renewLockOptions renewLockOptions, options *RenewCloudEventLocksOptions) (RenewCloudEventLocksResponse, error) { + var err error + req, err := client.renewCloudEventLocksCreateRequest(ctx, topicName, eventSubscriptionName, renewLockOptions, options) + if err != nil { + return RenewCloudEventLocksResponse{}, err + } + httpResp, err := client.internal.Pipeline().Do(req) + if err != nil { + return RenewCloudEventLocksResponse{}, err + } + if !runtime.HasStatusCode(httpResp, http.StatusOK) { + err = runtime.NewResponseError(httpResp) + return RenewCloudEventLocksResponse{}, err + } + resp, err := client.renewCloudEventLocksHandleResponse(httpResp) + return resp, err +} + +// renewCloudEventLocksCreateRequest creates the RenewCloudEventLocks request. +func (client *Client) renewCloudEventLocksCreateRequest(ctx context.Context, topicName string, eventSubscriptionName string, renewLockOptions renewLockOptions, options *RenewCloudEventLocksOptions) (*policy.Request, error) { + urlPath := "/topics/{topicName}/eventsubscriptions/{eventSubscriptionName}:renewLock" + if topicName == "" { + return nil, errors.New("parameter topicName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{topicName}", url.PathEscape(topicName)) + if eventSubscriptionName == "" { + return nil, errors.New("parameter eventSubscriptionName cannot be empty") + } + urlPath = strings.ReplaceAll(urlPath, "{eventSubscriptionName}", url.PathEscape(eventSubscriptionName)) + req, err := runtime.NewRequest(ctx, http.MethodPost, runtime.JoinPaths(client.endpoint, urlPath)) + if err != nil { + return nil, err + } + reqQP := req.Raw().URL.Query() + reqQP.Set("api-version", "2023-10-01-preview") + req.Raw().URL.RawQuery = reqQP.Encode() + req.Raw().Header["Accept"] = []string{"application/json"} + if err := runtime.MarshalAsJSON(req, renewLockOptions); err != nil { + return nil, err + } + return req, nil +} + +// renewCloudEventLocksHandleResponse handles the RenewCloudEventLocks response. +func (client *Client) renewCloudEventLocksHandleResponse(resp *http.Response) (RenewCloudEventLocksResponse, error) { + result := RenewCloudEventLocksResponse{} + if err := runtime.UnmarshalAsJSON(resp, &result.RenewCloudEventLocksResult); err != nil { + return RenewCloudEventLocksResponse{}, err + } + return result, nil +} diff --git a/sdk/messaging/eventgrid/aznamespaces/client_custom.go b/sdk/messaging/eventgrid/aznamespaces/client_custom.go new file mode 100644 index 000000000000..7b2f3ab0e785 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/client_custom.go @@ -0,0 +1,102 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces + +import ( + "context" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces/internal" +) + +// ClientOptions contains optional settings for [Client] +type ClientOptions struct { + azcore.ClientOptions +} + +// NewClientWithSharedKeyCredential creates a [Client] using a shared key. +func NewClientWithSharedKeyCredential(endpoint string, keyCred *azcore.KeyCredential, options *ClientOptions) (*Client, error) { + if options == nil { + options = &ClientOptions{} + } + + azc, err := azcore.NewClient(internal.ModuleName+".Client", internal.ModuleVersion, runtime.PipelineOptions{ + PerRetry: []policy.Policy{ + runtime.NewKeyCredentialPolicy(keyCred, "Authorization", &runtime.KeyCredentialPolicyOptions{ + Prefix: "SharedAccessKey ", + }), + }, + }, &options.ClientOptions) + + if err != nil { + return nil, err + } + + return &Client{ + internal: azc, + endpoint: endpoint, + }, nil +} + +// RejectCloudEvents - Reject batch of Cloud Events. The server responds with an HTTP 200 status code if the request is successfully +// accepted. The response body will include the set of successfully rejected lockTokens, +// along with other failed lockTokens with their corresponding error information. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - lockTokens - slice of lock tokens. +// - options - RejectCloudEventsOptions contains the optional parameters for the Client.RejectCloudEvents method. +func (client *Client) RejectCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, lockTokens []string, options *RejectCloudEventsOptions) (RejectCloudEventsResponse, error) { + return client.internalRejectCloudEvents(ctx, topicName, eventSubscriptionName, rejectOptions{LockTokens: lockTokens}, options) +} + +// AcknowledgeCloudEvents - Acknowledge batch of Cloud Events. The server responds with an HTTP 200 status code if the request +// is successfully accepted. The response body will include the set of successfully acknowledged +// lockTokens, along with other failed lockTokens with their corresponding error information. Successfully acknowledged events +// will no longer be available to any consumer. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - lockTokens - slice of lock tokens. +// - options - AcknowledgeCloudEventsOptions contains the optional parameters for the Client.AcknowledgeCloudEvents method. +func (client *Client) AcknowledgeCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, lockTokens []string, options *AcknowledgeCloudEventsOptions) (AcknowledgeCloudEventsResponse, error) { + return client.internalAcknowledgeCloudEvents(ctx, topicName, eventSubscriptionName, acknowledgeOptions{LockTokens: lockTokens}, options) +} + +// ReleaseCloudEvents - Release batch of Cloud Events. The server responds with an HTTP 200 status code if the request is +// successfully accepted. The response body will include the set of successfully released lockTokens, +// along with other failed lockTokens with their corresponding error information. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - lockTokens - slice of lock tokens. +// - options - ReleaseCloudEventsOptions contains the optional parameters for the Client.ReleaseCloudEvents method. +func (client *Client) ReleaseCloudEvents(ctx context.Context, topicName string, eventSubscriptionName string, lockTokens []string, options *ReleaseCloudEventsOptions) (ReleaseCloudEventsResponse, error) { + return client.internalReleaseCloudEvents(ctx, topicName, eventSubscriptionName, releaseOptions{LockTokens: lockTokens}, options) +} + +// RenewCloudEventLocks - Renew lock for batch of Cloud Events. The server responds with an HTTP 200 status code if the request +// is successfully accepted. The response body will include the set of successfully renewed +// lockTokens, along with other failed lockTokens with their corresponding error information. +// If the operation fails it returns an *azcore.ResponseError type. +// +// Generated from API version 2023-10-01-preview +// - topicName - Topic Name. +// - eventSubscriptionName - Event Subscription Name. +// - lockTokens - slice of lock tokens. +// - options - RenewCloudEventLocksOptions contains the optional parameters for the Client.RenewCloudEventLocks method. +func (client *Client) RenewCloudEventLocks(ctx context.Context, topicName string, eventSubscriptionName string, lockTokens []string, options *RenewCloudEventLocksOptions) (RenewCloudEventLocksResponse, error) { + return client.internalRenewCloudEventLocks(ctx, topicName, eventSubscriptionName, renewLockOptions{LockTokens: lockTokens}, options) +} diff --git a/sdk/messaging/eventgrid/aznamespaces/client_test.go b/sdk/messaging/eventgrid/aznamespaces/client_test.go new file mode 100644 index 000000000000..530c9e113cd0 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/client_test.go @@ -0,0 +1,407 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces_test + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces" + "github.com/stretchr/testify/require" +) + +func TestFailedAck(t *testing.T) { + c := newClientWrapper(t, nil) + + ce, err := messaging.NewCloudEvent("hello-source", "world", []byte("ack this one"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + require.NoError(t, err) + + _, err = c.PublishCloudEvents(context.Background(), c.TestVars.Topic, []messaging.CloudEvent{ce}, nil) + require.NoError(t, err) + + recvResp, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, &aznamespaces.ReceiveCloudEventsOptions{ + MaxEvents: to.Ptr[int32](1), + MaxWaitTime: to.Ptr[int32](10), + }) + require.NoError(t, err) + + ackResp, err := c.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, ackResp.FailedLockTokens) + require.Equal(t, []string{*recvResp.Value[0].BrokerProperties.LockToken}, ackResp.SucceededLockTokens) + + // now let's try to do stuff with an "out of date" token + t.Run("AcknowledgeCloudEvents", func(t *testing.T) { + resp, err := c.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, resp.SucceededLockTokens) + requireFailedLockTokens(t, []string{*recvResp.Value[0].BrokerProperties.LockToken}, resp.FailedLockTokens) + }) + + t.Run("RejectCloudEvents", func(t *testing.T) { + resp, err := c.RejectCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, resp.SucceededLockTokens) + requireFailedLockTokens(t, []string{*recvResp.Value[0].BrokerProperties.LockToken}, resp.FailedLockTokens) + }) + + t.Run("ReleaseCloudEvents", func(t *testing.T) { + resp, err := c.ReleaseCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, resp.SucceededLockTokens) + requireFailedLockTokens(t, []string{*recvResp.Value[0].BrokerProperties.LockToken}, resp.FailedLockTokens) + }) +} + +func TestPartialAckFailure(t *testing.T) { + c := newClientWrapper(t, nil) + + ce, err := messaging.NewCloudEvent("hello-source", "world", []byte("event one"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + require.NoError(t, err) + + ce2, err := messaging.NewCloudEvent("hello-source", "world", []byte("event two"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + require.NoError(t, err) + + _, err = c.PublishCloudEvents(context.Background(), c.TestVars.Topic, []messaging.CloudEvent{ce, ce2}, nil) + require.NoError(t, err) + + events, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, &aznamespaces.ReceiveCloudEventsOptions{ + MaxEvents: to.Ptr[int32](2), + }) + require.NoError(t, err) + + // we'll ack one now so we can force a failure to happen. + ackResp, err := c.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*events.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, ackResp.FailedLockTokens) + + // this will result in a partial failure. + ackResp, err = c.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{ + *events.Value[0].BrokerProperties.LockToken, + *events.Value[1].BrokerProperties.LockToken, + }, nil) + require.NoError(t, err) + + requireFailedLockTokens(t, []string{*events.Value[0].BrokerProperties.LockToken}, ackResp.FailedLockTokens) + require.Equal(t, []string{*events.Value[1].BrokerProperties.LockToken}, ackResp.SucceededLockTokens) +} + +func TestReject(t *testing.T) { + c := newClientWrapper(t, nil) + + ce, err := messaging.NewCloudEvent("TestAbandon", "world", []byte("event one"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + require.NoError(t, err) + + t.Logf("Publishing cloud events") + _, err = c.PublishCloudEvents(context.Background(), c.TestVars.Topic, []messaging.CloudEvent{ce}, nil) + require.NoError(t, err) + + t.Logf("Receiving cloud events") + events, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + requireEqualCloudEvent(t, messaging.CloudEvent{ + SpecVersion: "1.0", + DataContentType: to.Ptr("application/octet-stream"), + Data: []byte("event one"), + Source: "TestAbandon", + Type: "world", + }, events.Value[0].Event) + + require.Equal(t, int32(1), *events.Value[0].BrokerProperties.DeliveryCount, "DeliveryCount starts at 1") + + t.Logf("Rejecting cloud events") + rejectResp, err := c.RejectCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*events.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, rejectResp.FailedLockTokens) + t.Logf("Done rejecting cloud events") + + events, err = c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, &aznamespaces.ReceiveCloudEventsOptions{ + MaxEvents: to.Ptr[int32](1), + MaxWaitTime: to.Ptr[int32](10), + }) + require.NoError(t, err) + require.Empty(t, events.Value) +} + +func TestRelease(t *testing.T) { + c := newClientWrapper(t, nil) + + ce, err := messaging.NewCloudEvent("TestAbandon", "world", []byte("event one"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + require.NoError(t, err) + + _, err = c.PublishCloudEvents(context.Background(), c.TestVars.Topic, []messaging.CloudEvent{ce}, nil) + require.NoError(t, err) + + events, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + requireEqualCloudEvent(t, ce, events.Value[0].Event) + + require.Equal(t, int32(1), *events.Value[0].BrokerProperties.DeliveryCount, "DeliveryCount starts at 1") + + rejectResp, err := c.ReleaseCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*events.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + + if len(rejectResp.FailedLockTokens) > 0 { + for _, flt := range rejectResp.FailedLockTokens { + t.Logf("FailedLockToken:\n ec: %s\n desc: %s\n locktoken:%s", *flt.Error.Code, flt.Error.Error(), *flt.LockToken) + } + require.Fail(t, "Failed to release events") + } + + require.Empty(t, rejectResp.FailedLockTokens) + + events, err = c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + require.Equal(t, int32(2), *events.Value[0].BrokerProperties.DeliveryCount, "DeliveryCount is incremented") + ackResp, err := c.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*events.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, ackResp.FailedLockTokens) +} + +func TestPublishBytes(t *testing.T) { + c := newClientWrapper(t, nil) + + ce, err := messaging.NewCloudEvent("hello-source", "eventType", []byte("TestPublishBytes"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + require.NoError(t, err) + + _, err = c.PublishCloudEvent(context.Background(), c.TestVars.Topic, ce, nil) + require.NoError(t, err) + + recvResp, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + requireEqualCloudEvent(t, messaging.CloudEvent{ + Source: "hello-source", + SpecVersion: "1.0", + Type: "eventType", + Data: []byte("TestPublishBytes"), + DataContentType: to.Ptr("application/octet-stream"), + }, recvResp.Value[0].Event) +} + +func TestPublishString(t *testing.T) { + c := newClientWrapper(t, nil) + + ce, err := messaging.NewCloudEvent("hello-source", "eventType", "TestPublishString", &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/json"), + }) + require.NoError(t, err) + + _, err = c.PublishCloudEvent(context.Background(), c.TestVars.Topic, ce, nil) + require.NoError(t, err) + + recvResp, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + requireEqualCloudEvent(t, messaging.CloudEvent{ + Source: "hello-source", + SpecVersion: "1.0", + Type: "eventType", + Data: []byte("\"TestPublishString\""), // non []byte returns as the JSON bytes + DataContentType: to.Ptr("application/json"), + }, recvResp.Value[0].Event) +} + +func TestPublishingAndReceivingMultipleCloudEvents(t *testing.T) { + c := newClientWrapper(t, nil) + + testData := []struct { + Send messaging.CloudEvent + Expected messaging.CloudEvent + }{ + { + Send: mustCreateEvent(t, "hello-source", "eventType", []byte("TestPublishingAndReceivingMultipleCloudEvents 1"), &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }), + Expected: messaging.CloudEvent{ + SpecVersion: "1.0", + Source: "hello-source", + Type: "eventType", + DataContentType: to.Ptr("application/octet-stream"), + Data: []byte("TestPublishingAndReceivingMultipleCloudEvents 1"), + }, + }, + { + Send: mustCreateEvent(t, "hello-source", "eventType", "TestPublishingAndReceivingMultipleCloudEvents 2", &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/json"), + DataSchema: to.Ptr("https://dataschema"), + Extensions: map[string]any{ + "extension1": "extension1value", + }, + Subject: to.Ptr("subject"), + }), + Expected: messaging.CloudEvent{ + SpecVersion: "1.0", + Source: "hello-source", + Type: "eventType", + DataSchema: to.Ptr("https://dataschema"), + Data: []byte("\"TestPublishingAndReceivingMultipleCloudEvents 2\""), + DataContentType: to.Ptr("application/json"), + Subject: to.Ptr("subject"), + Extensions: map[string]any{ + "extension1": "extension1value", + }, + }, + }, + } + + var batch []messaging.CloudEvent + + for _, td := range testData { + batch = append(batch, td.Send) + } + + // type simpleType struct { + // Name string + // } + + // ce3, err := messaging.NewCloudEvent("hello-source", "eventType", simpleType{Name: "simple type name"}, &messaging.CloudEventOptions{ + // DataContentType: to.Ptr("application/octet-stream"), + // }) + // require.NoError(t, err) + // toSend = append(toSend, ce3) + + // _, err := c.PublishCloudEvents(context.Background(), c.TestVars.Topic, batch, nil) + // require.NoError(t, err) + + t.Logf("\n\n\n=====> starting our test, publishing\n\n\n") + + // _, err := c.PublishCloudEvent(context.Background(), c.TestVars.Topic, batch[0], nil) + // require.NoError(t, err) + + _, err := c.PublishCloudEvents(context.Background(), c.TestVars.Topic, batch, nil) + require.NoError(t, err) + + t.Logf("\n\n\n=====> starting our test, receiving\n\n\n") + + resp, err := c.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, &aznamespaces.ReceiveCloudEventsOptions{ + MaxEvents: to.Ptr(int32(len(batch))), + MaxWaitTime: to.Ptr(int32(60)), + }) + require.NoError(t, err) + require.NotEmpty(t, resp.Value) + + for i := 0; i < len(batch); i++ { + _, err := c.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*resp.Value[i].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + + requireEqualCloudEvent(t, testData[i].Expected, resp.Value[i].Event) + } + + // bytes, err := json.Marshal(simpleType{Name: "simple type name"}) + // require.NoError(t, err) + + // requireEqualCloudEvent(t, messaging.CloudEvent{ + // SpecVersion: "1.0", + // Source: "hello-source", + // Type: "eventType", + // Data: []byte(bytes), + // }, resp.Value[2].Event) +} + +func TestSimpleErrors(t *testing.T) { + c := newClientWrapper(t, nil) + + _, err := c.PublishCloudEvents(context.Background(), c.TestVars.Topic, []messaging.CloudEvent{ + {}, + }, nil) + var respErr *azcore.ResponseError + + require.ErrorAs(t, err, &respErr) + require.Equal(t, http.StatusBadRequest, respErr.StatusCode) + require.Contains(t, respErr.Error(), "'data' attribute is required") +} + +func TestRenewCloudEventLocks(t *testing.T) { + c := newClientWrapper(t, nil) + + ce := mustCreateEvent(t, "source", "eventType", "hello world", nil) + _, err := c.Client.PublishCloudEvent(context.Background(), c.TestVars.Topic, ce, nil) + require.NoError(t, err) + + recvResp, err := c.Client.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + _, err = c.Client.RenewCloudEventLocks(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + + ackResp, err := c.Client.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, ackResp.FailedLockTokens) +} + +func TestReleaseWithDelay(t *testing.T) { + c := newClientWrapper(t, nil) + + ce := mustCreateEvent(t, "source", "eventType", "hello world", nil) + _, err := c.Client.PublishCloudEvent(context.Background(), c.TestVars.Topic, ce, nil) + require.NoError(t, err) + + recvResp, err := c.Client.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + + releaseResp, err := c.Client.ReleaseCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, &aznamespaces.ReleaseCloudEventsOptions{ + ReleaseDelayInSeconds: to.Ptr(aznamespaces.ReleaseDelayBy10Seconds), + }) + require.NoError(t, err) + require.Empty(t, releaseResp.FailedLockTokens) + + now := time.Now() + + // message will be available, but not immediately. + recvResp, err = c.Client.ReceiveCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, nil) + require.NoError(t, err) + require.NotEmpty(t, recvResp.Value) + require.Equal(t, int32(2), *recvResp.Value[0].BrokerProperties.DeliveryCount) + + if recording.GetRecordMode() == recording.LiveMode { + // doesn't work when recording but it's somewhat unimportant there. + elapsed := time.Since(now) + require.GreaterOrEqual(t, int(elapsed/time.Second), 8) // give a little wiggle room for potential delays between requests, etc... + } + + ackResp, err := c.Client.AcknowledgeCloudEvents(context.Background(), c.TestVars.Topic, c.TestVars.Subscription, []string{*recvResp.Value[0].BrokerProperties.LockToken}, nil) + require.NoError(t, err) + require.Empty(t, ackResp.FailedLockTokens) +} + +func mustCreateEvent(t *testing.T, source string, eventType string, data any, options *messaging.CloudEventOptions) messaging.CloudEvent { + event, err := messaging.NewCloudEvent(source, eventType, data, options) + require.NoError(t, err) + + return event +} + +func requireFailedLockTokens(t *testing.T, lockTokens []string, flts []aznamespaces.FailedLockToken) { + for i, flt := range flts { + // make sure the lock tokens line up + require.Equal(t, lockTokens[i], *flt.LockToken) + require.Equal(t, flt.Error.Code, to.Ptr("TokenLost")) + require.EqualError(t, flt.Error, "Token has expired.") + } +} diff --git a/sdk/messaging/eventgrid/aznamespaces/constants.go b/sdk/messaging/eventgrid/aznamespaces/constants.go new file mode 100644 index 000000000000..a48e56d0ec4f --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/constants.go @@ -0,0 +1,36 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +// ReleaseDelay indicates how long the service should delay before releasing an event. +type ReleaseDelay int32 + +const ( + // ReleaseDelayBy0Seconds - Release the event after 0 seconds. + ReleaseDelayBy0Seconds ReleaseDelay = 0 + // ReleaseDelayBy10Seconds - Release the event after 10 seconds. + ReleaseDelayBy10Seconds ReleaseDelay = 10 + // ReleaseDelayBy3600Seconds - Release the event after 3600 seconds. + ReleaseDelayBy3600Seconds ReleaseDelay = 3600 + // ReleaseDelayBy600Seconds - Release the event after 600 seconds. + ReleaseDelayBy600Seconds ReleaseDelay = 600 + // ReleaseDelayBy60Seconds - Release the event after 60 seconds. + ReleaseDelayBy60Seconds ReleaseDelay = 60 +) + +// PossibleReleaseDelayValues returns the possible values for the ReleaseDelay const type. +func PossibleReleaseDelayValues() []ReleaseDelay { + return []ReleaseDelay{ + ReleaseDelayBy0Seconds, + ReleaseDelayBy10Seconds, + ReleaseDelayBy3600Seconds, + ReleaseDelayBy600Seconds, + ReleaseDelayBy60Seconds, + } +} diff --git a/sdk/messaging/eventgrid/aznamespaces/custom_models.go b/sdk/messaging/eventgrid/aznamespaces/custom_models.go new file mode 100644 index 000000000000..2165c147790b --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/custom_models.go @@ -0,0 +1,17 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces + +// Error implements the error interface for type Error. +// Note that the message contents are not contractual and can change over time. +func (e *Error) Error() string { + if e.message == nil { + return "" + } + + return *e.message +} diff --git a/sdk/messaging/eventgrid/aznamespaces/example_publish_and_receive_test.go b/sdk/messaging/eventgrid/aznamespaces/example_publish_and_receive_test.go new file mode 100644 index 000000000000..b13d413fb26f --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/example_publish_and_receive_test.go @@ -0,0 +1,165 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces" +) + +func Example_publishAndReceiveCloudEvents() { + endpoint := os.Getenv("EVENTGRID_ENDPOINT") + key := os.Getenv("EVENTGRID_KEY") + topicName := os.Getenv("EVENTGRID_TOPIC") + subscriptionName := os.Getenv("EVENTGRID_SUBSCRIPTION") + + if endpoint == "" || key == "" || topicName == "" || subscriptionName == "" { + return + } + + client, err := aznamespaces.NewClientWithSharedKeyCredential(endpoint, azcore.NewKeyCredential(key), nil) + + if err != nil { + panic(err) + } + + // + // Publish an event with a string payload + // + fmt.Fprintf(os.Stderr, "Published event with a string payload 'hello world'\n") + eventWithString, err := publishAndReceiveEvent(client, topicName, subscriptionName, "application/json", "hello world") + + if err != nil { + panic(err) + } + + fmt.Fprintf(os.Stderr, "Received an event with a string payload\n") + fmt.Fprintf(os.Stderr, "ID: %s\n", eventWithString.Event.ID) + + var str *string + + if err := json.Unmarshal(eventWithString.Event.Data.([]byte), &str); err != nil { + panic(err) + } + + fmt.Fprintf(os.Stderr, " Body: %s\n", *str) // prints 'Body: hello world' + fmt.Fprintf(os.Stderr, " Delivery count: %d\n", eventWithString.BrokerProperties.DeliveryCount) + + // + // Publish an event with a []byte payload + // + eventWithBytes, err := publishAndReceiveEvent(client, topicName, subscriptionName, "application/octet-stream", []byte{0, 1, 2}) + + if err != nil { + panic(err) + } + + fmt.Fprintf(os.Stderr, "ID: %s\n", eventWithBytes.Event.ID) + fmt.Fprintf(os.Stderr, " Body: %#v\n", eventWithBytes.Event.Data.([]byte)) // prints 'Body: []byte{0x0, 0x1, 0x2}' + fmt.Fprintf(os.Stderr, " Delivery count: %d\n", eventWithBytes.BrokerProperties.DeliveryCount) + + // + // Publish an event with a struct as the payload + // + type SampleData struct { + Name string `json:"name"` + } + + eventWithStruct, err := publishAndReceiveEvent(client, topicName, subscriptionName, "application/json", SampleData{Name: "hello"}) + + if err != nil { + panic(err) + } + + var sampleData *SampleData + if err := json.Unmarshal(eventWithStruct.Event.Data.([]byte), &sampleData); err != nil { + panic(err) + } + + fmt.Fprintf(os.Stderr, "ID: %s\n", eventWithStruct.Event.ID) + fmt.Fprintf(os.Stderr, " Body: %#v\n", sampleData) // prints 'Body: &azeventgrid_test.SampleData{Name:"hello"}' + fmt.Fprintf(os.Stderr, " Delivery count: %d\n", eventWithStruct.BrokerProperties.DeliveryCount) + + // Output: +} + +func publishAndReceiveEvent(client *aznamespaces.Client, topicName string, subscriptionName string, dataContentType string, payload any) (aznamespaces.ReceiveDetails, error) { + event, err := messaging.NewCloudEvent("source", "eventType", payload, &messaging.CloudEventOptions{ + DataContentType: &dataContentType, + }) + + if err != nil { + return aznamespaces.ReceiveDetails{}, err + } + + eventsToSend := []messaging.CloudEvent{ + event, + } + + // NOTE: we're sending a single event as an example. For better efficiency it's best if you send + // multiple events at a time. + _, err = client.PublishCloudEvents(context.TODO(), topicName, eventsToSend, nil) + + if err != nil { + return aznamespaces.ReceiveDetails{}, err + } + + events, err := client.ReceiveCloudEvents(context.TODO(), topicName, subscriptionName, &aznamespaces.ReceiveCloudEventsOptions{ + MaxEvents: to.Ptr(int32(1)), + + // Wait for 60 seconds for events. + MaxWaitTime: to.Ptr[int32](60), + }) + + if err != nil { + return aznamespaces.ReceiveDetails{}, err + } + + if len(events.Value) == 0 { + return aznamespaces.ReceiveDetails{}, errors.New("no events received") + } + + // We can (optionally) renew the lock (multiple times) if we want to continue to + // extend the lock time on the event. + _, err = client.RenewCloudEventLocks(context.TODO(), topicName, subscriptionName, []string{ + *events.Value[0].BrokerProperties.LockToken, + }, nil) + + if err != nil { + return aznamespaces.ReceiveDetails{}, err + } + + // This acknowledges the event and causes it to be deleted from the subscription. + // Other options are: + // - client.ReleaseCloudEvents, which invalidates our event lock and allows another subscriber to receive the event. + // - client.RejectCloudEvents, which rejects the event. + // If dead-lettering is configured, the event will be moved into the dead letter queue. + // Otherwise the event is deleted. + ackResp, err := client.AcknowledgeCloudEvents(context.TODO(), topicName, subscriptionName, []string{ + *events.Value[0].BrokerProperties.LockToken, + }, nil) + + if err != nil { + return aznamespaces.ReceiveDetails{}, err + } + + if len(ackResp.FailedLockTokens) > 0 { + // some events failed when we tried to acknowledge them. + for _, failed := range ackResp.FailedLockTokens { + fmt.Printf("Failed to acknowledge event with lock token %s: %s\n", *failed.LockToken, failed.Error) + } + + return aznamespaces.ReceiveDetails{}, errors.New("failed to acknowledge event") + } + + return events.Value[0], nil +} diff --git a/sdk/messaging/eventgrid/aznamespaces/example_test.go b/sdk/messaging/eventgrid/aznamespaces/example_test.go new file mode 100644 index 000000000000..5bca3c151047 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/example_test.go @@ -0,0 +1,143 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces_test + +import ( + "context" + "fmt" + "log" + "os" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces" +) + +func ExampleNewClientWithSharedKeyCredential() { + endpoint := os.Getenv("EVENTGRID_ENDPOINT") + sharedKey := os.Getenv("EVENTGRID_KEY") + + if endpoint == "" || sharedKey == "" { + return + } + + client, err := aznamespaces.NewClientWithSharedKeyCredential(endpoint, azcore.NewKeyCredential(sharedKey), nil) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + _ = client // ignore + + // Output: +} + +func ExampleClient_PublishCloudEvents() { + client := getEventGridClient() + + if client == nil { + return + } + + topic := os.Getenv("EVENTGRID_TOPIC") + + // CloudEvent is in github.com/Azure/azure-sdk-for-go/azcore/messaging and can be + // used to transport + + // you can send a variety of different payloads, all of which can be encoded by messaging.CloudEvent + var payloads = []any{ + []byte{1, 2, 3}, + "hello world", + struct{ Value string }{Value: "hello world"}, + } + + var eventsToSend []messaging.CloudEvent + + for _, payload := range payloads { + event, err := messaging.NewCloudEvent("source", "eventType", payload, &messaging.CloudEventOptions{ + DataContentType: to.Ptr("application/octet-stream"), + }) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + eventsToSend = append(eventsToSend, event) + } + + _, err := client.PublishCloudEvents(context.TODO(), topic, eventsToSend, nil) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + // Output: +} + +func ExampleClient_ReceiveCloudEvents() { + client := getEventGridClient() + + if client == nil { + return + } + + topic := os.Getenv("EVENTGRID_TOPIC") + subscription := os.Getenv("EVENTGRID_SUBSCRIPTION") + + resp, err := client.ReceiveCloudEvents(context.TODO(), topic, subscription, &aznamespaces.ReceiveCloudEventsOptions{ + MaxEvents: to.Ptr[int32](1), + MaxWaitTime: to.Ptr[int32](10), // in seconds + }) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + for _, rd := range resp.Value { + lockToken := rd.BrokerProperties.LockToken + + // NOTE: See the documentation for CloudEvent.Data on how your data + // is deserialized. + data := rd.Event.Data + + fmt.Fprintf(os.Stderr, "Event ID:%s, data: %#v, lockToken: %s\n", rd.Event.ID, data, *lockToken) + + // This will complete the message, deleting it from the subscription. + resp, err := client.AcknowledgeCloudEvents(context.TODO(), topic, subscription, []string{*lockToken}, nil) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + if len(resp.FailedLockTokens) > 0 { + log.Fatalf("ERROR: %d events were not acknowledged", len(resp.FailedLockTokens)) + } + } + + // Output: +} + +func getEventGridClient() *aznamespaces.Client { + endpoint := os.Getenv("EVENTGRID_ENDPOINT") + sharedKey := os.Getenv("EVENTGRID_KEY") + + if endpoint == "" || sharedKey == "" { + return nil + } + + client, err := aznamespaces.NewClientWithSharedKeyCredential(endpoint, azcore.NewKeyCredential(sharedKey), nil) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + return client +} diff --git a/sdk/messaging/eventgrid/aznamespaces/go.mod b/sdk/messaging/eventgrid/aznamespaces/go.mod new file mode 100644 index 000000000000..23634dbbf085 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/go.mod @@ -0,0 +1,29 @@ +module github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces + +go 1.18 + +require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 + github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventgrid/armeventgrid/v2 v2.2.0-beta.1 + github.com/joho/godotenv v1.5.1 + github.com/stretchr/testify v1.8.4 +) + +require ( + github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dnaeon/go-vcr v1.2.0 // indirect + github.com/golang-jwt/jwt/v5 v5.2.0 // indirect + github.com/google/uuid v1.5.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect + github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/net v0.20.0 // indirect + golang.org/x/sys v0.16.0 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/sdk/messaging/eventgrid/aznamespaces/go.sum b/sdk/messaging/eventgrid/aznamespaces/go.sum new file mode 100644 index 000000000000..2705e1afacd5 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/go.sum @@ -0,0 +1,47 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 h1:c4k2FIYIh4xtwqrQwV0Ct1v5+ehlNXj5NI/MWVsiTkQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2/go.mod h1:5FDJtLEO/GxwNgUxbwrY3LP0pEoThTQJtk2oysdXHxM= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventgrid/armeventgrid/v2 v2.2.0-beta.1 h1:FgTgykkGbJGoc5qnADIM84YAXLdqKNmnQHws085Rovo= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventgrid/armeventgrid/v2 v2.2.0-beta.1/go.mod h1:ZHJdpjiGjZBBILAyAUTP93YSLF/Foo1J72HSx30gMeQ= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/internal v1.1.2 h1:mLY+pNLjCUeKhgnAJWAKhEUQM+RJQo2H1fuGSw1Ky1E= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.0.0 h1:ECsQtyERDVz3NP3kvDOTLvbQhqWp/x9EsGKtb4ogUr8= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA= +github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dnaeon/go-vcr v1.2.0 h1:zHCHvJYTMh1N7xnV7zf1m1GPBF9Ad0Jk/whtQ1663qI= +github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= +github.com/golang-jwt/jwt/v5 v5.2.0 h1:d/ix8ftRUorsN+5eMIlF4T6J8CAt9rch3My2winC1Jw= +github.com/golang-jwt/jwt/v5 v5.2.0/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sdk/messaging/eventgrid/aznamespaces/internal/version.go b/sdk/messaging/eventgrid/aznamespaces/internal/version.go new file mode 100644 index 000000000000..36545926402d --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/internal/version.go @@ -0,0 +1,18 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package internal + +// Constants to identify the module +const ( + // ModuleName is the module name that shows in telemetry. + ModuleName = "aznamespaces" + + // ModuleVersion is the semantic version (see http://semver.org) of this module. + ModuleVersion = "v0.4.1" +) diff --git a/sdk/messaging/eventgrid/aznamespaces/main_test.go b/sdk/messaging/eventgrid/aznamespaces/main_test.go new file mode 100644 index 000000000000..c3c4c5afa450 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/main_test.go @@ -0,0 +1,145 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces_test + +import ( + "context" + "fmt" + "log" + "net/url" + "os" + "strings" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventgrid/armeventgrid/v2" + "github.com/joho/godotenv" +) + +const recordingDirectory = "sdk/messaging/azeventgrid/testdata" + +func TestMain(m *testing.M) { + code := run(m) + os.Exit(code) +} + +func run(m *testing.M) int { + if recording.GetRecordMode() == recording.PlaybackMode || recording.GetRecordMode() == recording.RecordingMode { + proxy, err := recording.StartTestProxy(recordingDirectory, nil) + if err != nil { + panic(err) + } + + defer func() { + err := recording.StopTestProxy(proxy) + if err != nil { + panic(err) + } + }() + } + if err := godotenv.Load(); err != nil { + log.Printf("Failed to load .env file, no integration tests will run: %s", err) + } + + cleanup := createTopicAndUpdateEnv() + defer cleanup() + + return m.Run() +} + +func PollUntilDone[T any](ctx context.Context, fn func() (*runtime.Poller[T], error)) error { + poller, err := fn() + + if err != nil { + return err + } + + _, err = poller.PollUntilDone(ctx, nil) + return err +} + +func createTopicAndUpdateEnv() func() { + azSubID := os.Getenv("AZEVENTGRID_SUBSCRIPTION_ID") + resGroup := os.Getenv("AZEVENTGRID_RESOURCE_GROUP") + + if azSubID == "" || resGroup == "" || recording.GetRecordMode() != recording.LiveMode { + // ie, these are unit tests. + return func() {} + } + + nsURL, err := url.Parse(os.Getenv("EVENTGRID_ENDPOINT")) + + if err != nil { + panic(err) + } + + nsHost := strings.Split(nsURL.Host, ".")[0] + + cred, err := azidentity.NewDefaultAzureCredential(nil) + + if err != nil { + panic(err) + } + + topicClient, err := armeventgrid.NewNamespaceTopicsClient(azSubID, cred, nil) + + if err != nil { + panic(err) + } + + subClient, err := armeventgrid.NewNamespaceTopicEventSubscriptionsClient(azSubID, cred, nil) + + if err != nil { + panic(err) + } + + topicName := fmt.Sprintf("topic-%d", time.Now().UnixNano()) + + os.Setenv("EVENTGRID_TOPIC", topicName) + + subName := "testsubscription1" + + err = PollUntilDone(context.Background(), func() (*runtime.Poller[armeventgrid.NamespaceTopicsClientCreateOrUpdateResponse], error) { + return topicClient.BeginCreateOrUpdate(context.Background(), resGroup, nsHost, topicName, armeventgrid.NamespaceTopic{}, nil) + }) + + if err != nil { + panic(err) + } + + err = PollUntilDone(context.Background(), func() (*runtime.Poller[armeventgrid.NamespaceTopicEventSubscriptionsClientCreateOrUpdateResponse], error) { + return subClient.BeginCreateOrUpdate(context.Background(), + resGroup, + nsHost, + topicName, + subName, + armeventgrid.Subscription{ + Properties: &armeventgrid.SubscriptionProperties{ + DeliveryConfiguration: &armeventgrid.DeliveryConfiguration{ + DeliveryMode: to.Ptr(armeventgrid.DeliveryModeQueue), + }, + }, + }, + nil) + }) + + if err != nil { + panic(err) + } + + fmt.Printf("Created topic %s\n", topicName) + + return func() { + if _, err = topicClient.BeginDelete(context.Background(), resGroup, nsHost, topicName, nil); err != nil { + fmt.Printf("Failed to start the delete for our test topic %s: %s", topicName, err) + } + } +} diff --git a/sdk/messaging/eventgrid/aznamespaces/models.go b/sdk/messaging/eventgrid/aznamespaces/models.go new file mode 100644 index 000000000000..7790117979e0 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/models.go @@ -0,0 +1,117 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +import "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" + +// acknowledgeOptions - Array of lock tokens for the corresponding received Cloud Events to be acknowledged. +type acknowledgeOptions struct { + // REQUIRED; Array of lock tokens. + LockTokens []string +} + +// AcknowledgeResult - The result of the Acknowledge operation. +type AcknowledgeResult struct { + // REQUIRED; Array of FailedLockToken for failed cloud events. Each FailedLockToken includes the lock token along with the + // related error information (namely, the error code and description). + FailedLockTokens []FailedLockToken + + // REQUIRED; Array of lock tokens for the successfully acknowledged cloud events. + SucceededLockTokens []string +} + +// BrokerProperties - Properties of the Event Broker operation. +type BrokerProperties struct { + // REQUIRED; The attempt count for delivering the event. + DeliveryCount *int32 + + // REQUIRED; The token of the lock on the event. + LockToken *string +} + +// Error - The error object. +type Error struct { + // REQUIRED; One of a server-defined set of error codes. + Code *string + + // REQUIRED; A human-readable representation of the error. + message *string +} + +// FailedLockToken - Failed LockToken information. +type FailedLockToken struct { + // REQUIRED; Error information of the failed operation result for the lock token in the request. + Error *Error + + // REQUIRED; The lock token of an entry in the request. + LockToken *string +} + +// ReceiveDetails - Receive operation details per Cloud Event. +type ReceiveDetails struct { + // REQUIRED; The Event Broker details. + BrokerProperties *BrokerProperties + + // REQUIRED; Cloud Event details. + Event messaging.CloudEvent +} + +// ReceiveResult - Details of the Receive operation response. +type ReceiveResult struct { + // REQUIRED; Array of receive responses, one per cloud event. + Value []ReceiveDetails +} + +// rejectOptions - Array of lock tokens for the corresponding received Cloud Events to be rejected. +type rejectOptions struct { + // REQUIRED; Array of lock tokens. + LockTokens []string +} + +// RejectResult - The result of the Reject operation. +type RejectResult struct { + // REQUIRED; Array of FailedLockToken for failed cloud events. Each FailedLockToken includes the lock token along with the + // related error information (namely, the error code and description). + FailedLockTokens []FailedLockToken + + // REQUIRED; Array of lock tokens for the successfully rejected cloud events. + SucceededLockTokens []string +} + +// releaseOptions - Array of lock tokens for the corresponding received Cloud Events to be released. +type releaseOptions struct { + // REQUIRED; Array of lock tokens. + LockTokens []string +} + +// ReleaseResult - The result of the Release operation. +type ReleaseResult struct { + // REQUIRED; Array of FailedLockToken for failed cloud events. Each FailedLockToken includes the lock token along with the + // related error information (namely, the error code and description). + FailedLockTokens []FailedLockToken + + // REQUIRED; Array of lock tokens for the successfully released cloud events. + SucceededLockTokens []string +} + +// RenewCloudEventLocksResult - The result of the RenewLock operation. +type RenewCloudEventLocksResult struct { + // REQUIRED; Array of FailedLockToken for failed cloud events. Each FailedLockToken includes the lock token along with the + // related error information (namely, the error code and description). + FailedLockTokens []FailedLockToken + + // REQUIRED; Array of lock tokens for the successfully renewed locks. + SucceededLockTokens []string +} + +// renewLockOptions - Array of lock tokens for the corresponding received Cloud Events to be renewed. +type renewLockOptions struct { + // REQUIRED; Array of lock tokens. + LockTokens []string +} diff --git a/sdk/messaging/eventgrid/aznamespaces/models_serde.go b/sdk/messaging/eventgrid/aznamespaces/models_serde.go new file mode 100644 index 000000000000..d7534f66df8c --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/models_serde.go @@ -0,0 +1,420 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +import ( + "encoding/json" + "fmt" + "reflect" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" +) + +// MarshalJSON implements the json.Marshaller interface for type acknowledgeOptions. +func (a acknowledgeOptions) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "lockTokens", a.LockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type acknowledgeOptions. +func (a *acknowledgeOptions) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", a, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "lockTokens": + err = unpopulate(val, "LockTokens", &a.LockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", a, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type AcknowledgeResult. +func (a AcknowledgeResult) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "failedLockTokens", a.FailedLockTokens) + populate(objectMap, "succeededLockTokens", a.SucceededLockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type AcknowledgeResult. +func (a *AcknowledgeResult) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", a, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "failedLockTokens": + err = unpopulate(val, "FailedLockTokens", &a.FailedLockTokens) + delete(rawMsg, key) + case "succeededLockTokens": + err = unpopulate(val, "SucceededLockTokens", &a.SucceededLockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", a, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type BrokerProperties. +func (b BrokerProperties) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "deliveryCount", b.DeliveryCount) + populate(objectMap, "lockToken", b.LockToken) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type BrokerProperties. +func (b *BrokerProperties) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", b, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "deliveryCount": + err = unpopulate(val, "DeliveryCount", &b.DeliveryCount) + delete(rawMsg, key) + case "lockToken": + err = unpopulate(val, "LockToken", &b.LockToken) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", b, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type Error. +func (e Error) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "code", e.Code) + populate(objectMap, "message", e.message) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type Error. +func (e *Error) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", e, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "code": + err = unpopulate(val, "Code", &e.Code) + delete(rawMsg, key) + case "message": + err = unpopulate(val, "message", &e.message) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", e, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type FailedLockToken. +func (f FailedLockToken) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "error", f.Error) + populate(objectMap, "lockToken", f.LockToken) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type FailedLockToken. +func (f *FailedLockToken) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", f, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "error": + err = unpopulate(val, "Error", &f.Error) + delete(rawMsg, key) + case "lockToken": + err = unpopulate(val, "LockToken", &f.LockToken) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", f, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type ReceiveDetails. +func (r ReceiveDetails) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "brokerProperties", r.BrokerProperties) + populate(objectMap, "event", r.Event) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type ReceiveDetails. +func (r *ReceiveDetails) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "brokerProperties": + err = unpopulate(val, "BrokerProperties", &r.BrokerProperties) + delete(rawMsg, key) + case "event": + err = unpopulate(val, "Event", &r.Event) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type ReceiveResult. +func (r ReceiveResult) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "value", r.Value) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type ReceiveResult. +func (r *ReceiveResult) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "value": + err = unpopulate(val, "Value", &r.Value) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type rejectOptions. +func (r rejectOptions) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "lockTokens", r.LockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type rejectOptions. +func (r *rejectOptions) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "lockTokens": + err = unpopulate(val, "LockTokens", &r.LockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type RejectResult. +func (r RejectResult) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "failedLockTokens", r.FailedLockTokens) + populate(objectMap, "succeededLockTokens", r.SucceededLockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type RejectResult. +func (r *RejectResult) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "failedLockTokens": + err = unpopulate(val, "FailedLockTokens", &r.FailedLockTokens) + delete(rawMsg, key) + case "succeededLockTokens": + err = unpopulate(val, "SucceededLockTokens", &r.SucceededLockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type releaseOptions. +func (r releaseOptions) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "lockTokens", r.LockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type releaseOptions. +func (r *releaseOptions) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "lockTokens": + err = unpopulate(val, "LockTokens", &r.LockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type ReleaseResult. +func (r ReleaseResult) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "failedLockTokens", r.FailedLockTokens) + populate(objectMap, "succeededLockTokens", r.SucceededLockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type ReleaseResult. +func (r *ReleaseResult) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "failedLockTokens": + err = unpopulate(val, "FailedLockTokens", &r.FailedLockTokens) + delete(rawMsg, key) + case "succeededLockTokens": + err = unpopulate(val, "SucceededLockTokens", &r.SucceededLockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type RenewCloudEventLocksResult. +func (r RenewCloudEventLocksResult) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "failedLockTokens", r.FailedLockTokens) + populate(objectMap, "succeededLockTokens", r.SucceededLockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type RenewCloudEventLocksResult. +func (r *RenewCloudEventLocksResult) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "failedLockTokens": + err = unpopulate(val, "FailedLockTokens", &r.FailedLockTokens) + delete(rawMsg, key) + case "succeededLockTokens": + err = unpopulate(val, "SucceededLockTokens", &r.SucceededLockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +// MarshalJSON implements the json.Marshaller interface for type renewLockOptions. +func (r renewLockOptions) MarshalJSON() ([]byte, error) { + objectMap := make(map[string]any) + populate(objectMap, "lockTokens", r.LockTokens) + return json.Marshal(objectMap) +} + +// UnmarshalJSON implements the json.Unmarshaller interface for type renewLockOptions. +func (r *renewLockOptions) UnmarshalJSON(data []byte) error { + var rawMsg map[string]json.RawMessage + if err := json.Unmarshal(data, &rawMsg); err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + for key, val := range rawMsg { + var err error + switch key { + case "lockTokens": + err = unpopulate(val, "LockTokens", &r.LockTokens) + delete(rawMsg, key) + } + if err != nil { + return fmt.Errorf("unmarshalling type %T: %v", r, err) + } + } + return nil +} + +func populate(m map[string]any, k string, v any) { + if v == nil { + return + } else if azcore.IsNullValue(v) { + m[k] = nil + } else if !reflect.ValueOf(v).IsNil() { + m[k] = v + } +} + +func unpopulate(data json.RawMessage, fn string, v any) error { + if data == nil || string(data) == "null" { + return nil + } + if err := json.Unmarshal(data, v); err != nil { + return fmt.Errorf("struct field %s: %v", fn, err) + } + return nil +} diff --git a/sdk/messaging/eventgrid/aznamespaces/options.go b/sdk/messaging/eventgrid/aznamespaces/options.go new file mode 100644 index 000000000000..d0298e33af28 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/options.go @@ -0,0 +1,53 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +// AcknowledgeCloudEventsOptions contains the optional parameters for the Client.AcknowledgeCloudEvents method. +type AcknowledgeCloudEventsOptions struct { + // placeholder for future optional parameters +} + +// PublishCloudEventOptions contains the optional parameters for the Client.PublishCloudEvent method. +type PublishCloudEventOptions struct { + // placeholder for future optional parameters +} + +// PublishCloudEventsOptions contains the optional parameters for the Client.PublishCloudEvents method. +type PublishCloudEventsOptions struct { + // placeholder for future optional parameters +} + +// ReceiveCloudEventsOptions contains the optional parameters for the Client.ReceiveCloudEvents method. +type ReceiveCloudEventsOptions struct { + // Max Events count to be received. Minimum value is 1, while maximum value is 100 events. If not specified, the default value + // is 1. + MaxEvents *int32 + + // Max wait time value for receive operation in Seconds. It is the time in seconds that the server approximately waits for + // the availability of an event and responds to the request. If an event is + // available, the broker responds immediately to the client. Minimum value is 10 seconds, while maximum value is 120 seconds. + // If not specified, the default value is 60 seconds. + MaxWaitTime *int32 +} + +// RejectCloudEventsOptions contains the optional parameters for the Client.RejectCloudEvents method. +type RejectCloudEventsOptions struct { + // placeholder for future optional parameters +} + +// ReleaseCloudEventsOptions contains the optional parameters for the Client.ReleaseCloudEvents method. +type ReleaseCloudEventsOptions struct { + // Release cloud events with the specified delay in seconds. + ReleaseDelayInSeconds *ReleaseDelay +} + +// RenewCloudEventLocksOptions contains the optional parameters for the Client.RenewCloudEventLocks method. +type RenewCloudEventLocksOptions struct { + // placeholder for future optional parameters +} diff --git a/sdk/messaging/eventgrid/aznamespaces/responses.go b/sdk/messaging/eventgrid/aznamespaces/responses.go new file mode 100644 index 000000000000..4a77e1441767 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/responses.go @@ -0,0 +1,49 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +// AcknowledgeCloudEventsResponse contains the response from method Client.AcknowledgeCloudEvents. +type AcknowledgeCloudEventsResponse struct { + // The result of the Acknowledge operation. + AcknowledgeResult +} + +// PublishCloudEventResponse contains the response from method Client.PublishCloudEvent. +type PublishCloudEventResponse struct { + // placeholder for future response values +} + +// PublishCloudEventsResponse contains the response from method Client.PublishCloudEvents. +type PublishCloudEventsResponse struct { + // placeholder for future response values +} + +// ReceiveCloudEventsResponse contains the response from method Client.ReceiveCloudEvents. +type ReceiveCloudEventsResponse struct { + // Details of the Receive operation response. + ReceiveResult +} + +// RejectCloudEventsResponse contains the response from method Client.RejectCloudEvents. +type RejectCloudEventsResponse struct { + // The result of the Reject operation. + RejectResult +} + +// ReleaseCloudEventsResponse contains the response from method Client.ReleaseCloudEvents. +type ReleaseCloudEventsResponse struct { + // The result of the Release operation. + ReleaseResult +} + +// RenewCloudEventLocksResponse contains the response from method Client.RenewCloudEventLocks. +type RenewCloudEventLocksResponse struct { + // The result of the RenewLock operation. + RenewCloudEventLocksResult +} diff --git a/sdk/messaging/eventgrid/aznamespaces/shared_test.go b/sdk/messaging/eventgrid/aznamespaces/shared_test.go new file mode 100644 index 000000000000..316205f08a15 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/shared_test.go @@ -0,0 +1,222 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package aznamespaces_test + +import ( + "crypto/tls" + "fmt" + "net/http" + "os" + "strings" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/messaging" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/internal/recording" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/eventgrid/aznamespaces" + "github.com/stretchr/testify/require" +) + +var fakeTestVars = testVars{ + Key: "key", + Endpoint: "https://fake.eastus-1.eventgrid.azure.net", + Topic: "topic", + Subscription: "subscription", +} + +type testVars struct { + Key string + Endpoint string + Topic string + Subscription string + + // KeyLogPath is the value of environment "SSLKEYLOGFILE_TEST", which + // points to a file on disk where we'll write the TLS pre-master-secret. + // This is useful if you want to trace parts of this test using Wireshark. + KeyLogPath string +} + +func loadEnv() (testVars, error) { + var missing []string + + get := func(n string) string { + if v := os.Getenv(n); v == "" { + missing = append(missing, n) + } + + return os.Getenv(n) + } + + tv := testVars{ + Key: get("EVENTGRID_KEY"), + Endpoint: get("EVENTGRID_ENDPOINT"), + Topic: get("EVENTGRID_TOPIC"), + Subscription: get("EVENTGRID_SUBSCRIPTION"), + } + + if len(missing) > 0 { + return testVars{}, fmt.Errorf("Missing env variables: %s", strings.Join(missing, ",")) + } + + // Setting this variable will cause the test clients to dump out the pre-master-key + // for your HTTP connection. This allows you decrypt a packet capture from wireshark. + // + // If you want to do this just set SSLKEYLOGFILE env var to a path on disk and + // Go will write out the key. + tv.KeyLogPath = os.Getenv("SSLKEYLOGFILE") + return tv, nil +} + +type clientWrapper struct { + *aznamespaces.Client + TestVars testVars +} + +type clientWrapperOptions struct { + DontPurgeEvents bool +} + +func newClientWrapper(t *testing.T, opts *clientWrapperOptions) clientWrapper { + var client *aznamespaces.Client + var tv testVars + + if recording.GetRecordMode() != recording.PlaybackMode { + tmpTestVars, err := loadEnv() + require.NoError(t, err) + tv = tmpTestVars + } else { + tv = fakeTestVars + } + + var options *aznamespaces.ClientOptions + + if recording.GetRecordMode() == recording.LiveMode { + if tv.KeyLogPath != "" { + keyLogWriter, err := os.OpenFile(tv.KeyLogPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0777) + require.NoError(t, err) + + t.Cleanup(func() { keyLogWriter.Close() }) + + tp := http.DefaultTransport.(*http.Transport).Clone() + tp.TLSClientConfig = &tls.Config{ + KeyLogWriter: keyLogWriter, + } + + options = &aznamespaces.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: &http.Client{Transport: tp}, + }, + } + } + } else { + options = &aznamespaces.ClientOptions{ + ClientOptions: azcore.ClientOptions{ + Transport: newRecordingTransporter(t, tv), + }, + } + } + + if options != nil { + options.Logging = policy.LogOptions{ + IncludeBody: true, + } + } + + client, err := aznamespaces.NewClientWithSharedKeyCredential(tv.Endpoint, azcore.NewKeyCredential(tv.Key), options) + require.NoError(t, err) + + return clientWrapper{ + Client: client, + TestVars: tv, + } +} + +func newRecordingTransporter(t *testing.T, testVars testVars) policy.Transporter { + transport, err := recording.NewRecordingHTTPClient(t, nil) + require.NoError(t, err) + + err = recording.Start(t, recordingDirectory, nil) + require.NoError(t, err) + + // err = recording.ResetProxy(nil) + // require.NoError(t, err) + + err = recording.AddURISanitizer(fakeTestVars.Endpoint, testVars.Endpoint, nil) + require.NoError(t, err) + + err = recording.AddURISanitizer(fakeTestVars.Topic, testVars.Topic, nil) + require.NoError(t, err) + + err = recording.AddURISanitizer(fakeTestVars.Subscription, testVars.Subscription, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer(`"time": "2023-06-17T00:33:32Z"`, `"time":".+?"`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"id":"00000000-0000-0000-0000-000000000000"`, + `"id":"[^"]+"`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"lockToken":"fake-lock-token"`, + `"lockToken":\s*"[^"]+"`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"lockTokens": ["fake-lock-token"]`, + `"lockTokens":\s*\[\s*"[^"]+"\s*\]`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"succeededLockTokens": ["fake-lock-token"]`, + `"succeededLockTokens":\s*\[\s*"[^"]+"\s*\]`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"succeededLockTokens": ["fake-lock-token", "fake-lock-token", "fake-lock-token"]`, + `"succeededLockTokens":\s*`+ + `\[`+ + `(\s*"[^"]+"\s*\,){2}`+ + `\s*"[^"]+"\s*`+ + `\]`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"lockTokens": ["fake-lock-token", "fake-lock-token"]`, + `"lockTokens":\s*\[\s*"[^"]+"\s*\,\s*"[^"]+"\s*\]`, nil) + require.NoError(t, err) + + err = recording.AddGeneralRegexSanitizer( + `"lockTokens": ["fake-lock-token", "fake-lock-token", "fake-lock-token"]`, + `"lockTokens":\s*`+ + `\[`+ + `(\s*"[^"]+"\s*\,){2}`+ + `\s*"[^"]+"\s*`+ + `\]`, nil) + require.NoError(t, err) + + t.Cleanup(func() { + err := recording.Stop(t, nil) + require.NoError(t, err) + }) + + return transport +} + +func requireEqualCloudEvent(t *testing.T, expected messaging.CloudEvent, actual messaging.CloudEvent) { + t.Helper() + + require.NotEmpty(t, actual.ID, "ID is not empty") + require.NotEmpty(t, actual.SpecVersion, "SpecVersion is not empty") + + expected.ID = actual.ID + expected.Time = actual.Time + + require.Equal(t, actual, expected) +} diff --git a/sdk/messaging/eventgrid/aznamespaces/test-resources.bicep b/sdk/messaging/eventgrid/aznamespaces/test-resources.bicep new file mode 100644 index 000000000000..5459b26f6ba7 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/test-resources.bicep @@ -0,0 +1,127 @@ +@description('The base resource name.') +param baseName string = resourceGroup().name + +@description('The resource location') +param location string = resourceGroup().location + +@description('The client OID to grant access to test resources.') +param testApplicationOid string + +output RESOURCE_GROUP string = resourceGroup().name +output AZURE_SUBSCRIPTION_ID string = subscription().subscriptionId + +// +// [BEGIN] Event Grid namespace +// + +var namespaceName = '${baseName}-2' +var nsTopicName = 'testtopic1' +var nsSubscriptionName = 'testsubscription1' + +resource ns_resource 'Microsoft.EventGrid/namespaces@2023-06-01-preview' = { + name: namespaceName + location: location + sku: { + name: 'Standard' + capacity: 1 + } + properties: { + isZoneRedundant: true + publicNetworkAccess: 'Enabled' + } +} + +resource ns_testtopic1 'Microsoft.EventGrid/namespaces/topics@2023-06-01-preview' = { + parent: ns_resource + name: nsTopicName + properties: { + publisherType: 'Custom' + inputSchema: 'CloudEventSchemaV1_0' + eventRetentionInDays: 1 + } +} + +resource ns_testtopic1_testsubscription1 'Microsoft.EventGrid/namespaces/topics/eventSubscriptions@2023-06-01-preview' = { + parent: ns_testtopic1 + name: nsSubscriptionName + properties: { + deliveryConfiguration: { + deliveryMode: 'Queue' + queue: { + receiveLockDurationInSeconds: 60 + maxDeliveryCount: 10 + eventTimeToLive: 'P1D' + } + } + eventDeliverySchema: 'CloudEventSchemaV1_0' + filtersConfiguration: { + includedEventTypes: [] + } + } +} + +// https://learn.microsoft.com/en-us/rest/api/eventgrid/controlplane-version2023-06-01-preview/namespaces/list-shared-access-keys?tabs=HTTP +#disable-next-line outputs-should-not-contain-secrets // (this is just how our test deployments work) +output EVENTGRID_KEY string = listKeys(resourceId('Microsoft.EventGrid/namespaces', namespaceName), '2023-06-01-preview').key1 +// TODO: get this formatted properly +output EVENTGRID_ENDPOINT string = 'https://${ns_resource.properties.topicsConfiguration.hostname}' + +output EVENTGRID_TOPIC string = nsTopicName +output EVENTGRID_SUBSCRIPTION string = nsSubscriptionName + +// [END] Event Grid namespace + +// +// [BEGIN] Event Grid topics (publisher) +// + +resource egTopic 'Microsoft.EventGrid/topics@2023-06-01-preview' = { + name: '${baseName}-eg' + location: location + kind: 'Azure' + properties: { + inputSchema: 'EventGridSchema' + } +} + +resource ceTopic 'Microsoft.EventGrid/topics@2023-06-01-preview' = { + name: '${baseName}-ce' + location: location + kind: 'Azure' + properties: { + inputSchema: 'CloudEventSchemaV1_0' + } +} + +resource egContributorRole 'Microsoft.Authorization/roleAssignments@2018-01-01-preview' = { + name: guid('egContributorRoleId${baseName}') + scope: resourceGroup() + properties: { + roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions', '1e241071-0855-49ea-94dc-649edcd759de') + // roleDefinitionId: '/subscriptions/${subscription().subscriptionId}/providers/Microsoft.Authorization/roleDefinitions/1e241071-0855-49ea-94dc-649edcd759de' + principalId: testApplicationOid + } +} + +resource egDataSenderRole 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid('egSenderRoleId${baseName}') + scope: resourceGroup() + properties: { + roleDefinitionId: subscriptionResourceId('Microsoft.Authorization/roleDefinitions', 'd5a91429-5739-47e2-a06b-3470a27159e7') + principalId: testApplicationOid + } +} + +output EVENTGRID_TOPIC_NAME string = egTopic.name +#disable-next-line outputs-should-not-contain-secrets // (this is just how our test deployments work) +output EVENTGRID_TOPIC_KEY string = egTopic.listKeys().key1 +output EVENTGRID_TOPIC_ENDPOINT string = egTopic.properties.endpoint + +output EVENTGRID_CE_TOPIC_NAME string = ceTopic.name +#disable-next-line outputs-should-not-contain-secrets // (this is just how our test deployments work) +output EVENTGRID_CE_TOPIC_KEY string = ceTopic.listKeys().key1 +output EVENTGRID_CE_TOPIC_ENDPOINT string = ceTopic.properties.endpoint + +// +// [END] Event Grid topics (publisher) +// diff --git a/sdk/messaging/eventgrid/aznamespaces/time_rfc3339.go b/sdk/messaging/eventgrid/aznamespaces/time_rfc3339.go new file mode 100644 index 000000000000..c85f9aaa3353 --- /dev/null +++ b/sdk/messaging/eventgrid/aznamespaces/time_rfc3339.go @@ -0,0 +1,82 @@ +//go:build go1.18 +// +build go1.18 + +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +// Code generated by Microsoft (R) AutoRest Code Generator. DO NOT EDIT. +// Changes may cause incorrect behavior and will be lost if the code is regenerated. + +package aznamespaces + +import ( + "regexp" + "strings" + "time" +) + +// Azure reports time in UTC but it doesn't include the 'Z' time zone suffix in some cases. +var tzOffsetRegex = regexp.MustCompile(`(?:Z|z|\+|-)(?:\d+:\d+)*"*$`) + +const ( + utcDateTime = "2006-01-02T15:04:05.999999999" + utcDateTimeJSON = `"` + utcDateTime + `"` + utcDateTimeNoT = "2006-01-02 15:04:05.999999999" + utcDateTimeJSONNoT = `"` + utcDateTimeNoT + `"` + dateTimeNoT = `2006-01-02 15:04:05.999999999Z07:00` + dateTimeJSON = `"` + time.RFC3339Nano + `"` + dateTimeJSONNoT = `"` + dateTimeNoT + `"` +) + +type dateTimeRFC3339 time.Time + +func (t dateTimeRFC3339) MarshalJSON() ([]byte, error) { + tt := time.Time(t) + return tt.MarshalJSON() +} + +func (t dateTimeRFC3339) MarshalText() ([]byte, error) { + tt := time.Time(t) + return tt.MarshalText() +} + +func (t *dateTimeRFC3339) UnmarshalJSON(data []byte) error { + tzOffset := tzOffsetRegex.Match(data) + hasT := strings.Contains(string(data), "T") || strings.Contains(string(data), "t") + var layout string + if tzOffset && hasT { + layout = dateTimeJSON + } else if tzOffset { + layout = dateTimeJSONNoT + } else if hasT { + layout = utcDateTimeJSON + } else { + layout = utcDateTimeJSONNoT + } + return t.Parse(layout, string(data)) +} + +func (t *dateTimeRFC3339) UnmarshalText(data []byte) error { + tzOffset := tzOffsetRegex.Match(data) + hasT := strings.Contains(string(data), "T") || strings.Contains(string(data), "t") + var layout string + if tzOffset && hasT { + layout = time.RFC3339Nano + } else if tzOffset { + layout = dateTimeNoT + } else if hasT { + layout = utcDateTime + } else { + layout = utcDateTimeNoT + } + return t.Parse(layout, string(data)) +} + +func (t *dateTimeRFC3339) Parse(layout, value string) error { + p, err := time.Parse(layout, strings.ToUpper(value)) + *t = dateTimeRFC3339(p) + return err +} + +func (t dateTimeRFC3339) String() string { + return time.Time(t).Format(time.RFC3339Nano) +}