189 lines
5.5 KiB
TypeScript
189 lines
5.5 KiB
TypeScript
import type { Document } from './bson';
|
|
import { MongoRuntimeError, MongoTransactionError } from './error';
|
|
import type { CommandOperationOptions } from './operations/command';
|
|
import { ReadConcern, ReadConcernLike } from './read_concern';
|
|
import type { ReadPreferenceLike } from './read_preference';
|
|
import { ReadPreference } from './read_preference';
|
|
import type { Server } from './sdam/server';
|
|
import { WriteConcern } from './write_concern';
|
|
|
|
/** @internal */
|
|
export const TxnState = Object.freeze({
|
|
NO_TRANSACTION: 'NO_TRANSACTION',
|
|
STARTING_TRANSACTION: 'STARTING_TRANSACTION',
|
|
TRANSACTION_IN_PROGRESS: 'TRANSACTION_IN_PROGRESS',
|
|
TRANSACTION_COMMITTED: 'TRANSACTION_COMMITTED',
|
|
TRANSACTION_COMMITTED_EMPTY: 'TRANSACTION_COMMITTED_EMPTY',
|
|
TRANSACTION_ABORTED: 'TRANSACTION_ABORTED'
|
|
} as const);
|
|
|
|
/** @internal */
|
|
export type TxnState = (typeof TxnState)[keyof typeof TxnState];
|
|
|
|
const stateMachine: { [state in TxnState]: TxnState[] } = {
|
|
[TxnState.NO_TRANSACTION]: [TxnState.NO_TRANSACTION, TxnState.STARTING_TRANSACTION],
|
|
[TxnState.STARTING_TRANSACTION]: [
|
|
TxnState.TRANSACTION_IN_PROGRESS,
|
|
TxnState.TRANSACTION_COMMITTED,
|
|
TxnState.TRANSACTION_COMMITTED_EMPTY,
|
|
TxnState.TRANSACTION_ABORTED
|
|
],
|
|
[TxnState.TRANSACTION_IN_PROGRESS]: [
|
|
TxnState.TRANSACTION_IN_PROGRESS,
|
|
TxnState.TRANSACTION_COMMITTED,
|
|
TxnState.TRANSACTION_ABORTED
|
|
],
|
|
[TxnState.TRANSACTION_COMMITTED]: [
|
|
TxnState.TRANSACTION_COMMITTED,
|
|
TxnState.TRANSACTION_COMMITTED_EMPTY,
|
|
TxnState.STARTING_TRANSACTION,
|
|
TxnState.NO_TRANSACTION
|
|
],
|
|
[TxnState.TRANSACTION_ABORTED]: [TxnState.STARTING_TRANSACTION, TxnState.NO_TRANSACTION],
|
|
[TxnState.TRANSACTION_COMMITTED_EMPTY]: [
|
|
TxnState.TRANSACTION_COMMITTED_EMPTY,
|
|
TxnState.NO_TRANSACTION
|
|
]
|
|
};
|
|
|
|
const ACTIVE_STATES: Set<TxnState> = new Set([
|
|
TxnState.STARTING_TRANSACTION,
|
|
TxnState.TRANSACTION_IN_PROGRESS
|
|
]);
|
|
|
|
const COMMITTED_STATES: Set<TxnState> = new Set([
|
|
TxnState.TRANSACTION_COMMITTED,
|
|
TxnState.TRANSACTION_COMMITTED_EMPTY,
|
|
TxnState.TRANSACTION_ABORTED
|
|
]);
|
|
|
|
/**
|
|
* Configuration options for a transaction.
|
|
* @public
|
|
*/
|
|
export interface TransactionOptions extends CommandOperationOptions {
|
|
// TODO(NODE-3344): These options use the proper class forms of these settings, it should accept the basic enum values too
|
|
/** A default read concern for commands in this transaction */
|
|
readConcern?: ReadConcernLike;
|
|
/** A default writeConcern for commands in this transaction */
|
|
writeConcern?: WriteConcern;
|
|
/** A default read preference for commands in this transaction */
|
|
readPreference?: ReadPreferenceLike;
|
|
/** Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds */
|
|
maxCommitTimeMS?: number;
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* A class maintaining state related to a server transaction. Internal Only
|
|
*/
|
|
export class Transaction {
|
|
/** @internal */
|
|
state: TxnState;
|
|
options: TransactionOptions;
|
|
/** @internal */
|
|
_pinnedServer?: Server;
|
|
/** @internal */
|
|
_recoveryToken?: Document;
|
|
|
|
/** Create a transaction @internal */
|
|
constructor(options?: TransactionOptions) {
|
|
options = options ?? {};
|
|
this.state = TxnState.NO_TRANSACTION;
|
|
this.options = {};
|
|
|
|
const writeConcern = WriteConcern.fromOptions(options);
|
|
if (writeConcern) {
|
|
if (writeConcern.w === 0) {
|
|
throw new MongoTransactionError('Transactions do not support unacknowledged write concern');
|
|
}
|
|
|
|
this.options.writeConcern = writeConcern;
|
|
}
|
|
|
|
if (options.readConcern) {
|
|
this.options.readConcern = ReadConcern.fromOptions(options);
|
|
}
|
|
|
|
if (options.readPreference) {
|
|
this.options.readPreference = ReadPreference.fromOptions(options);
|
|
}
|
|
|
|
if (options.maxCommitTimeMS) {
|
|
this.options.maxTimeMS = options.maxCommitTimeMS;
|
|
}
|
|
|
|
// TODO: This isn't technically necessary
|
|
this._pinnedServer = undefined;
|
|
this._recoveryToken = undefined;
|
|
}
|
|
|
|
/** @internal */
|
|
get server(): Server | undefined {
|
|
return this._pinnedServer;
|
|
}
|
|
|
|
get recoveryToken(): Document | undefined {
|
|
return this._recoveryToken;
|
|
}
|
|
|
|
get isPinned(): boolean {
|
|
return !!this.server;
|
|
}
|
|
|
|
/** @returns Whether the transaction has started */
|
|
get isStarting(): boolean {
|
|
return this.state === TxnState.STARTING_TRANSACTION;
|
|
}
|
|
|
|
/**
|
|
* @returns Whether this session is presently in a transaction
|
|
*/
|
|
get isActive(): boolean {
|
|
return ACTIVE_STATES.has(this.state);
|
|
}
|
|
|
|
get isCommitted(): boolean {
|
|
return COMMITTED_STATES.has(this.state);
|
|
}
|
|
/**
|
|
* Transition the transaction in the state machine
|
|
* @internal
|
|
* @param nextState - The new state to transition to
|
|
*/
|
|
transition(nextState: TxnState): void {
|
|
const nextStates = stateMachine[this.state];
|
|
if (nextStates && nextStates.includes(nextState)) {
|
|
this.state = nextState;
|
|
if (
|
|
this.state === TxnState.NO_TRANSACTION ||
|
|
this.state === TxnState.STARTING_TRANSACTION ||
|
|
this.state === TxnState.TRANSACTION_ABORTED
|
|
) {
|
|
this.unpinServer();
|
|
}
|
|
return;
|
|
}
|
|
|
|
throw new MongoRuntimeError(
|
|
`Attempted illegal state transition from [${this.state}] to [${nextState}]`
|
|
);
|
|
}
|
|
|
|
/** @internal */
|
|
pinServer(server: Server): void {
|
|
if (this.isActive) {
|
|
this._pinnedServer = server;
|
|
}
|
|
}
|
|
|
|
/** @internal */
|
|
unpinServer(): void {
|
|
this._pinnedServer = undefined;
|
|
}
|
|
}
|
|
|
|
export function isTransactionCommand(command: Document): boolean {
|
|
return !!(command.commitTransaction || command.abortTransaction);
|
|
}
|