Building End-to-End Diagnostics and Tracing: Trace Context
Posts in this series:
- An Intro
- Trace Context
- Diagnostic Events
- OpenTelemetry Integration
- Activity and Span Correlation
- Visualization with Exporters
- User-Defined Context with Correlation Context
- ActivitySource and OpenTelemetry 1.0
In the last post, I walked through the overall problem we run into with diagnosing issues in distributed systems - mainly that it can be difficult to determine causality because we don't have the "stack trace" that we get with a single in-process application.
To create a sort of "trace" in a distributed system, we need some way to build breadcrumbs into our communications. When one system communicates with another, and that system calls another, we need some way to link those requests together:
In a downstream system that experiences a failure, how do we link that failure to the original request, and everything in between? This is where distributed tracing comes in.
Product companies and OSS filled the void, but a problem emerged - each product, OSS or not, had its own way of providing additional context to each request to be able to link them together. The solution to causality is rather simple - we just need some context of the parent system/process that initiated the current request. That context is as simple as providing some unique identifier for the current request to all subsequent requests.
Very recently (February 2020), a new W3C standard, Trace Context, exited "draft" status and entered "recommendation" status. This standard describes mainly:
- What is the ID of the current request?
- What is the ID of the parent request?
It also allows requests to include some state information, but most important are those identifications. With an ID and parent ID, we can now create a directed acyclic graph, very similar to what we would see in a Git commit history.
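To make the ID and parent ID idea concrete, here's a minimal sketch that splits a traceparent header value into its parts. The TraceParent type and its member names are made up for illustration. The four-field layout (version, trace-id, parent-id, trace-flags) follows the W3C recommendation, but the sketch only handles version 00 and skips some of the spec's validation (all-zero IDs, for instance).

```csharp
using System;
using System.Linq;

// Hypothetical helper type for illustration; not part of .NET or NServiceBus.
public sealed class TraceParent
{
    public string Version { get; }
    public string TraceId { get; }    // 32 hex chars, shared by every request in the trace
    public string ParentId { get; }   // 16 hex chars, identifies the calling request
    public string TraceFlags { get; } // 2 hex chars, e.g. "01" means sampled

    private TraceParent(string version, string traceId, string parentId, string traceFlags)
    {
        Version = version;
        TraceId = traceId;
        ParentId = parentId;
        TraceFlags = traceFlags;
    }

    // Returns false when the value doesn't look like a version-00 traceparent.
    public static bool TryParse(string value, out TraceParent result)
    {
        result = null;
        if (string.IsNullOrEmpty(value)) return false;

        var parts = value.Split('-');
        if (parts.Length != 4) return false;
        if (parts[0] != "00") return false;
        if (!IsLowerHex(parts[1], 32) || !IsLowerHex(parts[2], 16) || !IsLowerHex(parts[3], 2))
            return false;

        result = new TraceParent(parts[0], parts[1], parts[2], parts[3]);
        return true;
    }

    private static bool IsLowerHex(string s, int length) =>
        s.Length == length &&
        s.All(c => (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f'));
}
```

Parsing the spec's sample value 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01 gives a TraceId of 0af7651916cd43dd8448eb211c80319c and a ParentId of b7ad6b7169203331. Every hop in the same trace keeps that trace-id and swaps in its own ID as the parent-id for the next hop, which is what produces the graph.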
Trace Context in .NET Core
In order to flow tracing identifiers through a request pipeline, regardless of the technology of the "in" and "out" request, we need some means of capturing the incoming tracing identifiers (on headers), storing them, and flowing them to outgoing headers. The basic pieces for this flow are:
- Incoming requests pull trace identifiers and store in an Activity
- Activity.Current includes any additional information for the current activity
- Outgoing requests read information from Activity.Current and place on outgoing trace headers.
One of the big pushes for observability in .NET Core 3.0 was to enable this W3C standard. Although it's not turned on by default for backwards compatibility reasons, if you turn it on:
public class Program
{
    public static void Main(string[] args)
    {
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;
        CreateHostBuilder(args).Build().Run();
    }
That will use the W3C standards for identifiers, but we still need to consume and propagate these trace identifiers. Luckily, we can see how ASP.NET Core and HttpClient did this:
- ASP.NET Core consuming incoming Trace Context headers
- HttpClient enriching outgoing requests from Activity.Current with Trace Context headers
There's a lot more going on in that code that we'll get to soon, but first things first, we need middleware for NServiceBus to:
- Start an Activity for incoming requests and set its parent ID from the traceparent header
- Set the traceparent header for outgoing requests
Luckily for us, NServiceBus has a robust middleware API, Behaviors, that makes it easy for us to add these pieces.
Incoming Requests to Activity
The first step in the process for diagnostics is to start an Activity at the beginning of processing a message and stop it at the end. We need to go one step further to add the appropriate parent ID. We can do this with a behavior defined for incoming messages:
public class ConsumerDiagnostics
    : Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(
        IIncomingPhysicalMessageContext context,
        Func<Task> next)
    {
        var activity = StartActivity(context);
        try
        {
            await next().ConfigureAwait(false);
        }
        finally
        {
            StopActivity(activity, context);
        }
    }
Behaviors in NServiceBus are similar to ASP.NET Core middleware. You get two parameters: the first is the context of the operation being performed, and the second is a delegate that performs the next action in the chain.
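If the context-plus-next shape is unfamiliar, here's a stripped-down sketch of the pattern with no NServiceBus types involved. MiniPipeline and everything in it are hypothetical names, and the real pipeline does far more, but the nesting works the same way: each step runs code before and after awaiting next.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, illustration-only pipeline; not the NServiceBus implementation.
public static class MiniPipeline
{
    // Each step receives a context (here just a log) and the next delegate.
    public delegate Task Step(List<string> log, Func<Task> next);

    // Composes the steps so the first one wraps the second, and so on.
    public static Task Run(List<string> log, params Step[] steps)
    {
        Func<Task> next = () => Task.CompletedTask; // end of the chain
        for (var i = steps.Length - 1; i >= 0; i--)
        {
            var step = steps[i];
            var inner = next;
            next = () => step(log, inner);
        }
        return next();
    }

    // A step shaped like the behavior above: start, call next, always stop.
    public static async Task Diagnostics(List<string> log, Func<Task> next)
    {
        log.Add("start activity");
        try
        {
            await next().ConfigureAwait(false);
        }
        finally
        {
            log.Add("stop activity");
        }
    }

    // Stands in for the rest of the pipeline, such as the message handler.
    public static Task Handler(List<string> log, Func<Task> next)
    {
        log.Add("handle message");
        return next();
    }
}
```

Awaiting MiniPipeline.Run(log, MiniPipeline.Diagnostics, MiniPipeline.Handler) leaves the log with "start activity", "handle message", "stop activity" in that order. The try/finally is what guarantees the stop runs even when something further down the chain throws.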
The StartActivity method needs to do two things - start an Activity, and pull the traceparent header off the incoming message to use as the parent ID:
private static Activity StartActivity(IIncomingPhysicalMessageContext context)
{
    var activity = new Activity(Constants.ConsumerActivityName);
    if (!context.MessageHeaders.TryGetValue(
        Constants.TraceParentHeaderName,
        out var requestId))
    {
        context.MessageHeaders.TryGetValue(
            Constants.RequestIdHeaderName,
            out requestId);
    }
    if (!string.IsNullOrEmpty(requestId))
    {
        // This is the magic
        activity.SetParentId(requestId);
        if (context.MessageHeaders.TryGetValue(
            Constants.TraceStateHeaderName,
            out var traceState))
        {
            activity.TraceStateString = traceState;
        }
    }
    // The current activity gets an ID with the W3C format
    activity.Start();
    return activity;
}
We first create an activity with a good name; in my case I chose NServiceBus.Diagnostics.Receive. There aren't a ton of recommendations about naming activities, but it should be something meaningful to the overall operation that's being performed. Activity names are hierarchical for future purposes, so we want to adhere to some sort of namespacing. The ASP.NET Core name is Microsoft.AspNetCore.Hosting.HttpRequestIn and HttpClient is System.Net.Http.HttpRequestOut.
After creating the Activity, I try to pull the traceparent header value out. I'm also trying to be a good citizen and pull the older, previous request-id header value out. Once I have this, I can set the ParentId on the Activity. Finally, if it exists, I'll pull the tracestate value and stuff it into the Activity. There are some more things in store for distributed tracing related to additional correlation context items, but for now, I'll leave that alone.
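The Constants class these snippets lean on isn't shown, so here's a guess at a minimal version. The traceparent and tracestate names come from the W3C standard and Request-Id is the casing ASP.NET Core uses for the older header. The receive activity name is the one I mentioned above, while the send activity name is an assumed counterpart.

```csharp
// Assumed shape of the Constants class used in these snippets.
public static class Constants
{
    // Activity names; the send name is an assumption for illustration.
    public const string ConsumerActivityName = "NServiceBus.Diagnostics.Receive";
    public const string ProducerActivityName = "NServiceBus.Diagnostics.Send";

    // W3C Trace Context header names.
    public const string TraceParentHeaderName = "traceparent";
    public const string TraceStateHeaderName = "tracestate";

    // Older, pre-W3C correlation header.
    public const string RequestIdHeaderName = "Request-Id";
}
```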
Finally, I start the activity, and Activity.Current represents this new activity. Stopping the activity is straightforward - the only thing I really need to care about is setting an end time of the Activity:
private static void StopActivity(Activity activity,
    IIncomingPhysicalMessageContext context)
{
    if (activity.Duration == TimeSpan.Zero)
    {
        activity.SetEndTime(DateTime.UtcNow);
    }
    activity.Stop();
}
Setting an appropriate end time for the activity will mean more later on when we start raising diagnostic events, but we want to make sure the duration of the event is just around calling the next item in the pipeline.
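To see what SetParentId actually does to the identifiers, here's a small sketch that runs the same start/stop sequence outside of NServiceBus. IncomingDemo and the Demo.Receive activity name are hypothetical, and the traceparent value is passed in directly rather than read from message headers.

```csharp
using System.Diagnostics;

// Hypothetical demo type; mirrors the incoming behavior without NServiceBus.
public static class IncomingDemo
{
    // Starts and stops an activity whose parent comes from a traceparent value.
    public static Activity Receive(string traceParent)
    {
        var activity = new Activity("Demo.Receive"); // hypothetical activity name
        activity.SetParentId(traceParent);           // must happen before Start()
        activity.Start();
        // ... message handling would happen here ...
        activity.Stop();
        return activity;
    }
}
```

Calling IncomingDemo.Receive with the W3C sample value 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01 returns an activity whose TraceId is 0af7651916cd43dd8448eb211c80319c and whose ParentSpanId is b7ad6b7169203331, while its own SpanId is freshly generated. Because the parent ID is in the W3C format, the new activity picks up the W3C format as well.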
That's incoming requests, what about outgoing ones?
Propagating trace context in outgoing messages
Just like we have incoming behaviors for messages, NServiceBus has outgoing behaviors as well. We just need to reverse the flow from above - set the traceparent header on outgoing messages from the current Activity:
public class ProducerDiagnostics : Behavior<IOutgoingPhysicalMessageContext>
{
    public override async Task Invoke(
        IOutgoingPhysicalMessageContext context,
        Func<Task> next)
    {
        var activity = StartActivity(context);
        InjectHeaders(activity, context);
        try
        {
            await next().ConfigureAwait(false);
        }
        finally
        {
            StopActivity(activity, context);
        }
    }
Starting the activity is much simpler now:
private static Activity StartActivity(IOutgoingPhysicalMessageContext context)
{
    var activity = new Activity(Constants.ProducerActivityName);
    activity.Start();
    return activity;
}
But wait, we're not setting the parent ID! For outgoing messages, we don't need to. If there's a current started activity, our activity will automatically have its ParentId set to Activity.Current.Id, so we're good to go without managing all that ourselves.
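A quick way to convince yourself of this is to nest two activities by hand. This is only a sketch; ParentingDemo and the activity names are hypothetical.

```csharp
using System.Diagnostics;

// Hypothetical demo type showing the automatic parent assignment.
public static class ParentingDemo
{
    // Returns (parent, child) where the child was started while the parent was current.
    public static (Activity Parent, Activity Child) StartNested()
    {
        var parent = new Activity("Demo.Receive").Start(); // becomes Activity.Current
        var child = new Activity("Demo.Send").Start();     // no SetParentId call
        child.Stop();
        parent.Stop();
        return (parent, child);
    }
}
```

After the call, child.ParentId equals parent.Id and child.Parent is the parent instance itself, even though the code never linked the two explicitly.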
Next, we need to inject the headers of the current activity into the outgoing request:
private static void InjectHeaders(
    Activity activity,
    IOutgoingPhysicalMessageContext context)
{
    if (activity.IdFormat == ActivityIdFormat.W3C)
    {
        if (!context.Headers.ContainsKey(Constants.TraceParentHeaderName))
        {
            context.Headers[Constants.TraceParentHeaderName] = activity.Id;
            if (activity.TraceStateString != null)
            {
                context.Headers[Constants.TraceStateHeaderName] =
                    activity.TraceStateString;
            }
        }
    }
    else
    {
        if (!context.Headers.ContainsKey(Constants.RequestIdHeaderName))
        {
            context.Headers[Constants.RequestIdHeaderName] =
                activity.Id;
        }
    }
}
The new request's parent ID will be this activity's ID, and that new parent ID will be consumed by downstream systems as well.
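Here's a rough sketch of one full hop, with a plain dictionary standing in for the message headers. RoundTripDemo and the activity names are hypothetical, and both sides run in one process here purely to keep the example self-contained.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical demo type; a dictionary plays the role of the message headers.
public static class RoundTripDemo
{
    // Simulates one hop: the "send" side writes traceparent, the "receive"
    // side reads it back and uses it as its parent ID.
    public static (Activity Sent, Activity Received) Hop()
    {
        var headers = new Dictionary<string, string>();

        // Producer side. The format is set explicitly so the sketch behaves the
        // same on runtimes where hierarchical IDs are still the default.
        var sent = new Activity("Demo.Send");
        sent.SetIdFormat(ActivityIdFormat.W3C);
        sent.Start();
        headers["traceparent"] = sent.Id;
        sent.Stop();

        // Consumer side, which would normally be another process.
        var received = new Activity("Demo.Receive");
        if (headers.TryGetValue("traceparent", out var parentId))
        {
            received.SetParentId(parentId);
        }
        received.Start();
        received.Stop();

        return (sent, received);
    }
}
```

The received activity ends up with the same TraceId as the sent one, and its ParentSpanId matches the sent activity's SpanId. Repeat that for every hop and you get the chain of parent links that ties a downstream failure back to the original request.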
The magic here is Activity.Current, an async local static property, which means that anything sharing the same async context will get the same Activity.Current value.
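As a small sketch of that async local behavior, the following starts an activity and then reads Activity.Current after an await and from inside Task.Run. AsyncFlowDemo and the activity name are hypothetical.

```csharp
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical demo type showing Activity.Current flowing with the async context.
public static class AsyncFlowDemo
{
    // Returns the started activity's ID and the ID observed later in the flow.
    public static async Task<(string Started, string SeenLater)> Run()
    {
        var activity = new Activity("Demo.Flow").Start();
        try
        {
            await Task.Yield(); // the continuation may run on a different thread

            // Task.Run captures the execution context, so Activity.Current flows along.
            var seen = await Task.Run(() => Activity.Current?.Id);
            return (activity.Id, seen);
        }
        finally
        {
            activity.Stop();
        }
    }
}
```

Both values in the returned tuple are the same ID, regardless of which thread the work landed on. That's why the outgoing behavior can find the incoming activity without anyone passing it around.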
Stopping the activity looks exactly the same as the incoming requests:
private static void StopActivity(
    Activity activity,
    IOutgoingPhysicalMessageContext context)
{
    if (activity.Duration == TimeSpan.Zero)
    {
        activity.SetEndTime(DateTime.UtcNow);
    }
    activity.Stop();
}
To enable these behaviors, you can use an NServiceBus Feature to add them to the processing pipeline automatically:
public class DiagnosticsFeature : Feature
{
    public DiagnosticsFeature()
    {
        EnableByDefault();
    }
    protected override void Setup(FeatureConfigurationContext context)
    {
        context.Pipeline.Register(new ConsumerDiagnostics(),
            "Parses incoming W3C trace information from incoming messages.");
        context.Pipeline.Register(new ProducerDiagnostics(),
            "Appends W3C trace information to outgoing messages.");
    }
}
I enable this feature by default, with the future idea that anyone that references this package/assembly will get this behavior opted in. With all of this in place, how does this look in practice?
A Dummy Distributed System
I wanted to simulate all these different kinds of flows, which use a variety of hosts and communication:
- Incoming HTTP to Outgoing NServiceBus
- Incoming NServiceBus to Outgoing HTTP
- Incoming NServiceBus to Outgoing NServiceBus
Incoming HTTP will be a regular ASP.NET Core application and host, and the incoming NServiceBus will be a worker service. I wanted to capture all manners of communication with these two applications:
My Web Server is a web application with this diagnostics code added, plus using the NServiceBus extension to .NET Core generic hosting. I created a simple API controller that uses the NServiceBus IMessageSession injected to send an AMQP message via RabbitMQ:
[HttpGet]
public async Task<ActionResult> Get(string message)
{
    var command = new SaySomething
    {
        Message = message
    };
    _logger.LogInformation("Sending message {message}", command.Message);
    await _messageSession.Send(command);
    return Accepted();
}
The handler of this message on the worker service makes the HTTP call and sends a Reply:
public class SaySomethingHandler : IHandleMessages<SaySomething>
{
    private readonly ILogger<SaySomethingHandler> _logger;
    private static readonly HttpClient _httpClient = new HttpClient
    {
        BaseAddress = new Uri("https://localhost:5001")
    };
    public SaySomethingHandler(ILogger<SaySomethingHandler> logger)
        => _logger = logger;
    public async Task Handle(SaySomething message, IMessageHandlerContext context)
    {
        var content = await _httpClient.GetStringAsync("/weatherforecast/today");
        dynamic json = Deserialize<ExpandoObject>(content);
        var temp = (int)json.temperatureF.GetInt32();
        _logger.LogInformation("Saying {message} and the weather today is {weather}F", message.Message, temp);
        await context.Reply(new SaySomethingResponse
        {
            Message = $"Back at ya {message.Message}"
        });
    }
}
The API endpoint is rather dumb; it just returns dummy weather data that I stuck in a database:
[HttpGet("today")]
public async Task<WeatherForecast> GetToday()
{
    var forecastCount = await _dbContext.Forecasts.CountAsync();
    var rng = new Random();
    return await _dbContext.Forecasts.Skip(rng.Next(forecastCount)).FirstAsync();
}
And the Reply handler doesn't do anything fun, but it stops the distributed flow:
public class SaySomethingResponseHandler
    : IHandleMessages<SaySomethingResponse>
{
    private readonly ILogger<SaySomethingResponseHandler> _logger;
    public SaySomethingResponseHandler(ILogger<SaySomethingResponseHandler> logger)
        => _logger = logger;
    public Task Handle(SaySomethingResponse message, IMessageHandlerContext context)
    {
        _logger.LogInformation("Received {message}", message.Message);
        return Task.CompletedTask;
    }
}
With all the pieces in place, let's trace the flow from the initial request all the way through each receiver and out again.
Tracing the flow
It all starts with initiating the request with the Swagger UI:
Logging the Activity.Current.Id and Activity.Current.ParentId:
We see that the current activity ID has a value, but the parent ID does not. This makes sense - the Swagger UI doesn't track activities and does not pass a traceparent header along.
With the message sent, let's look at the message in RabbitMQ to see if it has a traceparent value that matches the above:
It does! Let's now run the whole system end-to-end and watch the activity IDs in our logs:
We can see that all of our activity IDs share the same trace-id fragment, while the parent-id values differ (technical detail, but these link to a span in tracing terms).
With our tracing identifiers correctly propagating, we've laid the groundwork to start putting Humpty Dumpty together again. In the next post, we'll look at how we can raise diagnostic events so that something can see these traces outside of directly instrumenting our traffic.