Skip to content

Commit

Permalink
fix(NODE-4262): make startSession work without a connection
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Jun 4, 2022
1 parent ed50ef5 commit 08d6b88
Show file tree
Hide file tree
Showing 25 changed files with 397 additions and 1,008 deletions.
29 changes: 10 additions & 19 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ export abstract class AbstractCursor<
/** @internal */
[kId]?: Long;
/** @internal */
[kSession]?: ClientSession;
[kSession]: ClientSession;
/** @internal */
[kServer]?: Server;
/** @internal */
Expand Down Expand Up @@ -187,6 +187,8 @@ export abstract class AbstractCursor<

if (options.session instanceof ClientSession) {
this[kSession] = options.session;
} else {
this[kSession] = this[kClient].startSession({ owner: this, explicit: false });
}
}

Expand Down Expand Up @@ -217,11 +219,11 @@ export abstract class AbstractCursor<
}

/** @internal */
get session(): ClientSession | undefined {
get session(): ClientSession {
return this[kSession];
}

set session(clientSession: ClientSession | undefined) {
set session(clientSession: ClientSession) {
this[kSession] = clientSession;
}

Expand Down Expand Up @@ -592,11 +594,12 @@ export abstract class AbstractCursor<
const session = this[kSession];
if (session) {
// We only want to end this session if we created it, and it hasn't ended yet
if (session.explicit === false && !session.hasEnded) {
session.endSession();
if (session.explicit === false) {
if (!session.hasEnded) {
session.endSession(() => null);
}
this[kSession] = this.client.startSession({ owner: this, explicit: false });
}

this[kSession] = undefined;
}
}

Expand Down Expand Up @@ -644,22 +647,10 @@ export abstract class AbstractCursor<
* a significant refactor.
*/
[kInit](callback: Callback<TSchema | null>): void {
if (this[kSession] == null) {
if (this[kClient].topology?.shouldCheckForSessionSupport()) {
return this[kClient].topology?.selectServer(ReadPreference.primaryPreferred, {}, err => {
if (err) return callback(err);
return this[kInit](callback);
});
} else if (this[kClient].topology?.hasSessionSupport()) {
this[kSession] = this[kClient].topology?.startSession({ owner: this, explicit: false });
}
}

this._initialize(this[kSession], (err, state) => {
if (state) {
const response = state.response;
this[kServer] = state.server;
this[kSession] = state.session;

if (response.cursor) {
this[kId] =
Expand Down
87 changes: 63 additions & 24 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import type { MONGO_CLIENT_EVENTS } from './constants';
import { Db, DbOptions } from './db';
import type { AutoEncrypter, AutoEncryptionOptions } from './deps';
import type { Encrypter } from './encrypter';
import { MongoInvalidArgumentError, MongoNotConnectedError } from './error';
import { MongoInvalidArgumentError } from './error';
import type { Logger, LoggerLevel } from './logger';
import { TypedEventEmitter } from './mongo_types';
import { connect } from './operations/connect';
import { PromiseProvider } from './promise_provider';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
import type { ReadPreference, ReadPreferenceMode } from './read_preference';
import { ReadPreference, ReadPreferenceMode } from './read_preference';
import type { TagSet } from './sdam/server_description';
import type { SrvPoller } from './sdam/srv_polling';
import type { Topology, TopologyEvents } from './sdam/topology';
import type { ClientSession, ClientSessionOptions } from './sessions';
import { ClientSession, ClientSessionOptions, ServerSessionPool } from './sessions';
import {
Callback,
ClientMetadata,
Expand Down Expand Up @@ -267,10 +267,15 @@ export type WithSessionCallback = (session: ClientSession) => Promise<any>;
/** @internal */
export interface MongoClientPrivate {
url: string;
sessions: Set<ClientSession>;
bsonOptions: BSONSerializeOptions;
namespace: MongoDBNamespace;
hasBeenClosed: boolean;
/**
* We keep a reference to the sessions that are acquired from the pool
* so we can end them at client.close time. Bookkeeping for testing mainly (non-spec).
*/
readonly activeSessions: Set<ClientSession>;
readonly sessionPool: ServerSessionPool;
readonly options: MongoOptions;
readonly readConcern?: ReadConcern;
readonly writeConcern?: WriteConcern;
Expand Down Expand Up @@ -352,10 +357,11 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
// The internal state
this.s = {
url,
sessions: new Set(),
bsonOptions: resolveBSONOptions(this[kOptions]),
namespace: ns('admin'),
hasBeenClosed: false,
sessionPool: new ServerSessionPool(this),
activeSessions: new Set(),

get options() {
return client[kOptions];
Expand Down Expand Up @@ -470,23 +476,51 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {

return maybePromise(callback, callback => {
if (this.topology == null) {
// Do not connect just to end sessions
return callback();
}

// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

topology.close({ force }, error => {
if (error) return callback(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.close(this, force, error => {
callback(error);
const activeSessionEnds = Array.from(this.s.activeSessions, session => session.endSession());
this.s.activeSessions.clear();

Promise.all(activeSessionEnds)
.then(() => {
const endSessions = Array.from(this.s.sessionPool.sessions, ({ id }) => id);
if (endSessions.length === 0) return;
return this.db('admin')
.command(
{ endSessions },
{ readPreference: ReadPreference.primaryPreferred, noResponse: true }
)
.then(() => null) // outcome does not matter
.catch(() => null); // outcome does not matter
})
.then(() => {
if (this.topology == null) {
return callback();
}
// clear out references to old topology
const topology = this.topology;
this.topology = undefined;

return new Promise<void>((resolve, reject) => {
topology.close({ force }, error => {
if (error) return reject(error);
const { encrypter } = this[kOptions];
if (encrypter) {
return encrypter.close(this, force, error => {
if (error) return reject(error);
resolve();
});
}
resolve();
});
});
}
callback();
});
})
.then(
() => callback(),
error => callback(error)
);
});
}

Expand Down Expand Up @@ -553,12 +587,17 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
startSession(): ClientSession;
startSession(options: ClientSessionOptions): ClientSession;
startSession(options?: ClientSessionOptions): ClientSession {
options = Object.assign({ explicit: true }, options);
if (!this.topology) {
throw new MongoNotConnectedError('MongoClient must be connected to start a session');
}

return this.topology.startSession(options, this.s.options);
const session = new ClientSession(
this,
this.s.sessionPool,
{ explicit: true, ...options },
this[kOptions]
);
this.s.activeSessions.add(session);
session.once('ended', () => {
this.s.activeSessions.delete(session);
});
return session;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export function executeOperation<
if (topology.hasSessionSupport()) {
if (session == null) {
owner = Symbol();
session = topology.startSession({ owner, explicit: false });
session = client.startSession({ owner, explicit: false });
} else if (session.hasEnded) {
return callback(new MongoExpiredSessionError('Use of expired sessions is not permitted'));
} else if (session.snapshotEnabled && !topology.capabilities.supportsSnapshotReads) {
Expand Down
Loading

0 comments on commit 08d6b88

Please sign in to comment.