Skip to content

Commit

Permalink
Merge pull request #1 from PolideaInternal/fix-to-issue-8286-cloud-me…
Browse files Browse the repository at this point in the history
…morystore-memcached-operators

Proposal of fixes to Google Cloud Memorystore Memcached Operators
  • Loading branch information
tanjinP authored Oct 15, 2020
2 parents aaca868 + 146201b commit e7d3e3e
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from urllib.parse import urlparse

from google.cloud.redis_v1.gapic.enums import FailoverInstanceRequest, Instance
from google.cloud.memcache_v1beta2.types import cloud_memcache

from airflow import models
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -50,10 +51,16 @@

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")

INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystore")
INSTANCE_NAME_2 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-2")
INSTANCE_NAME_3 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-3")
INSTANCE_NAME_4 = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-4")
MEMORYSTORE_REDIS_INSTANCE_NAME = os.environ.get("GCP_MEMORYSTORE_INSTANCE_NAME", "test-memorystoreredis-")
MEMORYSTORE_REDIS_INSTANCE_NAME_2 = os.environ.get(
"GCP_MEMORYSTORE_INSTANCE_NAME2", "test-memorystore-redis-2"
)
MEMORYSTORE_REDIS_INSTANCE_NAME_3 = os.environ.get(
"GCP_MEMORYSTORE_INSTANCE_NAME3", "test-memorystore-redis-3"
)
MEMORYSTORE_MEMCACHED_INSTANCE_NAME = os.environ.get(
"GCP_MEMORYSTORE_INSTANCE_NAME4", "test-memorystore-memcached-1"
)

EXPORT_GCS_URL = os.environ.get("GCP_MEMORYSTORE_EXPORT_GCS_URL", "gs://test-memorystore/my-export.rdb")
EXPORT_GCS_URL_PARTS = urlparse(EXPORT_GCS_URL)
Expand All @@ -71,7 +78,7 @@


with models.DAG(
"gcp_cloud_memorystore",
"gcp_cloud_memorystore_redis",
schedule_interval=None, # Override to match your needs
start_date=dates.days_ago(1),
tags=['example'],
Expand All @@ -80,7 +87,7 @@
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
location="europe-north1",
instance_id=INSTANCE_NAME,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
project_id=GCP_PROJECT_ID,
)
Expand All @@ -96,7 +103,7 @@
create_instance_2 = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance-2",
location="europe-north1",
instance_id=INSTANCE_NAME_2,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
instance=SECOND_INSTANCE,
project_id=GCP_PROJECT_ID,
)
Expand All @@ -105,7 +112,7 @@
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
location="europe-north1",
instance=INSTANCE_NAME,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
do_xcom_push=True,
)
Expand All @@ -121,7 +128,7 @@
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
location="europe-north1",
instance=INSTANCE_NAME_2,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS,
project_id=GCP_PROJECT_ID,
)
Expand All @@ -143,7 +150,7 @@
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
location="europe-north1",
instance_id=INSTANCE_NAME,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
Expand All @@ -164,7 +171,7 @@
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
location="europe-north1",
instance=INSTANCE_NAME,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
Expand All @@ -174,30 +181,33 @@
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
location="europe-north1",
instance=INSTANCE_NAME_2,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_import_instance]

# [START howto_operator_delete_instance]
delete_instance = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance", location="europe-north1", instance=INSTANCE_NAME, project_id=GCP_PROJECT_ID
task_id="delete-instance",
location="europe-north1",
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_delete_instance]

delete_instance_2 = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance-2",
location="europe-north1",
instance=INSTANCE_NAME_2,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
project_id=GCP_PROJECT_ID,
)

# [END howto_operator_create_instance_and_import]
create_instance_and_import = CloudMemorystoreCreateInstanceAndImportOperator(
task_id="create-instance-and-import",
location="europe-north1",
instance_id=INSTANCE_NAME_3,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
instance=FIRST_INSTANCE,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
Expand All @@ -208,7 +218,7 @@
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
location="europe-north1",
instance_id=INSTANCE_NAME_3,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
project_id=GCP_PROJECT_ID,
memory_size_gb=3,
)
Expand All @@ -218,7 +228,7 @@
export_and_delete_instance = CloudMemorystoreExportAndDeleteInstanceOperator(
task_id="export-and-delete-instance",
location="europe-north1",
instance=INSTANCE_NAME_3,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=GCP_PROJECT_ID,
)
Expand All @@ -242,73 +252,79 @@

