diff --git a/providers/oracle/src/airflow/providers/oracle/hooks/oracle.py b/providers/oracle/src/airflow/providers/oracle/hooks/oracle.py index e28fed8a58018..698572486ef2a 100644 --- a/providers/oracle/src/airflow/providers/oracle/hooks/oracle.py +++ b/providers/oracle/src/airflow/providers/oracle/hooks/oracle.py @@ -144,7 +144,7 @@ def get_conn(self) -> oracledb.Connection: """ - conn = self.get_connection(self.oracle_conn_id) # type: ignore[attr-defined] + conn = self.get_connection(self.get_conn_id()) conn_config: dict[str, Any] = {"user": conn.login, "password": conn.password} sid = conn.extra_dejson.get("sid") mod = conn.extra_dejson.get("module") @@ -231,18 +231,18 @@ def get_conn(self) -> oracledb.Connection: if expire_time: conn_config["expire_time"] = expire_time - conn = oracledb.connect(**conn_config) # type: ignore[assignment] + oracle_conn = oracledb.connect(**conn_config) if mod is not None: - conn.module = mod # type: ignore[attr-defined] + oracle_conn.module = mod # if Connection.schema is defined, set schema after connecting successfully # cannot be part of conn_config # https://python-oracledb.readthedocs.io/en/latest/api_manual/connection.html?highlight=schema#Connection.current_schema # Only set schema when not using conn.schema as Service Name if schema and service_name: - conn.current_schema = schema # type: ignore[attr-defined] + oracle_conn.current_schema = schema - return conn # type: ignore[return-value] + return oracle_conn def insert_rows( self, @@ -292,7 +292,7 @@ def insert_rows( conn = self.get_conn() if self.supports_autocommit: self.set_autocommit(conn, False) - cur = conn.cursor() # type: ignore[attr-defined] + cur = conn.cursor() i = 0 for row in rows: i += 1 @@ -312,11 +312,11 @@ def insert_rows( sql = f"INSERT /*+ APPEND */ INTO {table} {target_fields} VALUES ({','.join(values)})" cur.execute(sql) if i % commit_every == 0: - conn.commit() # type: ignore[attr-defined] + conn.commit() self.log.info("Loaded %s into %s rows so far", i, table) - conn.commit() # type: ignore[attr-defined] + conn.commit() cur.close() - conn.close() # type: ignore[attr-defined] + conn.close() self.log.info("Done loading. Loaded a total of %s rows", i) def bulk_insert_rows( @@ -349,7 +349,7 @@ def bulk_insert_rows( conn = self.get_conn() if self.supports_autocommit: self.set_autocommit(conn, False) - cursor = conn.cursor() # type: ignore[attr-defined] + cursor = conn.cursor() values_base = target_fields or rows[0] if bool(sequence_column) ^ bool(sequence_name): @@ -382,7 +382,7 @@ def bulk_insert_rows( if row_count % commit_every == 0: cursor.prepare(prepared_stm) cursor.executemany(None, row_chunk) - conn.commit() # type: ignore[attr-defined] + conn.commit() self.log.info("[%s] inserted %s rows", table, row_count) # Empty chunk row_chunk = [] @@ -390,10 +390,10 @@ def bulk_insert_rows( if row_chunk: cursor.prepare(prepared_stm) cursor.executemany(None, row_chunk) - conn.commit() # type: ignore[attr-defined] + conn.commit() self.log.info("[%s] inserted %s rows", table, row_count) cursor.close() - conn.close() # type: ignore[attr-defined] + conn.close() def callproc( self, @@ -452,7 +452,7 @@ def handler(cursor): def get_uri(self) -> str: """Get the URI for the Oracle connection.""" - conn = self.get_connection(self.oracle_conn_id) # type: ignore[attr-defined] + conn = self.get_connection(self.get_conn_id()) login = conn.login password = conn.password host = conn.host