Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions packages/agents-work-apps/src/__tests__/slack/routes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,66 @@ describe('Slack Work App Routes', () => {
app = createTestApp();
});

describe('POST /events - Slack retry deduplication', () => {
  // Slack re-delivers an event (adding x-slack-retry-num / x-slack-retry-reason
  // headers) when the initial ack is slow. The route must acknowledge retries
  // with 200 { ok: true } without re-running the background processing.
  it('should acknowledge Slack retries without re-processing', async () => {
    const response = await app.request('/events', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'x-slack-retry-num': '1',
        'x-slack-retry-reason': 'http_timeout',
      },
      body: JSON.stringify({
        type: 'event_callback',
        team_id: 'T123',
        event: {
          type: 'app_mention',
          user: 'U123',
          text: '<@UBOT> hello',
          channel: 'C123',
          ts: '1234.5678',
        },
      }),
    });

    expect(response.status).toBe(200);
    const json = await response.json();
    expect(json).toEqual({ ok: true });
  });

  // Only x-slack-retry-num is sent here: the reason header is optional and
  // deduplication must not depend on it. (Title corrected — the request
  // carries the retry number, not the retry reason.)
  it('should acknowledge retries when only the retry-num header is present', async () => {
    const response = await app.request('/events', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'x-slack-retry-num': '2',
      },
      body: JSON.stringify({ type: 'event_callback' }),
    });

    expect(response.status).toBe(200);
    const json = await response.json();
    expect(json).toEqual({ ok: true });
  });

  // Without retry headers the request must flow through the normal handler;
  // url_verification echoes the challenge, proving processing was not skipped.
  it('should process events normally when no retry headers are present', async () => {
    const response = await app.request('/events', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        type: 'url_verification',
        challenge: 'test-challenge-from-retry-test',
      }),
    });

    expect(response.status).toBe(200);
    const text = await response.text();
    expect(text).toBe('test-challenge-from-retry-test');
  });
});

