Skip to content

Commit

Permalink
Fix the FlinkRunner for Dataproc
Browse files Browse the repository at this point in the history
  • Loading branch information
liferoad committed Oct 10, 2024
1 parent 9ceb14e commit 38ecfa9
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def create_cluster(self, cluster: dict) -> None:
def create_flink_cluster(self) -> None:
"""Calls _create_cluster with a configuration that enables FlinkRunner."""
init_action_path = self.stage_init_action()
# https://cloud.google.com/php/docs/reference/cloud-dataproc/latest/V1.Cluster
cluster = {
'project_id': self.cluster_metadata.project_id,
'cluster_name': self.cluster_metadata.cluster_name,
Expand All @@ -194,7 +195,8 @@ def create_flink_cluster(self) -> None:
},
'service_account_scopes': [
'https://www.googleapis.com/auth/cloud-platform'
]
],
'internal_ip_only': False
},
'master_config': {
# There must be 1 and only 1 instance of master.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,11 @@ class Clusters:
To configure a pipeline to run on a local FlinkRunner, explicitly set the
default cluster metadata to None: ib.clusters.set_default_cluster(None).
"""
# Explicitly set the Flink version here to ensure compatibility with 2.1
# Explicitly set the Flink version here to ensure compatibility with 2.2
# Dataproc images:
# https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.1
DATAPROC_FLINK_VERSION = '1.15'
# https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-release-2.2
# you can manually override this by importing Clusters
DATAPROC_FLINK_VERSION = '1.17'

# The minimum worker number to create a Dataproc cluster.
DATAPROC_MINIMUM_WORKER_NUM = 2
Expand Down

0 comments on commit 38ecfa9

Please sign in to comment.