Skip to content

Commit

Permalink
fix: (core) Add circuit breaker pattern for database operations
Browse files Browse the repository at this point in the history
Implements circuit breaker pattern to handle database failures gracefully
and prevent cascading failures. Fixes elizaOS#712.

Changes:
- Adds CircuitBreaker class with CLOSED, OPEN, and HALF-OPEN states
- Introduces BaseCircuitBreakerAdapter for database adapters
- Configurable failure thresholds and recovery timeouts
- Automatic recovery attempts in HALF-OPEN state
- Detailed logging of circuit breaker state changes

Circuit breaker configuration:
- Opens after 5 consecutive failures (configurable)
- Resets after 60 seconds in OPEN state
- Requires 3 successful operations in HALF-OPEN state to close

This helps prevent overwhelming failed database connections and provides
graceful degradation during outages.
  • Loading branch information
augchan42 committed Nov 30, 2024
1 parent ccf2287 commit 780b16e
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 52 deletions.
121 changes: 69 additions & 52 deletions packages/adapter-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import {
type UUID,
type IDatabaseCacheAdapter,
Participant,
DatabaseAdapter,
elizaLogger,
getEmbeddingConfig,
BaseCircuitBreakerAdapter,
} from "@ai16z/eliza";
import fs from "fs";
import { fileURLToPath } from "url";
Expand All @@ -32,7 +32,7 @@ const __filename = fileURLToPath(import.meta.url); // get the resolved path to t
const __dirname = path.dirname(__filename); // get the name of the directory

export class PostgresDatabaseAdapter
extends DatabaseAdapter<PoolType>
extends BaseCircuitBreakerAdapter<PoolType>
implements IDatabaseCacheAdapter
{
private pool: InstanceType<PoolType>;
Expand All @@ -43,7 +43,12 @@ export class PostgresDatabaseAdapter
private readonly connectionTimeout: number = 5000; // 5 seconds

constructor(connectionConfig: any) {
super();
super({
//CircuitBreaker config
failureThreshold: 5,
resetTimeout: 60000, // 1 minute
halfOpenMaxAttempts: 3,
});

const defaultConfig = {
max: 20,
Expand Down Expand Up @@ -78,53 +83,63 @@ export class PostgresDatabaseAdapter
}

private async withRetry<T>(operation: () => Promise<T>): Promise<T> {
let lastError: Error = new Error("Unknown error"); // Initialize with default

for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;

if (attempt < this.maxRetries) {
// Calculate delay with exponential backoff
const backoffDelay = Math.min(
this.baseDelay * Math.pow(2, attempt - 1),
this.maxDelay
);

// Add jitter to prevent thundering herd
const jitter = Math.random() * this.jitterMax;
const delay = backoffDelay + jitter;
return this.withCircuitBreaker(async () => {
let lastError: Error = new Error("Unknown error");

elizaLogger.warn(
`Database operation failed (attempt ${attempt}/${this.maxRetries}):`,
{
for (let attempt = 1; attempt <= this.maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;

if (attempt < this.maxRetries) {
// Calculate delay with exponential backoff
const backoffDelay = Math.min(
this.baseDelay * Math.pow(2, attempt - 1),
this.maxDelay
);

// Add jitter to prevent thundering herd
const jitter = Math.random() * this.jitterMax;
const delay = backoffDelay + jitter;

elizaLogger.warn(
`Database operation failed (attempt ${attempt}/${this.maxRetries}):`,
{
error:
error instanceof Error
? error.message
: String(error),
nextRetryIn: `${(delay / 1000).toFixed(1)}s`,
circuitState: this.circuitBreaker.getState(),
attempt,
maxRetries: this.maxRetries,
}
);

await new Promise((resolve) =>
setTimeout(resolve, delay)
);
} else {
elizaLogger.error("Max retry attempts reached:", {
error:
error instanceof Error
? error.message
: String(error),
nextRetryIn: `${(delay / 1000).toFixed(1)}s`,
}
);
totalAttempts: attempt,
circuitState: this.circuitBreaker.getState(),
});

await new Promise((resolve) => setTimeout(resolve, delay));
} else {
elizaLogger.error("Max retry attempts reached:", {
error:
error instanceof Error
? error.message
: String(error),
totalAttempts: attempt,
});
throw error instanceof Error
? error
: new Error(String(error));
// Let the circuit breaker know about the failure
throw error instanceof Error
? error
: new Error(String(error));
}
}
}
}

throw lastError;
throw lastError;
}, "PostgresDB");
}

private async handlePoolError(error: Error) {
Expand Down Expand Up @@ -159,16 +174,18 @@ export class PostgresDatabaseAdapter
queryTextOrConfig: string | QueryConfig<I>,
values?: QueryConfigValues<I>
): Promise<QueryResult<R>> {
const client = await this.pool.connect();

try {
return client.query(queryTextOrConfig, values);
} catch (error) {
elizaLogger.error(error);
throw error;
} finally {
client.release();
}
// Should be wrapped in withRetry
return this.withRetry(async () => {
const client = await this.pool.connect();
try {
return await client.query(queryTextOrConfig, values);
} catch (error) {
elizaLogger.error(error);
throw error;
} finally {
client.release();
}
});
}

async init() {
Expand Down Expand Up @@ -1167,7 +1184,7 @@ export class PostgresDatabaseAdapter
);
return true;
} catch (error) {
console.log("Error adding participant", error);
elizaLogger.error("Error adding participant", error);
return false;
}
});
Expand Down
31 changes: 31 additions & 0 deletions packages/core/src/database/BaseCircuitBreakerAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { CircuitBreaker } from "./CircuitBreaker";
import { elizaLogger } from "../logger";
import { DatabaseAdapter } from "../database";