describe('POST /events - url_verification', () => {
it('should respond to url_verification challenge', async () => {
const response = await app.request('/events', {
Expand Down
101 changes: 101 additions & 0 deletions packages/agents-work-apps/src/__tests__/slack/streaming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,105 @@ describe('streamAgentResponse', () => {
expect(body.conversationId).toBe('conv-123');
expect(body.stream).toBe(true);
});

describe('contentAlreadyDelivered error suppression', () => {
  // Installs fresh append/stop mocks on the chatStream client and routes
  // global fetch to a 200 SSE response wrapping the given stream.
  function arrangeStream(stream: ReadableStream, stopMock = vi.fn().mockResolvedValue(undefined)) {
    const appendMock = vi.fn().mockResolvedValue(undefined);
    mockSlackClient.chatStream.mockReturnValue({
      append: appendMock,
      stop: stopMock,
    });
    vi.spyOn(global, 'fetch').mockResolvedValue(new Response(stream, { status: 200 }));
    return { appendMock, stopMock };
  }

  it('should return success and suppress error message when content was already streamed', async () => {
    // First pull delivers one text-delta chunk; the next pull rejects,
    // simulating a late streamer.append timeout after content went out.
    let chunkDelivered = false;
    const stream = new ReadableStream({
      pull(controller) {
        if (chunkDelivered) {
          controller.error(new Error('streamer.append timed out after 10000ms'));
        } else {
          chunkDelivered = true;
          controller.enqueue(
            new TextEncoder().encode('data: {"type":"text-delta","delta":"Hello world"}\n')
          );
        }
      },
    });
    arrangeStream(stream);

    const result = await streamAgentResponse(baseParams);

    expect(result.success).toBe(true);
    // Should NOT post any error message to the user
    expect(mockPostMessage).not.toHaveBeenCalled();
    // Should still clean up thinking message
    expect(mockChatDelete).toHaveBeenCalledWith(
      expect.objectContaining({ channel: 'C456', ts: '1234.9999' })
    );
  });

  it('should post error message when no content was delivered', async () => {
    // The stream rejects on the very first read — nothing reaches the user.
    const stream = new ReadableStream({
      pull(controller) {
        controller.error(new Error('connection reset'));
      },
    });
    arrangeStream(stream);

    const result = await streamAgentResponse(baseParams);

    expect(result.success).toBe(false);
    expect(result.errorType).toBeDefined();
    // Should post error message since no content was delivered
    expect(mockPostMessage).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: 'C456',
        thread_ts: '1234.5678',
      })
    );
  });

  it('should return success when streamer.stop() finalization times out after content delivery', async () => {
    // A full, well-formed SSE body: all content is delivered and the
    // stream closes cleanly before stop() is attempted.
    const sseData = [
      'data: {"type":"text-delta","delta":"Hello "}\n',
      'data: {"type":"text-delta","delta":"world"}\n',
      'data: [DONE]\n',
    ].join('');
    const stream = new ReadableStream({
      start(controller) {
        controller.enqueue(new TextEncoder().encode(sseData));
        controller.close();
      },
    });
    // streamer.stop() rejects to simulate finalization timeout
    arrangeStream(stream, vi.fn().mockRejectedValue(new Error('streamer.stop timed out after 10000ms')));

    const result = await streamAgentResponse(baseParams);

    expect(result.success).toBe(true);
    // Should NOT post any error message to the user
    expect(mockPostMessage).not.toHaveBeenCalled();
    // Should still clean up thinking message
    expect(mockChatDelete).toHaveBeenCalledWith(expect.objectContaining({ ts: '1234.9999' }));
  });
});
});
16 changes: 16 additions & 0 deletions packages/agents-work-apps/src/slack/routes/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ app.post('/commands', async (c) => {
});

app.post('/events', async (c) => {
// Slack retries event delivery when the initial ack is slow (>3s).
// Retries include X-Slack-Retry-Num / X-Slack-Retry-Reason headers.
// Since we fire-and-forget background work, retries would cause duplicate processing.
const retryNum = c.req.header('x-slack-retry-num');
const retryReason = c.req.header('x-slack-retry-reason');
if (retryNum) {
return tracer.startActiveSpan(`${SLACK_SPAN_NAMES.WEBHOOK} retry`, (span) => {
span.setAttribute(SLACK_SPAN_KEYS.OUTCOME, 'ignored_slack_retry' satisfies SlackOutcome);
span.setAttribute('slack.retry_num', retryNum);
if (retryReason) span.setAttribute('slack.retry_reason', retryReason);
logger.info({ retryNum, retryReason }, 'Acknowledging Slack retry without re-processing');
span.end();
return c.json({ ok: true });
});
}
Comment on lines 91 to 105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Minor: Retry deduplication bypasses tracing span

Issue: This code path returns early before entering tracer.startActiveSpan(), making retry acknowledgments invisible to distributed tracing. All other early-return paths in this handler (url_verification, signature_invalid, ignored_bot_message) set span.setAttribute(SLACK_SPAN_KEYS.OUTCOME, outcome) before ending the span.

Why: Without span tracking, retry frequency is invisible to observability tooling. This makes it harder to detect Slack delivery issues or understand why initial acks are slow — important context for debugging latency issues.

Fix: Move the retry check inside the tracer.startActiveSpan() block and add a new outcome value (e.g., 'acknowledged_retry') to SlackOutcome type in tracer.ts. Example:

return tracer.startActiveSpan(SLACK_SPAN_NAMES.WEBHOOK, async (span) => {
  const retryNum = c.req.header('x-slack-retry-num');
  if (retryNum) {
    span.setAttribute(SLACK_SPAN_KEYS.OUTCOME, 'acknowledged_retry');
    span.setAttribute('slack.retry_num', retryNum);
    logger.info({ retryNum, retryReason: c.req.header('x-slack-retry-reason') }, 'Acknowledging Slack retry without re-processing');
    span.end();
    return c.json({ ok: true });
  }
  // ... rest of handler
});

Refs:


return tracer.startActiveSpan(SLACK_SPAN_NAMES.WEBHOOK, async (span) => {
let outcome: SlackOutcome = 'ignored_unknown_event';

Expand Down
52 changes: 46 additions & 6 deletions packages/agents-work-apps/src/slack/services/events/streaming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const logger = getLogger('slack-streaming');

// NOTE(review): presumably the overall deadline for reading the agent response
// stream (2 min) — usage site is outside this hunk, confirm against the handler.
const STREAM_TIMEOUT_MS = 120_000;
// Per-operation cap applied to individual chatStream calls (e.g. streamer.stop).
const CHATSTREAM_OP_TIMEOUT_MS = 10_000;
/** Shorter timeout for best-effort cleanup in error paths to bound total error handling time. */
const CLEANUP_TIMEOUT_MS = 3_000;

/**
* Wrap a promise with a timeout to prevent indefinite blocking on Slack API calls.
Expand Down Expand Up @@ -260,11 +262,21 @@ export async function streamAgentResponse(params: {
clearTimeout(timeoutId);

const contextBlock = createContextBlock({ agentName });
await withTimeout(
streamer.stop({ blocks: [contextBlock] }),
CHATSTREAM_OP_TIMEOUT_MS,
'streamer.stop'
);
try {
await withTimeout(
streamer.stop({ blocks: [contextBlock] }),
CHATSTREAM_OP_TIMEOUT_MS,
'streamer.stop'
);
} catch (stopError) {
// If content was already delivered to the user, a streamer.stop() timeout
// is a non-critical finalization error — log it but don't surface to user
span.setAttribute(SLACK_SPAN_KEYS.STREAM_FINALIZATION_FAILED, true);
logger.warn(
{ stopError, channel, threadTs, responseLength: fullText.length },
'Failed to finalize chatStream — content was already delivered'
);
}
Comment on lines 271 to 279
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 Consider: Add span attribute for finalization failures

Issue: When streamer.stop() fails in the success path, the error is logged but the span will appear as a clean success in traces. This makes it harder to identify finalization reliability issues in observability dashboards.

Why: Being able to query for "streams that succeeded but had finalization issues" would help identify patterns (e.g., specific agents, time-of-day, message length) that correlate with slow finalization.

Fix: Add a span attribute to mark degraded success:

} catch (stopError) {
  span.setAttribute('slack.finalization_failed', true);
  logger.warn(
    { stopError, channel, threadTs, responseLength: fullText.length },
    'Failed to finalize chatStream — content was already delivered'
  );
}

Refs:


if (thinkingMessageTs) {
try {
Expand All @@ -287,8 +299,36 @@ export async function streamAgentResponse(params: {
} catch (streamError) {
clearTimeout(timeoutId);
if (streamError instanceof Error) setSpanWithError(span, streamError);

const contentAlreadyDelivered = fullText.length > 0;

if (contentAlreadyDelivered) {
// Content was already streamed to the user — a late error (e.g. streamer.append
// timeout on the final chunk) should not surface as a user-facing error message.
span.setAttribute(SLACK_SPAN_KEYS.CONTENT_ALREADY_DELIVERED, true);
logger.warn(
{ streamError, channel, threadTs, responseLength: fullText.length },
'Error during Slack streaming after content was already delivered — suppressing user-facing error'
);
await withTimeout(streamer.stop(), CLEANUP_TIMEOUT_MS, 'streamer.stop-cleanup').catch((e) =>
logger.warn({ error: e }, 'Failed to stop streamer during error cleanup')
);

if (thinkingMessageTs) {
try {
await slackClient.chat.delete({ channel, ts: thinkingMessageTs });
} catch {
// Ignore delete errors in error path
}
}

span.end();
return { success: true };
Comment on lines 303 to 326
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 Consider: Add span attribute for partial failure tracking

Issue: When content has been delivered but a late error occurs, the function returns { success: true } (correct UX), but the span records the error via setSpanWithError() while still being a "successful" operation. This creates a semantic mismatch in observability.

Why: Being able to distinguish "clean success" from "graceful degradation after partial failure" in dashboards would help track reliability trends without false-alarming on user-facing errors.

Fix: Add a span attribute before returning success:

if (contentAlreadyDelivered) {
  span.setAttribute('slack.partial_failure', true);
  logger.warn(
    { streamError, channel, threadTs, responseLength: fullText.length },
    'Error during Slack streaming after content was already delivered — suppressing user-facing error'
  );
  // ... rest of cleanup
}

Refs:

  • Same pattern as suggested for the finalization timeout case at line 269

}

// No content was delivered — surface the error to the user
logger.error({ streamError }, 'Error during Slack streaming');
await withTimeout(streamer.stop(), CHATSTREAM_OP_TIMEOUT_MS, 'streamer.stop').catch((e) =>
await withTimeout(streamer.stop(), CLEANUP_TIMEOUT_MS, 'streamer.stop-cleanup').catch((e) =>
logger.warn({ error: e }, 'Failed to stop streamer during error cleanup')
);

Expand Down
3 changes: 3 additions & 0 deletions packages/agents-work-apps/src/slack/tracer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ export const SLACK_SPAN_KEYS = {
IS_BOT_MESSAGE: 'slack.is_bot_message',
HAS_QUERY: 'slack.has_query',
IS_IN_THREAD: 'slack.is_in_thread',
STREAM_FINALIZATION_FAILED: 'slack.stream_finalization_failed',
CONTENT_ALREADY_DELIVERED: 'slack.content_already_delivered',
} as const;

export type SlackOutcome =
| 'handled'
| 'ignored_bot_message'
| 'ignored_unknown_event'
| 'ignored_no_action_match'
| 'ignored_slack_retry'
| 'url_verification'
| 'validation_error'
| 'signature_invalid'
Expand Down