Skip to content

Commit

Permalink
fixup! Proposal of fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tobiasz Kędzierski committed Oct 15, 2020
1 parent ad93f53 commit 146201b
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,8 @@
start_date=dates.days_ago(1),
tags=['example'],
) as dag:
pass

# [START howto_operator_create_instance_memcached]
create_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
task_id="create-instance",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
Expand All @@ -271,7 +269,7 @@
# [END howto_operator_create_instance_memcached]

# [START howto_operator_delete_instance_memcached]
delete_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
task_id="delete-instance",
location="europe-north1",
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
Expand All @@ -280,7 +278,7 @@
# [END howto_operator_delete_instance_memcached]

# [START howto_operator_get_instance_memcached]
get_instance = CloudMemorystoreMemcachedGetInstanceOperator(
get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
task_id="get-instance",
location="europe-north1",
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
Expand All @@ -289,48 +287,44 @@
# [END howto_operator_get_instance_memcached]

# [START howto_operator_list_instances_memcached]
list_instances = CloudMemorystoreMemcachedListInstancesOperator(
list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
)
# [END howto_operator_list_instances_memcached]

# # [START howto_operator_update_instance_memcached]
mask = cloud_memcache.field_mask.FieldMask(paths=["displayName", "New Name"])

update_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
task_id="update-instance",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask=mask,
instance={"memory_size_gb": 2},
update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]),
instance={"node_count": 2},
)
# [END howto_operator_update_instance_memcached]

# [START howto_operator_update_and_apply_parameters_memcached]

update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
task_id="update-parameters",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask="protocol,hash_algorithm",
parameters={"protocol": "ascii", "hash_algorithm": "jenkins"},
update_mask={"paths": ["params"]},
parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
)

apply_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
task_id="apply-parameters",
location="europe-north1",
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
node_ids=["node-1", "node-2"],
node_ids=["node-a-1"],
apply_all=False,
)

# update_parameters >> apply_parameters
# [END howto_operator_update_and_apply_parameters_memcached]

# create_instance >> update_instance >> [list_instances, get_instance] >> update_parameters
create_instance >> [list_instances, get_instance] >> update_parameters
update_parameters >> apply_parameters
apply_parameters >> delete_instance
create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance
13 changes: 10 additions & 3 deletions airflow/providers/google/cloud/hooks/cloud_memorystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
Hooks for Cloud Memorystore service
"""
from typing import Dict, Optional, Sequence, Tuple, Union
import json

from google.api_core.exceptions import NotFound
from google.api_core import path_template
Expand All @@ -29,6 +30,7 @@
from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest
from google.cloud.redis_v1.types import FieldMask, InputConfig, Instance, OutputConfig
from google.protobuf.json_format import ParseDict
import proto

from airflow import version
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -576,6 +578,11 @@ def _append_label(instance: cloud_memcache.Instance, key: str, val: str) -> clou
instance.labels.update({key: val})
return instance

@staticmethod
def proto_message_to_dict(message: proto.Message) -> dict:
"""Helper method to parse protobuf message to dictionary."""
return json.loads(message.__class__.to_json(message))

@GoogleBaseHook.fallback_to_default_project_id
def apply_parameters(
self,
Expand Down Expand Up @@ -685,7 +692,7 @@ def create_instance(
self.log.info("Instance not exists.")

if isinstance(instance, dict):
instance = ParseDict(instance, cloud_memcache.Instance())
instance = cloud_memcache.Instance(instance)
elif not isinstance(instance, cloud_memcache.Instance):
raise AirflowException("instance is not instance of Instance type or python dict")

Expand Down Expand Up @@ -870,7 +877,7 @@ def update_instance(
metadata = metadata or ()

if isinstance(instance, dict):
instance = ParseDict(instance, cloud_memcache.Instance())
instance = cloud_memcache.Instance(instance)
elif not isinstance(instance, cloud_memcache.Instance):
raise AirflowException("instance is not instance of Instance type or python dict")

Expand Down Expand Up @@ -931,7 +938,7 @@ def update_parameters(
metadata = metadata or ()

if isinstance(parameters, dict):
parameters = ParseDict(parameters, cloud_memcache.MemcacheParameters())
parameters = cloud_memcache.MemcacheParameters(parameters)
elif not isinstance(parameters, cloud_memcache.MemcacheParameters):
raise AirflowException("instance is not instance of MemcacheParameters type or python dict")

Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/operators/cloud_memorystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ def execute(self, context: Dict):
timeout=self.timeout,
metadata=self.metadata,
)
return MessageToDict(result)
return hook.proto_message_to_dict(result)


class CloudMemorystoreMemcachedDeleteInstanceOperator(BaseOperator):
Expand Down Expand Up @@ -1438,7 +1438,7 @@ def execute(self, context: Dict):
timeout=self.timeout,
metadata=self.metadata,
)
return MessageToDict(result)
return hook.proto_message_to_dict(result)


class CloudMemorystoreMemcachedListInstancesOperator(BaseOperator):
Expand Down Expand Up @@ -1520,7 +1520,7 @@ def execute(self, context: Dict):
timeout=self.timeout,
metadata=self.metadata,
)
instances = [MessageToDict(a) for a in result]
instances = [hook.proto_message_to_dict(a) for a in result]
return instances


Expand Down
16 changes: 16 additions & 0 deletions tests/providers/google/cloud/hooks/test_cloud_memorystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,3 +599,19 @@ def test_update_instance(self, mock_get_conn, mock_project_id):
timeout=TEST_TIMEOUT,
metadata=TEST_METADATA,
)

def test_proto_functions(self):
instance_dict = {
'name': 'test_name',
'node_count': 1,
'node_config': {'cpu_count': 1, 'memory_size_mb': 1024},
}
instance = cloud_memcache.Instance(instance_dict)
instance_dict_result = self.hook.proto_message_to_dict(instance)
self.assertEqual(instance_dict_result["name"], instance_dict["name"])
self.assertEqual(
instance_dict_result["nodeConfig"]["cpuCount"], instance_dict["node_config"]["cpu_count"]
)
self.assertEqual(
instance_dict_result["nodeConfig"]["memorySizeMb"], instance_dict["node_config"]["memory_size_mb"]
)

0 comments on commit 146201b

Please sign in to comment.