export abstract class BaseCircuitBreakerAdapter<T> extends DatabaseAdapter<T> {
protected circuitBreaker: CircuitBreaker;

constructor(circuitBreakerConfig?: {
failureThreshold?: number;
resetTimeout?: number;
halfOpenMaxAttempts?: number;
}) {
super();
this.circuitBreaker = new CircuitBreaker(circuitBreakerConfig);
}

protected async withCircuitBreaker<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
try {
return await this.circuitBreaker.execute(operation);
} catch (error) {
elizaLogger.error(`Circuit breaker error in ${context}:`, {
error: error instanceof Error ? error.message : String(error),
state: this.circuitBreaker.getState(),
});
throw error;
}
}
}
68 changes: 68 additions & 0 deletions packages/core/src/database/CircuitBreaker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
export class CircuitBreaker {
private state: "CLOSED" | "OPEN" | "HALF_OPEN" = "CLOSED";
private failureCount: number = 0;
private lastFailureTime?: number;

private readonly failureThreshold: number;
private readonly resetTimeout: number;
private readonly halfOpenMaxAttempts: number;
private halfOpenSuccesses: number = 0;

constructor(
config: {
failureThreshold?: number;
resetTimeout?: number;
halfOpenMaxAttempts?: number;
} = {}
) {
this.failureThreshold = config.failureThreshold ?? 5;
this.resetTimeout = config.resetTimeout ?? 60000; // 1 minute
this.halfOpenMaxAttempts = config.halfOpenMaxAttempts ?? 3;
}

async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === "OPEN") {
if (Date.now() - (this.lastFailureTime || 0) > this.resetTimeout) {
this.state = "HALF_OPEN";
this.halfOpenSuccesses = 0;
} else {
throw new Error("Circuit breaker is OPEN");
}
}

try {
const result = await operation();

if (this.state === "HALF_OPEN") {
this.halfOpenSuccesses++;
if (this.halfOpenSuccesses >= this.halfOpenMaxAttempts) {
this.reset();
}
}

return result;
} catch (error) {
this.handleFailure();
throw error;
}
}

private handleFailure(): void {
this.failureCount++;
this.lastFailureTime = Date.now();

if (this.failureCount >= this.failureThreshold) {
this.state = "OPEN";
}
}

private reset(): void {
this.state = "CLOSED";
this.failureCount = 0;
this.lastFailureTime = undefined;
}

getState(): "CLOSED" | "OPEN" | "HALF_OPEN" {
return this.state;
}
}
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ export * from "./enviroment.ts";
export * from "./cache.ts";
export { default as knowledge } from "./knowledge.ts";
export * from "./utils.ts";
export { BaseCircuitBreakerAdapter } from "./database/BaseCircuitBreakerAdapter";

0 comments on commit 780b16e

Please sign in to comment.