Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Evengrid] Regenrate code gen #13584

Merged
merged 3 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from typing import Any

from azure.core.configuration import Configuration
from azure.core.pipeline import policies

VERSION = "unknown"

class EventGridPublisherClientConfiguration(Configuration):
"""Configuration for EventGridPublisherClient.

Note that all parameters used to create this instance are saved as instance
attributes.

"""

def __init__(
self,
**kwargs: Any
) -> None:
super(EventGridPublisherClientConfiguration, self).__init__(**kwargs)

self.api_version = "2018-01-01"
kwargs.setdefault('sdk_moniker', 'eventgridpublisherclient/{}'.format(VERSION))
self._configure(**kwargs)

def _configure(
self,
**kwargs: Any
) -> None:
self.user_agent_policy = kwargs.get('user_agent_policy') or policies.UserAgentPolicy(**kwargs)
self.headers_policy = kwargs.get('headers_policy') or policies.HeadersPolicy(**kwargs)
self.proxy_policy = kwargs.get('proxy_policy') or policies.ProxyPolicy(**kwargs)
self.logging_policy = kwargs.get('logging_policy') or policies.NetworkTraceLoggingPolicy(**kwargs)
self.http_logging_policy = kwargs.get('http_logging_policy') or policies.HttpLoggingPolicy(**kwargs)
self.retry_policy = kwargs.get('retry_policy') or policies.AsyncRetryPolicy(**kwargs)
self.custom_hook_policy = kwargs.get('custom_hook_policy') or policies.CustomHookPolicy(**kwargs)
self.redirect_policy = kwargs.get('redirect_policy') or policies.AsyncRedirectPolicy(**kwargs)
self.authentication_policy = kwargs.get('authentication_policy')
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from typing import Any

from azure.core import AsyncPipelineClient
from msrest import Deserializer, Serializer

from ._configuration import EventGridPublisherClientConfiguration
from .operations import EventGridPublisherClientOperationsMixin
from .. import models


class EventGridPublisherClient(EventGridPublisherClientOperationsMixin):
"""EventGrid Python Publisher Client.

"""

def __init__(
self,
**kwargs: Any
) -> None:
base_url = 'https://{topicHostname}'
self._config = EventGridPublisherClientConfiguration(**kwargs)
self._client = AsyncPipelineClient(base_url=base_url, config=self._config, **kwargs)

client_models = {k: v for k, v in models.__dict__.items() if isinstance(v, type)}
self._serialize = Serializer(client_models)
self._deserialize = Deserializer(client_models)


async def close(self) -> None:
await self._client.close()

async def __aenter__(self) -> "EventGridPublisherClient":
await self._client.__aenter__()
return self

async def __aexit__(self, *exc_details) -> None:
await self._client.__aexit__(*exc_details)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------

from ._event_grid_publisher_client_operations import EventGridPublisherClientOperationsMixin

__all__ = [
'EventGridPublisherClientOperationsMixin',
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
# coding=utf-8
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# Code generated by Microsoft (R) AutoRest Code Generator.
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
from typing import Any, Callable, Dict, Generic, List, Optional, TypeVar
import warnings

from azure.core.exceptions import HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest

from ... import models

T = TypeVar('T')
ClsType = Optional[Callable[[PipelineResponse[HttpRequest, AsyncHttpResponse], T, Dict[str, Any]], Any]]

class EventGridPublisherClientOperationsMixin:

async def publish_events(
self,
topic_hostname: str,
events: List["models.EventGridEvent"],
**kwargs
) -> None:
"""Publishes a batch of events to an Azure Event Grid topic.

:param topic_hostname: The host name of the topic, e.g. topic1.westus2-1.eventgrid.azure.net.
:type topic_hostname: str
:param events: An array of events to be published to Event Grid.
:type events: list[~event_grid_publisher_client.models.EventGridEvent]
:keyword callable cls: A custom type or function that will be passed the direct response
:return: None, or the result of cls(response)
:rtype: None
:raises: ~azure.core.exceptions.HttpResponseError
"""
cls = kwargs.pop('cls', None) # type: ClsType[None]
error_map = {404: ResourceNotFoundError, 409: ResourceExistsError}
error_map.update(kwargs.pop('error_map', {}))
api_version = "2018-01-01"
content_type = kwargs.pop("content_type", "application/json")

# Construct URL
url = self.publish_events.metadata['url'] # type: ignore
path_format_arguments = {
'topicHostname': self._serialize.url("topic_hostname", topic_hostname, 'str', skip_quote=True),
}
url = self._client.format_url(url, **path_format_arguments)

# Construct parameters
query_parameters = {} # type: Dict[str, Any]
query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

# Construct headers
header_parameters = {} # type: Dict[str, Any]
header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')

body_content_kwargs = {} # type: Dict[str, Any]
body_content = self._serialize.body(events, '[EventGridEvent]')
body_content_kwargs['content'] = body_content
request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)

