Skip to content

Commit

Permalink
fix for object-store-connector cron (#269)
Browse files Browse the repository at this point in the history
  • Loading branch information
ravismula authored Nov 6, 2024
1 parent 680fdcb commit 96ce680
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ spec:
- |
# Wait for the Spark pod to be ready
SPARK_POD=$(kubectl get pods -l app.kubernetes.io/name=spark,app.kubernetes.io/component=master -o jsonpath='{.items[0].metadata.name}')
kubectl exec -it $SPARK_POD -- bash -c "/opt/bitnami/spark/bin/spark-submit --master={{ .Values.spark.master.host }} --conf spark.pyspark.driver.python={{ .Values.python_path }} --conf spark.pyspark.python={{ .Values.python_path }} --jars /data/connectors/{{ .Values.connector_source }}/libs/\* /data/connectors/{{ .Values.connector_source }}/{{ .Values.main_file }} -f /data/conf/connectors-python-config.yaml -c {{ .Values.instance_id }}"
kubectl exec -it $SPARK_POD -- bash -c "/opt/bitnami/spark/bin/spark-submit --master=local[*] --conf spark.pyspark.driver.python={{ .Values.python_path }} --conf spark.pyspark.python={{ .Values.python_path }} --jars /data/connectors/{{ .Values.connector_source }}/libs/\* /data/connectors/{{ .Values.connector_source }}/{{ .Values.main_file }} -f /data/conf/connectors-python-config.yaml -c {{ .Values.instance_id }}"
{{- end }}
{{- with .Values.sidecars }}
{{- toYaml . | nindent 12 }}
Expand Down
3 changes: 2 additions & 1 deletion command-service/src/command/connector_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
def execute_query(self, query, params) -> bool:
try:
result = self.db_service.execute_upsert(sql=query, params=params)
print(f"Connector Registry | {result} rows affected")
return result > 0 # Assuming the result is the number of affected rows
except Exception as e:
print(
Expand Down Expand Up @@ -426,7 +427,7 @@ def update_connector_registry(self, _id, ver):
f"UPDATE connector_registry SET status = 'Retired', updated_date = now() WHERE connector_id = %s AND status = 'Live' AND version != %s", (_id, ver)
)
print(
f"Connector Registry | Updated {result} existing rows with connector_id: {_id} and version: {ver}"
f"Connector Registry | Retired {result} versions for connector_id: {_id} and version: {ver}"
)
except Exception as e:
print(
Expand Down
4 changes: 2 additions & 2 deletions command-service/src/service/db_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from config import Config

def reconnect(func: Callable):

def wrapper(db_connection, *args, **kwargs):
tdecorator = retry(wait=wait_exponential(), stop=stop_after_attempt(3))
decorated = tdecorator(func)
Expand Down Expand Up @@ -60,9 +60,9 @@ def execute_upsert(self, sql, params):
db_connection = self.connect()
cursor = db_connection.cursor(cursor_factory=psycopg2.extras.DictCursor)
cursor.execute(sql, params)
db_connection.commit()
record_count = cursor.rowcount
db_connection.close()
# print(f"{record_count} inserted/updated successfully")
return record_count

# @reconnect
Expand Down

0 comments on commit 96ce680

Please sign in to comment.