Weird problem when running two methods inside the Enqueue #2017

Open
luizfbicalho opened this issue Mar 27, 2022 · 3 comments

Comments

@luizfbicalho

luizfbicalho commented Mar 27, 2022

I have an extension method that enqueues my view models, pointing to an implementation of the IBackgroundJob interface.

These are my extension methods:

private static readonly ActivitySource activitySource = new("MC.Hangfire.Extensions");

public static string Enqueue<T>(this T job, IBackgroundJobClient client)
{
    return client.Enqueue<IBackgroundJob<T>>(ps => ps.AddTelemetry(null).EnqueueJob(null, job, JobCancellationToken.Null));
}

public static IBackgroundJob<T> AddTelemetry<T>(this IBackgroundJob<T> job, PerformContext context)
{
    using var activity = activitySource.StartActivity($"Start Job {typeof(T).FullName} id {context.BackgroundJob.Id}", ActivityKind.Server);
    activity?.SetTag("JobId", context.BackgroundJob.Id);
    activity?.SetTag("JobJson", Newtonsoft.Json.JsonConvert.SerializeObject(job));
    activity?.SetTag("Job", Newtonsoft.Json.JsonConvert.SerializeObject(context.BackgroundJob.Job));
    return job;
}

My problem is that the EnqueueJob method is called, but the AddTelemetry method is not called before it. How can I add the telemetry information before each of my jobs runs, in the context of the job, and of course without adding this code to every one of my enqueue methods?

EDIT:

I'll try to implement this with filters, though I'm not sure it's the best way.
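
A possible explanation for the behaviour described in the question: Enqueue takes the lambda as an expression tree and stores a description of a method call instead of running the lambda. The sketch below uses only System.Linq.Expressions, with hypothetical stand-in types (IDemoJob, DemoJobExtensions, ExpressionInspector are not Hangfire types), to show that in a chained call the outermost method is EnqueueJob and AddTelemetry is just the expression it is called on. Whether Hangfire ignores that inner expression is an assumption here, not something confirmed in this thread.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical stand-in for IBackgroundJob<T>; not a Hangfire type.
public interface IDemoJob
{
    void EnqueueJob(string payload);
}

public static class DemoJobExtensions
{
    // Same shape as AddTelemetry in the question: returns the job so calls can be chained.
    public static IDemoJob AddTelemetry(this IDemoJob job, object context) => job;
}

public static class ExpressionInspector
{
    // Returns the name of the outermost call in the lambda body and the
    // name of the call it is chained onto (null if there is none).
    public static (string Outer, string Inner) Describe(Expression<Action<IDemoJob>> lambda)
    {
        var outer = (MethodCallExpression)lambda.Body;
        var inner = outer.Object as MethodCallExpression;
        return (outer.Method.Name, inner?.Method.Name);
    }

    // Builds a lambda with the same chaining as the Enqueue extension in the question.
    public static (string Outer, string Inner) DescribeQuestionShape()
    {
        return Describe(ps => ps.AddTelemetry(null).EnqueueJob("payload"));
    }
}
```

Calling ExpressionInspector.DescribeQuestionShape() reports EnqueueJob as the outer call and AddTelemetry as the inner one. A library that records only the outer method and its arguments would never see AddTelemetry, which would match what the question describes.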

@luizfbicalho
Author

I copied this filter and adapted it into the following filter:

public class TraceLogFilterAttribute : JobFilterAttribute,
IClientFilter, IServerFilter, IElectStateFilter, IApplyStateFilter
{
    public TraceLogFilterAttribute()
    {

    }

    private static readonly ILog Logger = LogProvider.GetCurrentClassLogger();

    public void OnCreating(CreatingContext context)
    {
        Logger.InfoFormat("Creating a job based on method `{0}`...", context.Job.Method.Name);
    }

    public void OnCreated(CreatedContext context)
    {
        Logger.InfoFormat(
            "Job that is based on method `{0}` has been created with id `{1}`",
            context.Job.Method.Name,
            context.BackgroundJob?.Id);
    }
    private static readonly ActivitySource activitySource = new("MC.Hangfire.Extensions");


    public void OnPerforming(PerformingContext context)
    {
        using var activity = activitySource.StartActivity($"Start Job {context.BackgroundJob.Job.Type.Name}.{context.BackgroundJob.Job.Method.Name}", ActivityKind.Server);
        activity?.SetTag("JobId", context.BackgroundJob.Id);
        activity?.SetTag("JobType", context.BackgroundJob.Job.Type.FullName);
        activity?.SetTag("JobMethod", context.BackgroundJob.Job.Method.Name);
        // Use a distinct tag name so the arguments do not overwrite the "JobMethod" tag set above.
        activity?.SetTag("JobArgs", JsonConvert.SerializeObject(context.BackgroundJob.Job.Args, new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }));
        Logger.InfoFormat("Starting to perform job `{0}`", context.BackgroundJob.Id);
    }

    public void OnPerformed(PerformedContext context)
    {
        Logger.InfoFormat("Job `{0}` has been performed", context.BackgroundJob.Id);
    }

    public void OnStateElection(ElectStateContext context)
    {
        var failedState = context.CandidateState as FailedState;
        if (failedState != null)
        {
            Logger.WarnFormat(
                "Job `{0}` has been failed due to an exception `{1}`",
                context.BackgroundJob.Id,
                failedState.Exception);
        }
    }

    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        Logger.InfoFormat(
            "Job `{0}` state was changed from `{1}` to `{2}`",
            context.BackgroundJob.Id,
            context.OldStateName,
            context.NewState.Name);
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        Logger.InfoFormat(
            "Job `{0}` state `{1}` was unapplied.",
            context.BackgroundJob.Id,
            context.OldStateName);
    }
}

Now I have some issues with this filter:

  1. The filter should be injected by DI, but so far I could only create it as a singleton and add it to the Hangfire global filters.
  2. The activity that is started doesn't flow into the enqueued method, so I can't correctly measure what happened in my instrumentation.
  3. I used LogProvider.GetCurrentClassLogger(), but my idea was to use the ASP.NET Core logging here as well.
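
For points 1 and 3, one option is to give the filter a constructor that takes an ILogger and resolve it from the container while configuring Hangfire. This is a minimal sketch, assuming the Hangfire.AspNetCore (or Hangfire.NetCore) package and its AddHangfire overload that exposes the IServiceProvider. The names LoggingServerFilter and AddHangfireWithLoggingFilter are hypothetical, and storage configuration is left out.

```csharp
using Hangfire;
using Hangfire.Server;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

// Hypothetical filter: logs through Microsoft.Extensions.Logging
// instead of Hangfire's LogProvider.
public sealed class LoggingServerFilter : IServerFilter
{
    private readonly ILogger<LoggingServerFilter> _logger;

    public LoggingServerFilter(ILogger<LoggingServerFilter> logger) => _logger = logger;

    public void OnPerforming(PerformingContext context) =>
        _logger.LogInformation("Starting to perform job {JobId}", context.BackgroundJob.Id);

    public void OnPerformed(PerformedContext context) =>
        _logger.LogInformation("Job {JobId} has been performed", context.BackgroundJob.Id);
}

public static class HangfireRegistration
{
    // Hypothetical helper: registers the filter in DI, then adds the
    // container-built instance to Hangfire's global filters.
    public static IServiceCollection AddHangfireWithLoggingFilter(this IServiceCollection services)
    {
        services.AddSingleton<LoggingServerFilter>();
        return services.AddHangfire((provider, configuration) =>
        {
            // Storage configuration is omitted here; add your own call.
            configuration.UseFilter(provider.GetRequiredService<LoggingServerFilter>());
        });
    }
}
```

The filter is still a single shared instance, so this only suits dependencies that are safe to use as singletons, such as ILogger. Scoped services would need a different approach.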

@sgryphon

using var activity = activitySource.StartActivity($"Start Job {context.BackgroundJob.Job.Type.Name}.{context.BackgroundJob.Job.Method.Name}", ActivityKind.Server);

The using will Dispose of the Activity when the handler ends, which will Stop the activity (end it, and revert to parent, which will be empty). So while the first log message will have the trace, anything inside the actual job will not.
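
The effect of the using can be reproduced without Hangfire. The sketch below uses only System.Diagnostics and a hypothetical source name, "Demo.Source", and compares a handler that starts its activity with using against one that leaves it running. It assumes the handlers are called synchronously on the same thread as the code that follows, and that no other activity is current when it starts.

```csharp
using System.Diagnostics;

public static class ActivityScopeDemo
{
    // Hypothetical source name, used only for this demo.
    private static readonly ActivitySource Source = new("Demo.Source");

    // Mimics a filter callback that starts its activity with `using`.
    private static void HandlerWithUsing()
    {
        using var activity = Source.StartActivity("job");
        // The activity is disposed (stopped) when this method returns.
    }

    // Mimics a filter callback that starts an activity and leaves it running.
    private static void HandlerWithoutUsing()
    {
        Source.StartActivity("job");
    }

    // Reports whether Activity.Current is still set after each handler returns.
    public static (bool AfterUsing, bool AfterNoUsing) Run()
    {
        // Without a listener, StartActivity returns null and nothing is recorded.
        using var listener = new ActivityListener
        {
            ShouldListenTo = _ => true,
            Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
        };
        ActivitySource.AddActivityListener(listener);

        HandlerWithUsing();
        bool afterUsing = Activity.Current != null;

        HandlerWithoutUsing();
        bool afterNoUsing = Activity.Current != null;

        // Stop the activity that was left running, as a later callback would.
        Activity.Current?.Stop();
        return (afterUsing, afterNoUsing);
    }
}
```

With the using, nothing is current once the handler returns. Without it, the activity is still current for whatever runs next on that thread.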

Note that you don't need to keep a reference, as Activity.Current will keep a reference for you. (And you can't anyway, as Hangfire serialises the reference data and doesn't pass the actual object.)

You can then clean up in OnPerformed by getting Activity.Current and stopping it. You should probably check that the item you are closing matches the one you opened, but if it doesn't, it's not clear what else to do (check if it is part of the same trace, and walk up the parent chain? I don't know -- at that point it would be better to be in core Hangfire and close the right reference).

Ideally, this should be part of the main Hangfire code (built in, not as a Filter). Maybe one day I will look at making it a proper pull request (too many other open source projects at the moment).

/// <summary>
/// Start a trace (Activity) for each Hangfire job. Need to configure a listener for 'Hangfire.Server', e.g. OpenTelemetry.
/// </summary>
public class StartActivityHangfireServerFilter : IServerFilter
{
    public const string ActivitySourceName = "Hangfire.Server";

    private readonly ActivitySource _activitySource = new ActivitySource(ActivitySourceName);

    public void OnPerforming(PerformingContext context)
    {
        var activity = _activitySource.StartActivity(
            $"perform {context.BackgroundJob.Job}",
            ActivityKind.Consumer
        );
        activity?.SetTag("job_name", context.BackgroundJob.Job.ToString());
        activity?.SetTag("job_id", context.BackgroundJob.Id);
        context.SetJobParameter("activity.trace_id", activity?.TraceId.ToString());
    }

    public void OnPerformed(PerformedContext context)
    {
        var activity = Activity.Current;
        if (activity?.Status == ActivityStatusCode.Unset)
        {
            activity?.SetStatus(
                context.Exception == null ? ActivityStatusCode.Ok : ActivityStatusCode.Error,
                context.Exception?.Message
            );
        }
        activity?.Stop();
    }
}
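
One way to approach the "does it match the one you opened" check is to remember the started activity in the perform context and stop that exact instance. This is only a sketch: it assumes that PerformContext.Items is an in-memory dictionary shared by OnPerforming and OnPerformed for a single run of the job, which should be verified against the Hangfire version in use. The class name and item key are hypothetical.

```csharp
using System.Diagnostics;
using Hangfire.Server;

// Hypothetical variant of the filter above that stops the activity it started,
// rather than whatever happens to be Activity.Current in OnPerformed.
public class TrackedActivityServerFilter : IServerFilter
{
    private const string ItemKey = "TrackedActivity"; // hypothetical key
    private static readonly ActivitySource Source = new ActivitySource("Hangfire.Server");

    public void OnPerforming(PerformingContext context)
    {
        var activity = Source.StartActivity(
            $"perform {context.BackgroundJob.Job}",
            ActivityKind.Consumer
        );
        if (activity != null)
        {
            // Assumption: Items lives in memory for this job run only.
            context.Items[ItemKey] = activity;
        }
    }

    public void OnPerformed(PerformedContext context)
    {
        if (context.Items.TryGetValue(ItemKey, out var value) && value is Activity activity)
        {
            context.Items.Remove(ItemKey);
            activity.SetStatus(
                context.Exception == null ? ActivityStatusCode.Ok : ActivityStatusCode.Error,
                context.Exception?.Message
            );
            activity.Stop();
        }
    }
}
```

If the job itself leaves child activities open, stopping the stored instance does not close them; that case is still unresolved here.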

@sgryphon

I've also put up a PR to include Activity creation in core Hangfire, so that distributed trace context is automatically transferred from job creation to job processing, i.e. job processing will always have an activity. #2460

sgryphon added a commit to sgryphon/Hangfire that referenced this issue Nov 17, 2024
Addresses tracing aspects of HangfireIO#2408 for integration with Aspire, as well as all other OpenTelemetry-based diagnostics, and addresses HangfireIO#2017.

Add a default filter to start producer activities (spans) when jobs are created, and consumer activities when jobs are performed. Pass the creation context through as TraceParent and TraceState job parameters, so that distributed tracing works across job scheduling.

Note that activity support is only available from netstandard2.0 onwards, and activities are only created if there is a configured listener.
sgryphon added a commit to sgryphon/Hangfire that referenced this issue Nov 23, 2024