export_instance >> create_instance_and_import >> scale_instance >> export_and_delete_instance

with models.DAG(
"gcp_cloud_memorystore_memcached",
schedule_interval=None, # Override to match your needs
start_date=dates.days_ago(1),
tags=['example'],
) as dag:
# [START howto_operator_create_instance_memcached]
create_instance_3 = CloudMemorystoreMemcachedCreateInstanceOperator(
task_id="create-instance-3",
create_memcached_instance = CloudMemorystoreMemcachedCreateInstanceOperator(
task_id="create-instance",
location="europe-north1",
instance_id=INSTANCE_NAME_4,
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
instance=MEMCACHED_INSTANCE,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_create_instance_memcached]

# [START howto_operator_delete_instance_memcached]
delete_instance_3 = CloudMemorystoreMemcachedDeleteInstanceOperator(
task_id="delete-instance-3",
delete_memcached_instance = CloudMemorystoreMemcachedDeleteInstanceOperator(
task_id="delete-instance",
location="europe-north1",
instance=INSTANCE_NAME_4,
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_delete_instance_memcached]

# [START howto_operator_get_instance_memcached]
get_instance_2 = CloudMemorystoreMemcachedGetInstanceOperator(
task_id="get-instance-2",
get_memcached_instance = CloudMemorystoreMemcachedGetInstanceOperator(
task_id="get-instance",
location="europe-north1",
instance=INSTANCE_NAME_4,
instance=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
)
# [END howto_operator_get_instance_memcached]

# [START howto_operator_list_instances_memcached]
list_instances_2 = CloudMemorystoreMemcachedListInstancesOperator(
task_id="list-instances-2", location="-", project_id=GCP_PROJECT_ID
list_memcached_instances = CloudMemorystoreMemcachedListInstancesOperator(
task_id="list-instances", location="-", project_id=GCP_PROJECT_ID
)
# [END howto_operator_list_instances_memcached]

# [START howto_operator_update_instance_memcached]
update_instance_2 = CloudMemorystoreMemcachedUpdateInstanceOperator(
task_id="update-instance-2",
# # [START howto_operator_update_instance_memcached]
update_memcached_instance = CloudMemorystoreMemcachedUpdateInstanceOperator(
task_id="update-instance",
location="europe-north1",
instance_id=INSTANCE_NAME_4,
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask={"displayName": "New Name"},
instance={"memory_size_gb": 2},
update_mask=cloud_memcache.field_mask.FieldMask(paths=["node_count"]),
instance={"node_count": 2},
)
# [END howto_operator_update_instance_memcached]

# [START howto_operator_update_and_apply_parameters_memcached]
update_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
update_memcached_parameters = CloudMemorystoreMemcachedUpdateParametersOperator(
task_id="update-parameters",
location="europe-north1",
instance_id=INSTANCE_NAME_4,
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
update_mask="protocol,hash_algorithm",
parameters={"protocol": "ascii", "hash_algorithm": "jenkins"},
update_mask={"paths": ["params"]},
parameters={"params": {"protocol": "ascii", "hash_algorithm": "jenkins"}},
)

apply_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
apply_memcached_parameters = CloudMemorystoreMemcachedApplyParametersOperator(
task_id="apply-parameters",
location="europe-north1",
instance_id=INSTANCE_NAME_4,
instance_id=MEMORYSTORE_MEMCACHED_INSTANCE_NAME,
project_id=GCP_PROJECT_ID,
node_ids=["node-1", "node-2"],
node_ids=["node-a-1"],
apply_all=False,
)

update_parameters >> apply_parameters
# update_parameters >> apply_parameters
# [END howto_operator_update_and_apply_parameters_memcached]

create_instance_3 >> get_instance_2 >> update_instance_2 >> delete_instance_3
get_instance_2 >> update_parameters
apply_parameters >> delete_instance_3
create_memcached_instance >> [list_memcached_instances, get_memcached_instance]
create_memcached_instance >> update_memcached_instance >> update_memcached_parameters
update_memcached_parameters >> apply_memcached_parameters >> delete_memcached_instance
Loading

0 comments on commit e7d3e3e

Please sign in to comment.