if cls:
return cls(pipeline_response, None, {})

publish_events.metadata = {'url': '/api/events'} # type: ignore

async def publish_cloud_event_events(
self,
topic_hostname: str,
events: List["models.CloudEvent"],
**kwargs
) -> None:
"""Publishes a batch of events to an Azure Event Grid topic.

:param topic_hostname: The host name of the topic, e.g. topic1.westus2-1.eventgrid.azure.net.
:type topic_hostname: str
:param events: An array of events to be published to Event Grid.
:type events: list[~event_grid_publisher_client.models.CloudEvent]
:keyword callable cls: A custom type or function that will be passed the direct response
:return: None, or the result of cls(response)
:rtype: None
:raises: ~azure.core.exceptions.HttpResponseError
"""
cls = kwargs.pop('cls', None) # type: ClsType[None]
error_map = {404: ResourceNotFoundError, 409: ResourceExistsError}
error_map.update(kwargs.pop('error_map', {}))
api_version = "2018-01-01"
content_type = kwargs.pop("content_type", "application/cloudevents-batch+json; charset=utf-8")

# Construct URL
url = self.publish_cloud_event_events.metadata['url'] # type: ignore
path_format_arguments = {
'topicHostname': self._serialize.url("topic_hostname", topic_hostname, 'str', skip_quote=True),
}
url = self._client.format_url(url, **path_format_arguments)

# Construct parameters
query_parameters = {} # type: Dict[str, Any]
query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

# Construct headers
header_parameters = {} # type: Dict[str, Any]
header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')

body_content_kwargs = {} # type: Dict[str, Any]
body_content = self._serialize.body(events, '[CloudEvent]')
body_content_kwargs['content'] = body_content
request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)

if cls:
return cls(pipeline_response, None, {})

publish_cloud_event_events.metadata = {'url': '/api/events'} # type: ignore

async def publish_custom_event_events(
self,
topic_hostname: str,
events: List[object],
**kwargs
) -> None:
"""Publishes a batch of events to an Azure Event Grid topic.

:param topic_hostname: The host name of the topic, e.g. topic1.westus2-1.eventgrid.azure.net.
:type topic_hostname: str
:param events: An array of events to be published to Event Grid.
:type events: list[object]
:keyword callable cls: A custom type or function that will be passed the direct response
:return: None, or the result of cls(response)
:rtype: None
:raises: ~azure.core.exceptions.HttpResponseError
"""
cls = kwargs.pop('cls', None) # type: ClsType[None]
error_map = {404: ResourceNotFoundError, 409: ResourceExistsError}
error_map.update(kwargs.pop('error_map', {}))
api_version = "2018-01-01"
content_type = kwargs.pop("content_type", "application/json")

# Construct URL
url = self.publish_custom_event_events.metadata['url'] # type: ignore
path_format_arguments = {
'topicHostname': self._serialize.url("topic_hostname", topic_hostname, 'str', skip_quote=True),
}
url = self._client.format_url(url, **path_format_arguments)

# Construct parameters
query_parameters = {} # type: Dict[str, Any]
query_parameters['api-version'] = self._serialize.query("api_version", api_version, 'str')

# Construct headers
header_parameters = {} # type: Dict[str, Any]
header_parameters['Content-Type'] = self._serialize.header("content_type", content_type, 'str')

body_content_kwargs = {} # type: Dict[str, Any]
body_content = self._serialize.body(events, '[object]')
body_content_kwargs['content'] = body_content
request = self._client.post(url, query_parameters, header_parameters, **body_content_kwargs)
pipeline_response = await self._client._pipeline.run(request, stream=False, **kwargs)
response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
raise HttpResponseError(response=response)

if cls:
return cls(pipeline_response, None, {})

publish_custom_event_events.metadata = {'url': '/api/events'} # type: ignore
Loading