Skip to content

Commit

Permalink
fix: make async webhooks fully async (#3111)
Browse files Browse the repository at this point in the history
  • Loading branch information
zepatrik authored Feb 17, 2023
1 parent bb12fe7 commit 342bfb0
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 62 deletions.
126 changes: 64 additions & 62 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,63 +266,75 @@ func (e *WebHook) ExecuteSettingsPrePersistHook(_ http.ResponseWriter, req *http
}

func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
builder, err := request.NewBuilder(e.conf, e.deps)
if err != nil {
return err
}

req, err := builder.BuildRequest(ctx, data)
if errors.Is(err, request.ErrCancel) {
return nil
} else if err != nil {
return err
}

attrs := semconv.HTTPClientAttributesFromHTTPRequest(req.Request)
if data.Identity != nil {
attrs = append(attrs,
attribute.String("webhook.identity.id", data.Identity.ID.String()),
attribute.String("webhook.identity.nid", data.Identity.NID.String()),
)
}

var (
httpClient = e.deps.HTTPClient(ctx)
ignoreResponse = gjson.GetBytes(e.conf, "response.ignore").Bool()
canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool()
parseResponse = gjson.GetBytes(e.conf, "response.parse").Bool()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)}
errChan = make(chan error, 1)
)

if ignoreResponse && (parseResponse || canInterrupt) {
return errors.WithStack(herodot.ErrInternalServerError.WithReasonf("A webhook is configured to ignore the response but also to parse the response. This is not possible."))
}

ctx, span := tracer.Start(ctx, "selfservice.webhook", spanOpts...)
e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook")

req = req.WithContext(ctx)
if ignoreResponse {
// This is one of the few places where spawning a context.Background() is ok. We need to do this
// because the function runs asynchronously and we don't want to cancel the request if the
// incoming request context is cancelled.
//
// The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client.
req = req.WithContext(trace.ContextWithSpan(context.Background(), span))
}

startTime := time.Now()
go func() {
defer close(errChan)
makeRequest := func() (finalErr error) {
if ignoreResponse {
// This is one of the few places where spawning a context.Background() is ok. We need to do this
// because the function runs asynchronously and we don't want to cancel the request if the
// incoming request context is cancelled.
//
// The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), httpClient.HTTPClient.Timeout)
defer cancel()
}
ctx, span := tracer.Start(ctx, "selfservice.webhook")
defer span.End()
startTime := time.Now()

defer func() {
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID()
logger := e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
})
if finalErr != nil {
if ignoreResponse {
logger.WithField("duration", time.Since(startTime)).WithError(finalErr).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
}
} else {
logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded")
}
}()

builder, err := request.NewBuilder(e.conf, e.deps)
if err != nil {
return err
}

req, err := builder.BuildRequest(ctx, data)
if errors.Is(err, request.ErrCancel) {
return nil
} else if err != nil {
return err
}

span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req.Request)...)
if data.Identity != nil {
span.SetAttributes(
attribute.String("webhook.identity.id", data.Identity.ID.String()),
attribute.String("webhook.identity.nid", data.Identity.NID.String()),
)
}

e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook")

req = req.WithContext(ctx)

resp, err := httpClient.Do(req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- errors.WithStack(err)
return
return errors.WithStack(err)
}
defer resp.Body.Close()
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)
Expand All @@ -332,40 +344,30 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
if canInterrupt || parseResponse {
if err := parseWebhookResponse(resp, data.Identity); err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- err
return err
}
}
errChan <- fmt.Errorf("webhook failed with status code %v", resp.StatusCode)
return
return fmt.Errorf("webhook failed with status code %v", resp.StatusCode)
}

if parseResponse {
if err := parseWebhookResponse(resp, data.Identity); err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- err
return err
}
}

errChan <- nil
}()

if ignoreResponse {
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID()
logger := e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
})
go func() {
if err := <-errChan; err != nil {
logger.WithField("duration", time.Since(startTime)).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
} else {
logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded")
}
}()
return nil
}

return <-errChan
if !ignoreResponse {
return makeRequest()
}
go func() {
// we cannot handle the error as we are running async, and it is logged anyway
_ = makeRequest()
}()
return nil
}

func parseWebhookResponse(resp *http.Response, id *identity.Identity) (err error) {
Expand Down
17 changes: 17 additions & 0 deletions selfservice/hook/web_hook_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,23 @@ func TestWebHooks(t *testing.T) {
assert.Error(t, err)
})

t.Run("must not error when template is erroneous and responses are ignored", func(t *testing.T) {
ts := newServer(webHookHttpCodeEndPoint(200))
req := &http.Request{
Header: map[string][]string{"Some-Header": {"Some-Value"}},
Host: "www.ory.sh",
TLS: new(tls.ConnectionState),
URL: &url.URL{Path: "/some_end_point"},
Method: http.MethodPost,
}
f := &login.Flow{ID: x.NewUUID()}
conf := json.RawMessage(fmt.Sprintf(`{"url": "%s", "method": "GET", "body": "file://./stub/bad_template.jsonnet", "response": {"ignore": true}}`, ts.URL+path))
wh := hook.NewWebHook(&whDeps, conf)

err := wh.ExecuteLoginPreHook(nil, req, f)
assert.NoError(t, err)
})

t.Run("must not make request", func(t *testing.T) {
req := &http.Request{
Header: map[string][]string{"Some-Header": {"Some-Value"}},
Expand Down

0 comments on commit 342bfb0

Please sign in to comment.