Skip to content

Commit

Permalink
chore(satp-hermes): crash recovery architecture
Browse files Browse the repository at this point in the history
Signed-off-by: Rafael Belchior <rafael.belchior@tecnico.ulisboa.pt>
  • Loading branch information
RafaelAPB committed Aug 21, 2024
1 parent c768aa3 commit f9014b0
Show file tree
Hide file tree
Showing 14 changed files with 569 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ export class BLODispatcher {
public async Transact(req: StatusRequest): Promise<StatusResponse> {
return ExecuteGetStatus(this.logger, req);
}

// TODO implement recovery handlers
// get channel by caller; give needed client from orchestrator to handler to call
// for all channels, find session id on request
// TODO implement handlers GetAudit, Transact, Cancel, Routes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// handler to allow a user application to communicate a gateway it crashed and needs to be recovered. It "forces" and update of status with a counterparty gateway
// TODO update the spec with a RecoverForce message that is handled by this handler
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// handler to allow a user application to force a rollback
// TODO update the spec with RollbackForce message that is handled by this handler
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import {
Logger,
LoggerProvider,
Checks,
LogLevelDesc,
} from "@hyperledger/cactus-common";
import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
import { CrashRecoveryHandler } from "./crash-recovery-handler";
import { SATPSession } from "../satp-session";
import {
RollbackState,
RollbackStrategy,
RollbackStrategyFactory,
} from "./rollback/rollback-strategy-factory";

enum CrashStatus {
IN_RECOVERY = "IN_RECOVERY",
RECOVERED = "RECOVERED",
NO_CRASH = "NO_CRASH",
}

class CrashOccurrence {
constructor(
public status: CrashStatus,
public time: Date,
public lastUpdate: Date,
) {}
}

export interface ICrashRecoveryManagerOptions {
logLevel?: LogLevelDesc;
instanceId: string;
}

export class CrashRecoveryManager {
public static readonly CLASS_NAME = "CrashRecoveryManager";
private readonly log: Logger;
private readonly instanceId: string;
private readonly sessions: Map<string, SessionData>;
private crashRecoveryHandler: CrashRecoveryHandler;
private factory: RollbackStrategyFactory;

constructor(public readonly options: ICrashRecoveryManagerOptions) {
const fnTag = `${CrashRecoveryManager.CLASS_NAME}#constructor()`;
Checks.truthy(options, `${fnTag} arg options`);

const level = this.options.logLevel || "DEBUG";
const label = this.className;
this.log = LoggerProvider.getOrCreate({ level, label });
this.instanceId = options.instanceId;
this.sessions = this.getSessions() || new Map<string, SessionData>();
this.log.info(`Instantiated ${this.className} OK`);
this.factory = new RollbackStrategyFactory();
}

get className(): string {
return CrashRecoveryManager.CLASS_NAME;
}

private getSessions(): Map<string, SessionData> {
// todo read from local log to get session data
return new Map<string, SessionData>();
}

// todo create util functoin that retrieves sessionid and checks if it is valid; i believe it is implemented in the satp services, refactor making it reusable
private checkCrash(sessionId: SATPSession): Promise<boolean> {
// todo implement crash check - check logs and understsands if there was a crash; might use timouts, etc
return Promise.resolve(false);
}

public async setupCrashManager() {
// todo setup handler, need to create services
this.crashRecoveryHandler = new CrashRecoveryHandler({
loggerOptions: {
label: "CrashRecoveryHandler",
level: "DEBUG",
},
serverService: null,
clientService: null,
sessions: this.sessions,
});
}

public async checkAndResolveCrash(sessionId: SATPSession): Promise<boolean> {
const fnTag = `${this.className}#checkAndResolveCrash()`;
this.log.info(`${fnTag} Checking crash status for session ${sessionId}`);

try {
const didCrash = await this.checkCrash(sessionId);
if (didCrash) {
// create new occurrence
const crashOccurrence = new CrashOccurrence(
CrashStatus.IN_RECOVERY,
new Date(),
new Date(),
);
this.log.debug(crashOccurrence);
// todo manage occurrence
// call corresponding services via handler for crash recovery
return false;
} else {
this.log.error(
`${fnTag} Failed to resolve crash for session ${sessionId}`,
);
// should panic and stop the server, for manual inspection
return false;
}
} catch (error) {
this.log.error(
`${fnTag} Error during crash check and resolution: ${error}`,
);
return false;
}
}

public async initiateRollback(
session: SATPSession,
forceRollback?: boolean,
): Promise<boolean> {
const fnTag = `CrashRecoveryManager#initiateRollback()`;
this.log.info(
`${fnTag} Initiating rollback for session ${session.getSessionId()}`,
);

try {
// Implement check for rollback (needs to read logs, etc) OR we assume that at satp handler/service layer this check is done and rollback is good to do
const shouldRollback = true; // todo implement check

if (forceRollback || shouldRollback) {
// send bridge manager and possibly others to factory
const strategy = this.factory.createStrategy(session);
const rollbackState = await this.executeRollback(strategy, session);

if (rollbackState) {
const cleanupSuccess = await this.performCleanup(
strategy,
session,
rollbackState,
);
return cleanupSuccess;
} else {
this.log.error(
`${fnTag} Rollback execution failed for session ${session.getSessionId()}`,
);
return false;
}
} else {
this.log.info(
`${fnTag} Rollback not needed for session ${session.getSessionId()}`,
);
return true;
}
} catch (error) {
this.log.error(`${fnTag} Error during rollback initiation: ${error}`);
return false;
}
}

private async executeRollback(
strategy: RollbackStrategy,
session: SATPSession,
): Promise<RollbackState | undefined> {
const fnTag = `CrashRecoveryManager#executeRollback`;
this.log.debug(
`${fnTag} Executing rollback strategy for session ${session.getSessionId()}`,
);

try {
return await strategy.execute(session);
} catch (error) {
this.log.error(`${fnTag} Error executing rollback strategy: ${error}`);
}
}

private async performCleanup(
strategy: RollbackStrategy,
session: SATPSession,
state: RollbackState,
): Promise<boolean> {
const fnTag = `CrashRecoveryManager#performCleanup`;
this.log.debug(
`${fnTag} Performing cleanup after rollback for session ${session.getSessionId()}`,
);

try {
const updatedState = await strategy.cleanup(session, state);

// TODO: Handle the updated state, perhaps update session data or perform additional actions
this.log.info(
`${fnTag} Cleanup completed. Updated state: ${JSON.stringify(updatedState)}`,
);

return true;
} catch (error) {
this.log.error(`${fnTag} Error during cleanup: ${error}`);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ import {
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";

// use connect protocol to send messages to the counterparty server; implement protocol
export class CrashRecoveryClientService {
createRecoverMessage(sessionData: SessionData): RecoverMessage {}

async sendRecoverMessage(
message: RecoverMessage,
): Promise<RecoverUpdateMessage> {}
async sendRecover(message: RecoverMessage): Promise<RecoverUpdateMessage> {}

async sendRecoverUpdateMessage(
message: RecoverUpdateMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "@hyperledger/cactus-common";
import { Empty } from "@bufbuild/protobuf";
import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
import { SATPSession } from "../satp-session";

interface HandlerOptions {
serverService: CrashRecoveryServerService;
Expand Down Expand Up @@ -48,9 +49,7 @@ export class CrashRecoveryHandler {
return this.logger;
}

async RecoverMessageImplementation(
req: RecoverMessage,
): Promise<RecoverUpdateMessage> {
async sendRecover(req: RecoverMessage): Promise<RecoverUpdateMessage> {
const stepTag = `RecoverV2MessageImplementation()`;
const fnTag = `${this.getHandlerIdentifier()}#${stepTag}`;
try {
Expand All @@ -77,7 +76,7 @@ export class CrashRecoveryHandler {
}
}

async RecoverUpdateMessageImplementation(
async sendRecoverUpdate(
req: RecoverUpdateMessage,
): Promise<RecoverSuccessMessage> {
const fnTag = `${this.getHandlerIdentifier()}#handleRecoverUpdateMessage()`;
Expand Down Expand Up @@ -107,9 +106,7 @@ export class CrashRecoveryHandler {
}
}

async RecoverSuccessMessageImplementation(
req: RecoverSuccessMessage,
): Promise<Empty> {
async sendRecoverSuccess(req: RecoverSuccessMessage): Promise<Empty> {
const fnTag = `${this.getHandlerIdentifier()}#handleRecoverSuccessMessage()`;
try {
this.Log.debug(`${fnTag}, Handling Recover Success Message...`);
Expand All @@ -129,9 +126,7 @@ export class CrashRecoveryHandler {
}
}

async RollbackMessageImplementation(
req: RollbackMessage,
): Promise<RollbackAckMessage> {
async sendRollback(req: RollbackMessage): Promise<RollbackAckMessage> {
const fnTag = `${this.getHandlerIdentifier()}#handleRollbackMessage()`;
try {
this.Log.debug(`${fnTag}, Handling Rollback Message...`);
Expand All @@ -152,9 +147,7 @@ export class CrashRecoveryHandler {
}
}

async RollbackAckMessageImplementation(
req: RollbackAckMessage,
): Promise<Empty> {
async sendRollbackAck(req: RollbackAckMessage): Promise<Empty> {
const fnTag = `${this.getHandlerIdentifier()}#handleRollbackAckMessage()`;
try {
this.Log.debug(`${fnTag}, Handling Rollback Ack Message...`);
Expand All @@ -176,15 +169,16 @@ export class CrashRecoveryHandler {

setupRouter(router: ConnectRouter): void {
router.service(CrashRecovery, {
recoverV2Message: this.RecoverMessageImplementation,
recoverV2UpdateMessage: this.RecoverUpdateMessageImplementation,
recoverV2SuccessMessage: this.RecoverSuccessMessageImplementation,
rollbackV2Message: this.RollbackMessageImplementation,
rollbackV2AckMessage: this.RollbackAckMessageImplementation,
recoverV2Message: this.sendRecover,
recoverV2UpdateMessage: this.sendRecoverUpdate,
recoverV2SuccessMessage: this.sendRecoverSuccess,
rollbackV2Message: this.sendRollback,
rollbackV2AckMessage: this.sendRollbackAck,
});
}

public async SendRecoverMessage(
// TODO! what is this function for? seems like a service function
public async sendRecoverTODO(
sessionId: string,
): Promise<RecoverUpdateMessage> {
const stepTag = `SendRecoverV2Message()`;
Expand All @@ -198,8 +192,7 @@ export class CrashRecoveryHandler {
}

const recoverMessage = this.clientService.createRecoverMessage(session);
const response =
await this.clientService.sendRecoverMessage(recoverMessage);
const response = await this.clientService.sendRecover(recoverMessage);

if (!response) {
throw new Error(`${fnTag}, Failed to receive RecoverUpdateMessage`);
Expand Down Expand Up @@ -227,24 +220,4 @@ export class CrashRecoveryHandler {
throw new Error(`${fnTag}, Error sending Recover Update: ${error}`);
}
}

public async InitiateRollback(sessionId: string): Promise<RollbackMessage> {
const fnTag = `${this.getHandlerIdentifier()}#initiateRollback()`;
try {
this.Log.debug(`${fnTag}, Initiating Rollback...`);

const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`${fnTag}, Session not found`);
}

const rollbackMessage = this.clientService.createRollbackMessage(session);

this.Log.debug(`${fnTag}, Rollback Message created`);

return rollbackMessage;
} catch (error) {
throw new Error(`${fnTag}, Error initiating Rollback: ${error}`);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";

// use connect protocol to receive messages from the crashed server; implement protocol
// will need access to local logs and remote logs
export class CrashRecoveryServerService {
createRecoverUpdateMessage(
request: RecoverMessage,
Expand Down
Loading

0 comments on commit f9014b0

Please sign in to comment.