Skip to content

Commit

Permalink
Initial working version
Browse files Browse the repository at this point in the history
  • Loading branch information
dhinesh03 committed Oct 9, 2024
1 parent 745221c commit 98d9140
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 1 deletion.
32 changes: 32 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Publish Python 🐍 distributions 📦 to PyPI

on:
release:
types: [created]

jobs:
build-n-publish:
name: Build and publish Python 🐍 distributions 📦 to PyPI
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: 3.11

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel
- name: Build the package
run: |
python setup.py sdist bdist_wheel
- name: Publish distribution 📦 to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
164 changes: 163 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,163 @@
# python-appsync-ws-client

# AWS AppSync WebSocket Client

A Python client for subscribing to AWS AppSync GraphQL APIs via WebSockets. This package allows you to connect to the AWS AppSync WebSocket API, handle GraphQL subscriptions, and manage reconnections and retries seamlessly.

## Features

- **GraphQL Subscriptions**: Easily subscribe to GraphQL queries over WebSockets.
- **Automatic Reconnection**: Handles reconnection attempts in case of dropped WebSocket connections.
- **Thread-safe**: Manages multiple subscriptions with thread-safe operations.
- **Callback Handling**: Provides a way to specify callback functions that process subscription data.

## Installation

Install the package via pip:

```bash
pip install appsync-ws-client
```

## Usage

### 1. Initialize the Client

To use the client, provide the WebSocket URL and an authentication function that returns the necessary headers.

```python
from appsync_ws_client.client import GraphQLWebSocketClient

def get_auth_headers():
return {
"host": "xxx.appsync-api.<region>.amazonaws.com",
"Authorization": "<ACCESS_TOKEN>",
}

url = "wss://<your-appsync-endpoint>"
client = GraphQLWebSocketClient(url, auth_function=get_auth_headers)
client.connect()
```

### 2. Subscribing to a GraphQL Query

You can subscribe to a GraphQL query using the `subscribe` method. The subscription requires a GraphQL query, variables (if any), and a callback function to handle the subscription data.

```python
query = '''
subscription OnPriceUpdate {
onPriceUpdate {
id
price
timestamp
}
}
'''

def handle_subscription_data(data):
print("Received subscription data:", data)

subscription_id = client.subscribe(query, variables={}, callback=handle_subscription_data)
```

### 3. Unsubscribing

To unsubscribe from a subscription, use the `unsubscribe` method with the `subscription_id` that was returned when you subscribed.

```python
client.unsubscribe(subscription_id)
```

### 4. Closing the Connection

Ensure you close the WebSocket connection properly when done:

```python
client.close()
```

### 5. Handling Reconnection

The client automatically attempts to reconnect when a WebSocket connection drops. You can control the number of retry attempts by passing `max_retries` to the client. For example:

```python
client = GraphQLWebSocketClient(url, auth_function=get_auth_headers, max_retries=10)
client.connect()
```

## Error Handling

The package will raise the following errors:

- **`TimeoutError`**: Raised when the connection acknowledgment times out.
- **`MaxRetriesExceeded`**: Raised when the maximum number of reconnection attempts is exceeded.

You can also handle WebSocket errors using the client’s internal logging.

## Logging

Logging is built in to help monitor the WebSocket connection and subscription process. Make sure to configure logging in your application as necessary:

```python
import logging

logging.basicConfig(level=logging.INFO)
```

## Example

Here is a full example of setting up the client and subscribing to a GraphQL subscription:

```python
import time
import logging
from appsync_ws_client.client import GraphQLWebSocketClient

logging.basicConfig(level=logging.INFO)

def get_auth_headers():
return {
"host": "xxx.appsync-api.<region>.amazonaws.com",
"Authorization": "<ACCESS_TOKEN>",
}

url = "wss://<your-appsync-endpoint>"
client = GraphQLWebSocketClient(url, auth_function=get_auth_headers)
client.connect()

query = '''
subscription OnPriceUpdate {
onPriceUpdate {
id
price
timestamp
}
}
'''

def handle_subscription_data(data):
print("Received subscription data:", data)

subscription_id = client.subscribe(query, variables={}, callback=handle_subscription_data)

try:
while True:
time.sleep(1) # Keeps the main program alive
except KeyboardInterrupt:
print("Closing WebSocket and shutting down...")
client.close()


# Later, if you want to unsubscribe
client.unsubscribe(subscription_id)

# Always remember to close the connection when done
client.close()
```

## License

This package is licensed under the MIT License. See the [LICENSE](LICENSE) file for more details.

## Contributing

Feel free to open an issue or submit a pull request if you want to contribute!
1 change: 1 addition & 0 deletions appsync_ws_client/ __init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .client import GraphQLWebSocketClient # noqa
193 changes: 193 additions & 0 deletions appsync_ws_client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import time
import websocket
import json
import base64
import threading
import uuid
import logging
from typing import Callable, Dict, Any
from .exceptions import MaxRetriesExceeded

logger = logging.getLogger(__name__)


