Skip to content

Commit

Permalink
feat(pubsub): v5 - AppSync realtime - pass authToken via subprotocol (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev authored Aug 27, 2024
1 parent 51964a3 commit 774c242
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 16 deletions.
54 changes: 48 additions & 6 deletions packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ describe('AWSAppSyncRealTimeProvider', () => {

expect(newSocketSpy).toHaveBeenNthCalledWith(
1,
'ws://localhost:8080/realtime?header=&payload=e30=',
'graphql-ws'
'ws://localhost:8080/realtime',
['graphql-ws', 'header-']
);
});

Expand All @@ -247,8 +247,8 @@ describe('AWSAppSyncRealTimeProvider', () => {

expect(newSocketSpy).toHaveBeenNthCalledWith(
1,
'wss://localhost:8080/realtime?header=&payload=e30=',
'graphql-ws'
'wss://localhost:8080/realtime',
['graphql-ws', 'header-']
);
});

Expand All @@ -274,8 +274,50 @@ describe('AWSAppSyncRealTimeProvider', () => {

expect(newSocketSpy).toHaveBeenNthCalledWith(
1,
'wss://testaccounturl123456789123.appsync-realtime-api.us-east-1.amazonaws.com/graphql?header=&payload=e30=',
'graphql-ws'
'wss://testaccounturl123456789123.appsync-realtime-api.us-east-1.amazonaws.com/graphql',
['graphql-ws', 'header-']
);
});

test('subscription generates expected auth token', async () => {
expect.assertions(1);

const newSocketSpy = jest
.spyOn(provider, 'getNewWebSocket')
.mockImplementation(() => {
fakeWebSocketInterface.newWebSocket();
return fakeWebSocketInterface.webSocket;
});

provider
.subscribe('test', {
appSyncGraphqlEndpoint:
'https://testaccounturl123456789123.appsync-api.us-east-1.amazonaws.com/graphql',
// using custom auth instead of apiKey, because the latter inserts a timestamp header => expected value changes
authenticationType: 'AWS_LAMBDA',
additionalHeaders: {
Authorization: 'my-custom-auth-token',
},
})
.subscribe({ error: () => {} });

// Wait for the socket to be initialize
await fakeWebSocketInterface.readyForUse;

/*
Regular base64 encoding of auth header {"Authorization":"my-custom-auth-token","host":"testaccounturl123456789123.appsync-api.us-east-1.amazonaws.com"}
Is: `eyJBdXRob3JpemF0aW9uIjoibXktY3VzdG9tLWF1dGgtdG9rZW4iLCJob3N0IjoidGVzdGFjY291bnR1cmwxMjM0NTY3ODkxMjMuYXBwc3luYy1hcGkudXMtZWFzdC0xLmFtYXpvbmF3cy5jb20ifQ==`
(note `==` at the end of the string)
base64url encoding is expected to drop padding chars `=`
*/

expect(newSocketSpy).toHaveBeenNthCalledWith(
1,
'wss://testaccounturl123456789123.appsync-realtime-api.us-east-1.amazonaws.com/graphql',
[
'graphql-ws',
'header-eyJBdXRob3JpemF0aW9uIjoibXktY3VzdG9tLWF1dGgtdG9rZW4iLCJob3N0IjoidGVzdGFjY291bnR1cmwxMjM0NTY3ODkxMjMuYXBwc3luYy1hcGkudXMtZWFzdC0xLmFtYXpvbmF3cy5jb20ifQ',
]
);
});

Expand Down
45 changes: 35 additions & 10 deletions packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ const dispatchApiEvent = (
Hub.dispatch('api', { event, data, message }, 'PubSub', AMPLIFY_SYMBOL);
};

/**
* @returns base64url-encoded string - https://datatracker.ietf.org/doc/html/rfc4648#section-5
*/
const base64urlEncode = (str: string): string => {
const base64Str = Buffer.from(str).toString('base64');

const base64UrlStr = base64Str
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=/g, '');

return base64UrlStr;
};

export type ObserverQuery = {
observer: PubSubContentObserver;
query: string;
Expand Down Expand Up @@ -182,7 +196,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider<AWSAppSyn
this.reconnectionMonitor.close();
}

getNewWebSocket(url: string, protocol: string) {
getNewWebSocket(url: string, protocol: string[]) {
return new WebSocket(url, protocol);
}

Expand Down Expand Up @@ -716,9 +730,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider<AWSAppSyn
});

const headerString = authHeader ? JSON.stringify(authHeader) : '';
const headerQs = Buffer.from(headerString).toString('base64');

const payloadQs = Buffer.from(payloadString).toString('base64');
const headerQs = base64urlEncode(headerString);

let discoverableEndpoint = appSyncGraphqlEndpoint ?? '';

Expand All @@ -737,9 +749,13 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider<AWSAppSyn
.replace('https://', protocol)
.replace('http://', protocol);

const awsRealTimeUrl = `${discoverableEndpoint}?header=${headerQs}&payload=${payloadQs}`;
const awsRealTimeUrl = discoverableEndpoint;
const authTokenSubprotocol = `header-${headerQs}`;

await this._initializeRetryableHandshake(awsRealTimeUrl);
await this._initializeRetryableHandshake(
awsRealTimeUrl,
authTokenSubprotocol
);

this.promiseArray.forEach(({ res }) => {
logger.debug('Notifying connection successful');
Expand All @@ -764,23 +780,32 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider<AWSAppSyn
});
}

private async _initializeRetryableHandshake(awsRealTimeUrl: string) {
private async _initializeRetryableHandshake(
awsRealTimeUrl: string,
subprotocol: string
) {
logger.debug(`Initializaling retryable Handshake`);
await jitteredExponentialRetry(
this._initializeHandshake.bind(this),
[awsRealTimeUrl],
[awsRealTimeUrl, subprotocol],
MAX_DELAY_MS
);
}

private async _initializeHandshake(awsRealTimeUrl: string) {
private async _initializeHandshake(
awsRealTimeUrl: string,
subprotocol: string
) {
logger.debug(`Initializing handshake ${awsRealTimeUrl}`);
// Because connecting the socket is async, is waiting until connection is open
// Step 1: connect websocket
try {
await (() => {
return new Promise<void>((res, rej) => {
const newSocket = this.getNewWebSocket(awsRealTimeUrl, 'graphql-ws');
const newSocket = this.getNewWebSocket(awsRealTimeUrl, [
'graphql-ws',
subprotocol,
]);
newSocket.onerror = () => {
logger.debug(`WebSocket connection error`);
};
Expand Down

0 comments on commit 774c242

Please sign in to comment.