Skip to content

Commit

Permalink
Merge pull request #557 from ai16z/fix/postgres
Browse files Browse the repository at this point in the history
fix: postgres
  • Loading branch information
shakkernerd authored Nov 25, 2024
2 parents 1977d52 + f14e9e0 commit 97518d6
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 58 deletions.
1 change: 1 addition & 0 deletions agent/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ function initializeDatabase(dataDir: string) {
if (process.env.POSTGRES_URL) {
const db = new PostgresDatabaseAdapter({
connectionString: process.env.POSTGRES_URL,
parseInputs: true,
});
return db;
} else {
Expand Down
20 changes: 11 additions & 9 deletions packages/adapter-postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ export class PostgresDatabaseAdapter
while (retryCount < maxRetries) {
try {
const delay = baseDelay * Math.pow(2, retryCount);
elizaLogger.log(`Attempting to reconnect in ${delay}ms...`);
elizaLogger.warn(
`Attempting to reconnect in ${delay}ms...`
);
await new Promise((resolve) => setTimeout(resolve, delay));

// Create new pool with same config
this.pool = new pg.Pool(this.pool.options);
await this.testConnection();

elizaLogger.log("Successfully reconnected to database");
elizaLogger.success("Successfully reconnected to database");
return;
} catch (error) {
retryCount++;
Expand Down Expand Up @@ -116,7 +118,7 @@ export class PostgresDatabaseAdapter
try {
client = await this.pool.connect();
const result = await client.query("SELECT NOW()");
elizaLogger.log(
elizaLogger.success(
"Database connection test successful:",
result.rows[0]
);
Expand Down Expand Up @@ -215,7 +217,7 @@ export class PostgresDatabaseAdapter
if (rows.length === 0) return null;

const account = rows[0];
elizaLogger.log("account", account);
elizaLogger.debug("account", account);
return {
...account,
details:
Expand Down Expand Up @@ -346,7 +348,7 @@ export class PostgresDatabaseAdapter
if (!params.roomId) throw new Error("roomId is required");
let sql = `SELECT * FROM memories WHERE type = $1 AND "agentId" = $2 AND "roomId" = $3`;
const values: any[] = [params.tableName, params.agentId, params.roomId];
let paramCount = 2;
let paramCount = 3; // Updated to start at 3 since we already have 3 parameters

if (params.start) {
paramCount++;
Expand All @@ -366,9 +368,9 @@ export class PostgresDatabaseAdapter

sql += ' ORDER BY "createdAt" DESC';

if (params.count) {
if (params.count && typeof params.count === "number") {
paramCount++;
sql += ` LIMIT $${paramCount}`;
sql += ` LIMIT $${paramCount}::integer`; // Cast to integer
values.push(params.count);
}

Expand Down Expand Up @@ -628,7 +630,7 @@ export class PostgresDatabaseAdapter
);

if (existingParticipant.rows.length > 0) {
elizaLogger.log(
elizaLogger.error(
`Participant with userId ${userId} already exists in room ${roomId}.`
);
return true; // Exit early if the participant already exists
Expand All @@ -643,7 +645,7 @@ export class PostgresDatabaseAdapter
return true;
} catch (error) {
if (error instanceof DatabaseError) {
elizaLogger.log("Error adding participant", error);
elizaLogger.error("Error adding participant", error);
// This is to prevent duplicate participant error in case of a race condition
// Handle unique constraint violation error (code 23505)
if (error.code === "23505") {
Expand Down
178 changes: 129 additions & 49 deletions packages/client-discord/src/voice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import {
NoSubscriberBehavior,
StreamType,
VoiceConnection,
VoiceConnectionStatus,
createAudioPlayer,
createAudioResource,
getVoiceConnection,
joinVoiceChannel,
entersState,
} from "@discordjs/voice";
import {
BaseGuildVoiceChannel,
Expand Down Expand Up @@ -230,6 +232,7 @@ export class VoiceManager extends EventEmitter {
console.error("Error leaving voice channel:", error);
}
}

const connection = joinVoiceChannel({
channelId: channel.id,
guildId: channel.guild.id,
Expand All @@ -238,38 +241,103 @@ export class VoiceManager extends EventEmitter {
selfMute: false,
});

const me = channel.guild.members.me;
if (me?.voice && me.permissions.has("DeafenMembers")) {
await me.voice.setDeaf(false);
await me.voice.setMute(false);
} else {
elizaLogger.log("Bot lacks permission to modify voice state");
}
try {
// Wait for either Ready or Signalling state
await Promise.race([
entersState(connection, VoiceConnectionStatus.Ready, 20_000),
entersState(
connection,
VoiceConnectionStatus.Signalling,
20_000
),
]);

// Log connection success
elizaLogger.log(
`Voice connection established in state: ${connection.state.status}`
);

for (const [, member] of channel.members) {
if (!member.user.bot) {
this.monitorMember(member, channel);
}
}
// Set up ongoing state change monitoring
connection.on("stateChange", async (oldState, newState) => {
elizaLogger.log(
`Voice connection state changed from ${oldState.status} to ${newState.status}`
);

connection.on("error", (error) => {
console.error("Voice connection error:", error);
});
if (newState.status === VoiceConnectionStatus.Disconnected) {
elizaLogger.log("Handling disconnection...");

connection.receiver.speaking.on("start", (userId: string) => {
const user = channel.members.get(userId);
if (!user?.user.bot) {
this.monitorMember(user as GuildMember, channel);
this.streams.get(userId)?.emit("speakingStarted");
try {
// Try to reconnect if disconnected
await Promise.race([
entersState(
connection,
VoiceConnectionStatus.Signalling,
5_000
),
entersState(
connection,
VoiceConnectionStatus.Connecting,
5_000
),
]);
// Seems to be reconnecting to a new channel
elizaLogger.log("Reconnecting to channel...");
} catch (e) {
// Seems to be a real disconnect, destroy and cleanup
elizaLogger.log(
"Disconnection confirmed - cleaning up..." + e
);
connection.destroy();
this.connections.delete(channel.id);
}
} else if (
newState.status === VoiceConnectionStatus.Destroyed
) {
this.connections.delete(channel.id);
} else if (
!this.connections.has(channel.id) &&
(newState.status === VoiceConnectionStatus.Ready ||
newState.status === VoiceConnectionStatus.Signalling)
) {
this.connections.set(channel.id, connection);
}
});

connection.on("error", (error) => {
elizaLogger.log("Voice connection error:", error);
// Don't immediately destroy - let the state change handler deal with it
elizaLogger.log(
"Connection error - will attempt to recover..."
);
});

// Store the connection
this.connections.set(channel.id, connection);

// Continue with voice state modifications
const me = channel.guild.members.me;
if (me?.voice && me.permissions.has("DeafenMembers")) {
try {
await me.voice.setDeaf(false);
await me.voice.setMute(false);
} catch (error) {
elizaLogger.log("Failed to modify voice state:", error);
// Continue even if this fails
}
}
});

connection.receiver.speaking.on("end", async (userId: string) => {
const user = channel.members.get(userId);
if (!user?.user.bot) {
this.streams.get(userId)?.emit("speakingStopped");
// Set up member monitoring
for (const [, member] of channel.members) {
if (!member.user.bot) {
await this.monitorMember(member, channel);
}
}
});
} catch (error) {
elizaLogger.log("Failed to establish voice connection:", error);
connection.destroy();
this.connections.delete(channel.id);
throw error;
}
}

private async monitorMember(
Expand Down Expand Up @@ -780,7 +848,7 @@ export class VoiceManager extends EventEmitter {

audioPlayer.on(
"stateChange",
(oldState: any, newState: { status: string }) => {
(_oldState: any, newState: { status: string }) => {
if (newState.status == "idle") {
const idleTime = Date.now();
console.log(
Expand All @@ -792,34 +860,46 @@ export class VoiceManager extends EventEmitter {
}

async handleJoinChannelCommand(interaction: any) {
const channelId = interaction.options.get("channel")?.value as string;
if (!channelId) {
await interaction.reply("Please provide a voice channel to join.");
return;
}
const guild = interaction.guild;
if (!guild) {
return;
}
const voiceChannel = interaction.guild.channels.cache.find(
(channel: VoiceChannel) =>
channel.id === channelId &&
channel.type === ChannelType.GuildVoice
);
try {
// Defer the reply immediately to prevent interaction timeout
await interaction.deferReply();

const channelId = interaction.options.get("channel")
?.value as string;
if (!channelId) {
await interaction.editReply(
"Please provide a voice channel to join."
);
return;
}

if (!voiceChannel) {
await interaction.reply("Voice channel not found!");
return;
}
const guild = interaction.guild;
if (!guild) {
await interaction.editReply("Could not find guild.");
return;
}

try {
this.joinChannel(voiceChannel as BaseGuildVoiceChannel);
await interaction.reply(
const voiceChannel = interaction.guild.channels.cache.find(
(channel: VoiceChannel) =>
channel.id === channelId &&
channel.type === ChannelType.GuildVoice
);

if (!voiceChannel) {
await interaction.editReply("Voice channel not found!");
return;
}

await this.joinChannel(voiceChannel as BaseGuildVoiceChannel);
await interaction.editReply(
`Joined voice channel: ${voiceChannel.name}`
);
} catch (error) {
console.error("Error joining voice channel:", error);
await interaction.reply("Failed to join the voice channel.");
// Use editReply instead of reply for the error case
await interaction
.editReply("Failed to join the voice channel.")
.catch(console.error);
}
}

Expand Down

0 comments on commit 97518d6

Please sign in to comment.