@@ -22,6 +22,7 @@
 
 from __future__ import annotations
 
+import os
 from datetime import datetime
 
 from airflow.decorators import task
@@ -30,7 +31,6 @@
 from airflow.models.dag import DAG
 from airflow.providers.amazon.aws.hooks.dynamodb import DynamoDBHook
 from airflow.providers.amazon.aws.transfers.hive_to_dynamodb import HiveToDynamoDBOperator
-from airflow.utils import db
 from airflow.utils.trigger_rule import TriggerRule
 
 from system.amazon.aws.utils import SystemTestContextBuilder
@@ -102,16 +102,21 @@ def delete_dynamodb_table(table_name):
 # is hosted on EMR. You must set the host name of the connection
 # to match your EMR cluster's hostname.
 @task
-def configure_hive_connection(connection_id, hostname):
-    db.merge_conn(
-        Connection(
-            conn_id=connection_id,
-            conn_type="hiveserver2",
-            host=hostname,
-            port=10000,
-        )
-    )
+def configure_hive_connection(connection_id: str, hostname: str):
+    """
+    Set up the Hive connection using environment variables instead of database operations.
+    This approach is cleaner and compatible with Airflow 3.
+    """
+    c = Connection(
+        conn_id=connection_id,
+        conn_type="hiveserver2",
+        host=hostname,
+        port=10000,
+    )
+
+    envvar = f"AIRFLOW_CONN_{c.conn_id.upper()}"
+    os.environ[envvar] = c.get_uri()
 
 
 with DAG(
     dag_id=DAG_ID,
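For context on the pattern this PR adopts: Airflow resolves a connection from an AIRFLOW_CONN_<CONN_ID> environment variable before it consults the metadata database, so exporting a serialized connection URI is enough for a hook to find the connection without any db.merge_conn call. Below is a minimal standalone sketch of the same mechanism; the conn_id and hostname are hypothetical placeholders, not values from this PR.

    import os

    from airflow.models import Connection

    # Build the connection object in memory; nothing is written to the
    # Airflow metadata database.
    conn = Connection(
        conn_id="hive_cli_test",              # hypothetical conn_id
        conn_type="hiveserver2",
        host="emr-primary.example.internal",  # hypothetical EMR hostname
        port=10000,
    )

    # Airflow checks AIRFLOW_CONN_<CONN_ID uppercased> before the database,
    # so any hook created in this process can now resolve the connection.
    os.environ[f"AIRFLOW_CONN_{conn.conn_id.upper()}"] = conn.get_uri()

    # For example, in the same process:
    #   from airflow.hooks.base import BaseHook
    #   BaseHook.get_connection("hive_cli_test")

One consequence of this design is that the variable only exists in the process that sets it, so it must be exported where the consuming hook actually runs.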