class GraphQLWebSocketClient:
def __init__(self, url: str, auth_function: Callable[[], Dict[str, str]], max_retries: int = 5):
self.url = url
self.auth_function = auth_function
self.ws = None
self.subscriptions = {}
self._is_open = False
self._acknowledged_event = threading.Event()
self.max_retries = max_retries
self.retry_count = 0
self.lock = threading.Lock()

def _build_ws_url(self) -> str:
"""
Build the WebSocket URL with authentication headers.
"""
auth_info = self.auth_function()
headers_encoded = base64.b64encode(json.dumps(auth_info).encode()).decode()
all_url = f"{self.url}?header={headers_encoded}&payload={base64.b64encode(json.dumps({}).encode()).decode()}"
logger.debug(f"WebSocket URL: {all_url}")
return all_url

def connect(self):
"""
Establish the WebSocket connection.
"""
try:
ws_url = self._build_ws_url()
self.ws = websocket.WebSocketApp(
ws_url,
subprotocols=["graphql-ws"],
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
on_close=self._on_close,
)
thread = threading.Thread(target=self.ws.run_forever)
thread.daemon = True
thread.start()
except Exception as e:
logger.error(f"Failed to connect: {e}")
self._attempt_reconnect()

def _on_open(self, ws):
"""
WebSocket open event handler. Sends connection initialization message.
"""
self._is_open = True
self._acknowledged_event.clear()
self._send_message({"type": "connection_init"})
logger.info("WebSocket connection opened.")
self.retry_count = 0

def _on_message(self, ws, message):
"""
WebSocket message event handler. Process incoming messages.
"""
msg = json.loads(message)
message_type = msg.get('type')

if message_type == "connection_ack":
self._acknowledged_event.set()
logger.info("Connection acknowledged.")
elif message_type == "ka":
logger.debug("Keep-alive received.")
elif message_type == "data":
subscription_id = msg.get('id')
if subscription_id and subscription_id in self.subscriptions:
callback = self.subscriptions[subscription_id]['callback']
callback(msg['payload'])
elif message_type == "error":
logger.error(f"Error received: {msg.get('payload')}")

def _on_error(self, ws, error):
"""
WebSocket error event handler.
"""
logger.error(f"WebSocket error occurred: {error}")
if isinstance(error, Exception):
logger.exception("Exception details:", exc_info=error)
self._attempt_reconnect()

def _on_close(self, ws, close_status_code, close_msg):
"""
WebSocket close event handler.
"""
self._is_open = False
self._acknowledged_event.clear()
logger.warning(f"WebSocket closed: {close_status_code}, message: {close_msg}")
self._attempt_reconnect()

def _attempt_reconnect(self):
"""
Attempt to reconnect to the WebSocket.
"""
if self.retry_count < self.max_retries:
self.retry_count += 1
wait_time = 2**self.retry_count + (0.5 * self.retry_count)
logger.info(f"Reconnecting in {wait_time} seconds...")
time.sleep(wait_time)
self.connect()
else:
raise MaxRetriesExceeded("Max retries reached.")

def subscribe(
self, query: str, variables: Dict[str, Any], callback: Callable[[Dict[str, Any]], None], acknowledgment_timeout: int = 10
) -> str:
"""
Subscribe to a GraphQL query via WebSocket.
:param query: GraphQL query as a string.
:param variables: Variables for the GraphQL query.
:param callback: A callback function that handles the incoming data.
:return: The subscription ID (used for unsubscribing later).
"""
# Wait for connection to be acknowledged before subscribing
if not self._acknowledged_event.wait(timeout=acknowledgment_timeout):
logger.error("Connection acknowledgment timeout.")
raise TimeoutError("Connection acknowledgment timeout.")

subscription_id = str(uuid.uuid4())
self.subscriptions[subscription_id] = {'query': query, 'variables': variables, 'callback': callback}

message = {
"id": subscription_id,
"type": "start",
"payload": {
"data": json.dumps({"query": query, "variables": variables}),
"extensions": {"authorization": self.auth_function()},
},
}
self._send_message(message)
logger.info(f"Subscribed with ID: {subscription_id}")
return subscription_id

def unsubscribe(self, subscription_id: str):
"""
Unsubscribe from a subscription.
:param subscription_id: The subscription ID.
"""
with self.lock:
if subscription_id in self.subscriptions:
self._send_message({"id": subscription_id, "type": "stop"})
del self.subscriptions[subscription_id]
logger.info(f"Unsubscribed from: {subscription_id}")

def _send_message(self, message: Dict[str, Any]):
"""
Send a message over the WebSocket.
"""
with self.lock:
if self.ws and self._is_open:
self.ws.send(json.dumps(message))
else:
logger.warning("WebSocket is not open. Cannot send message.")

def isConnectionOpen(self):
"""
Check if the WebSocket connection is open.
"""
return self._is_open

def isAcknowledged(self):
"""
Check if the connection has been acknowledged.
"""
return self._acknowledged_event.is_set()

def close(self):
"""
Close the WebSocket connection.
"""
with self.lock:
if self.ws:
self.ws.close()
self._is_open = False
logger.info("WebSocket connection closed.")

def __del__(self):
self.close()
2 changes: 2 additions & 0 deletions appsync_ws_client/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class MaxRetriesExceeded(Exception):
pass
Loading

0 comments on commit 98d9140

Please sign in to comment.