From 146201bde4d1044d4a7bb570d95ba6f447cd09c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobiasz=20K=C4=99dzierski?= Date: Wed, 14 Oct 2020 14:25:32 +0200 Subject: [PATCH] fixup! Proposal of fixes --- .../example_dags/example_cloud_memorystore.py | 36 ++++++++----------- .../google/cloud/hooks/cloud_memorystore.py | 13 +++++-- .../cloud/operators/cloud_memorystore.py | 6 ++-- .../cloud/hooks/test_cloud_memorystore.py | 16 +++++++++ 4 files changed, 44 insertions(+), 27 deletions(-) diff --git a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py index c985b4fe20d3c2..fb77bb6b53a07c 100644 --- a/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py +++ b/airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py @@ -258,10 +258,8 @@ start_date=dates.days_ago(1), tags=['example'], ) as dag: - pass - # [START howto_operator_create_instance_memcached] - create_instance = CloudMemorystoreMemcachedCreateInstanceOperator( + create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator( task_id="create-instance", location="europe-north1", instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, @@ -271,7 +269,7 @@ # [END howto_operator_create_instance_memcached] # [START howto_operator_delete_instance_memcached] - delete_instance = CloudMemorystoreMemcachedDeleteInstanceOperator( + delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator( task_id="delete-instance", location="europe-north1", instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, @@ -280,7 +278,7 @@ # [END howto_operator_delete_instance_memcached] # [START howto_operator_get_instance_memcached] - get_instance = CloudMemorystoreMemcachedGetInstanceOperator( + get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator( task_id="get-instance", location="europe-north1", instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, @@ -289,48 +287,44 @@ # [END howto_operator_get_instance_memcached] # [START howto_operator_list_instances_memcached] - list_instances = CloudMemorystoreMemcachedListInstancesOperator( + list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator( task_id="list-instances", location="-", project_id=GCP_PROJECT_ID ) # [END howto_operator_list_instances_memcached] # # [START howto_operator_update_instance_memcached] - mask = cloud_memcache.field_mask.FieldMask(paths=["displayName", "New Name"]) - - update_instance = CloudMemorystoreMemcachedUpdateInstanceOperator( + update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator( task_id="update-instance", location="europe-north1", instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, project_id=GCP_PROJECT_ID, - update_mask=mask, - instance={"memory_size_gb": 2}, + update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]), + instance={"node_count": 2}, ) # [END howto_operator_update_instance_memcached] # [START howto_operator_update_and_apply_parameters_memcached] - - update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator( + update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator( task_id="update-parameters", location="europe-north1", instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, project_id=GCP_PROJECT_ID, - update_mask="protocol,hash_algorithm", - parameters={"protocol": "ascii", "hash_algorithm": "jenkins"}, + update_mask={"paths": ["params"]}, + parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}}, ) - apply_parameters = CloudMemorystoreMemcachedApplyParametersOperator( + apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator( task_id="apply-parameters", location="europe-north1", instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME, project_id=GCP_PROJECT_ID, - node_ids=["node-1", "node-2"], + node_ids=["node-a-1"], apply_all=False, ) # update_parameters >> apply_parameters # [END howto_operator_update_and_apply_parameters_memcached] - # create_instance >> update_instance >> [list_instances, get_instance] >> update_parameters - create_instance >> [list_instances, get_instance] >> update_parameters - update_parameters >> apply_parameters - apply_parameters >> delete_instance + create_memcached_instance >> [list_memcached_instances, get_memcached_instance] + create_memcached_instance >> update_memcached_instance >> update_memcached_parameters + update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance diff --git a/airflow/providers/google/cloud/hooks/cloud_memorystore.py b/airflow/providers/google/cloud/hooks/cloud_memorystore.py index a8875a5e582f7e..c527c73da60741 100644 --- a/airflow/providers/google/cloud/hooks/cloud_memorystore.py +++ b/airflow/providers/google/cloud/hooks/cloud_memorystore.py @@ -19,6 +19,7 @@ Hooks for Cloud Memorystore service """ from typing import Dict, Optional, Sequence, Tuple, Union +import json from google.api_core.exceptions import NotFound from google.api_core import path_template @@ -29,6 +30,7 @@ from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig from google.protobuf.json_format import ParseDict +import proto from airflow import version from airflow.exceptions import AirflowException @@ -576,6 +578,11 @@ def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> clou instance.labels.update({key: val}) return instance + @staticmethod + def proto_message_to_dict(message: proto.Message) -> dict: + """Helper method to parse protobuf message to dictionary.""" + return json.loads(message.__class__.to_json(message)) + @GoogleBaseHook.fallback_to_default_project_id def apply_parameters( self, @@ -685,7 +692,7 @@ def create_instance( self.log.info("Instance not exists.") if isinstance(instance, dict): - instance = ParseDict(instance, cloud_memcache.Instance()) + instance = cloud_memcache.Instance(instance) elif not isinstance(instance, cloud_memcache.Instance): raise AirflowException("instance is not instance of Instance type or python dict") @@ -870,7 +877,7 @@ def update_instance( metadata = metadata or () if isinstance(instance, dict): - instance = ParseDict(instance, cloud_memcache.Instance()) + instance = cloud_memcache.Instance(instance) elif not isinstance(instance, cloud_memcache.Instance): raise AirflowException("instance is not instance of Instance type or python dict") @@ -931,7 +938,7 @@ def update_parameters( metadata = metadata or () if isinstance(parameters, dict): - parameters = ParseDict(parameters, cloud_memcache.MemcacheParameters()) + parameters = cloud_memcache.MemcacheParameters(parameters) elif not isinstance(parameters, cloud_memcache.MemcacheParameters): raise AirflowException("instance is not instance of MemcacheParameters type or python dict") diff --git a/airflow/providers/google/cloud/operators/cloud_memorystore.py b/airflow/providers/google/cloud/operators/cloud_memorystore.py index 6c37ba82af7879..21741def13c8f1 100644 --- a/airflow/providers/google/cloud/operators/cloud_memorystore.py +++ b/airflow/providers/google/cloud/operators/cloud_memorystore.py @@ -1290,7 +1290,7 @@ def execute(self, context: Dict): timeout=self.timeout, metadata=self.metadata, ) - return MessageToDict(result) + return hook.proto_message_to_dict(result) class CloudMemorystoreMemcachedDeleteInstanceOperator(BaseOperator): @@ -1438,7 +1438,7 @@ def execute(self, context: Dict): timeout=self.timeout, metadata=self.metadata, ) - return MessageToDict(result) + return hook.proto_message_to_dict(result) class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator): @@ -1520,7 +1520,7 @@ def execute(self, context: Dict): timeout=self.timeout, metadata=self.metadata, ) - instances = [MessageToDict(a) for a in result] + instances = [hook.proto_message_to_dict(a) for a in result] return instances diff --git a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py index 8352f51a17a3df..8f92e790738d2c 100644 --- a/tests/providers/google/cloud/hooks/test_cloud_memorystore.py +++ b/tests/providers/google/cloud/hooks/test_cloud_memorystore.py @@ -599,3 +599,19 @@ def test_update_instance(self, mock_get_conn, mock_project_id): timeout=TEST_TIMEOUT, metadata=TEST_METADATA, ) + + def test_proto_functions(self): + instance_dict = { + 'name': 'test_name', + 'node_count': 1, + 'node_config': {'cpu_count': 1, 'memory_size_mb': 1024}, + } + instance = cloud_memcache.Instance(instance_dict) + instance_dict_result = self.hook.proto_message_to_dict(instance) + self.assertEqual(instance_dict_result["name"], instance_dict["name"]) + self.assertEqual( + instance_dict_result["nodeConfig"]["cpuCount"], instance_dict["node_config"]["cpu_count"] + ) + self.assertEqual( + instance_dict_result["nodeConfig"]["memorySizeMb"], instance_dict["node_config"]["memory_size_mb"] + )