Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New setting for pg_notify listener DB settings, add keepalive #14755

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions awx/main/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ def close(self):
self.conn.close()


def create_listener_connection():
conf = settings.DATABASES['default'].copy()
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')

# Apply overrides specifically for the listener connection
for k, v in settings.LISTENER_DATABASES.get('default', {}).items():
conf[k] = v
for k, v in settings.LISTENER_DATABASES.get('default', {}).get('OPTIONS', {}).items():
conf['OPTIONS'][k] = v

connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
return psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])


@contextmanager
def pg_bus_conn(new_connection=False, select_timeout=None):
'''
Expand All @@ -106,12 +122,7 @@ def pg_bus_conn(new_connection=False, select_timeout=None):
'''

if new_connection:
conf = settings.DATABASES['default'].copy()
conf['OPTIONS'] = conf.get('OPTIONS', {}).copy()
# Modify the application name to distinguish from other connections the process might use
conf['OPTIONS']['application_name'] = get_application_name(settings.CLUSTER_HOST_ID, function='listener')
connection_data = f"dbname={conf['NAME']} host={conf['HOST']} user={conf['USER']} password={conf['PASSWORD']} port={conf['PORT']}"
conn = psycopg.connect(connection_data, autocommit=True, **conf['OPTIONS'])
conn = create_listener_connection()
else:
if pg_connection.connection is None:
pg_connection.connect()
Expand Down
5 changes: 4 additions & 1 deletion awx/main/dispatch/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ def run_periodic_tasks(self):
# bypasses pg_notify for scheduled tasks
self.dispatch_task(body)

self.pg_is_down = False
if self.pg_is_down:
logger.info('Dispatcher listener connection established')
self.pg_is_down = False

self.listen_start = time.time()

return self.scheduler.time_until_next_run()
Expand Down
12 changes: 12 additions & 0 deletions awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@
}
}

# Special database overrides for dispatcher connections listening to pg_notify
LISTENER_DATABASES = {
'default': {
'OPTIONS': {
'keepalives': 1,
'keepalives_idle': 5,
'keepalives_interval': 5,
'keepalives_count': 5,
},
}
}

# Whether or not the deployment is a K8S-based deployment
# In K8S-based deployments, instances have zero capacity - all playbook
# automation is intended to flow through defined Container Groups that
Expand Down
Loading