[QUERY] Internal AsEnumerable in EventDataBatch #20568
Hi @rzepinskip. Thank you for your feedback.
When you add an event to the batch, it has to be measured to enforce the size limitations for publishing to the service. To ensure that the batch is reliable and those measurements remain accurate, we cannot allow changes to events after they've been accepted into the batch. As a result, we make a defensive copy and intentionally do not allow access to the copied events to ensure that they cannot be mutated and potentially invalidate the batch. This also allows us to make certain optimizations to the batch, such as avoiding serializing the event multiple times and instead storing only the serialized form instead of coping the This also fixes a confusing behavior in the legacy library, in which the
Yes, with the caveat that you mentioned. The batch itself is not associated with a specific Event Hub, but it queries the one associated with the producer that creates it in order to reflect the size limit. From the client's perspective, the size is controlled by the service and may change from Event Hub to Event Hub. Speaking practically, that's not how the service actually works today, as the size limit is controlled by the SKU.
@jsquire Thank you for the prompt response.

Right, I just found the copy remark in the TryAdd docs. I think the ability to read data and the ability to change data are two different things, and removing the former to guard against the latter comes as a surprise to me. Would you be willing to bring the read capability back? I see two possible ways:
I am confused.
So the client is able to support a more granular scenario (an Event Hub-level size limit instead of a SKU-level one), but are there any plans to utilize it (and hence break the use case I mentioned)?
The challenge with that approach is that the events exposed would be the defensive copies, not the instances that the application added. It also caused confusion due to the lack of strong identity for an event within the Event Hubs ecosystem in general. Because there's no identity concept, developers would often use a reference comparison to match events, which fails against the copies.
This helps with the first challenge, but is still likely to suffer from the second. There are also two additional considerations: it precludes the optimization discussed in the next response, and it introduces another type into the hierarchy, increasing overall complexity. It's possible, but would have to be accepted as a uniform change across languages and approved by the SDK architect board. From previous discussions, I see this as likely to meet a strong challenge that we probably don't have enough justification for at the moment. I'll raise this as a discussion topic with the team.
Yes, that is what I'm referring to. What we're doing now is not what we'd like to be doing. We ran into problems with our transport library that forced us to hold onto the source events rather than just their serialized form.
This isn't a decision made by the client. We react to the maximum allowed size that the service tells us each time we open an AMQP link. The service can send whatever value it would like at that point, and the client is expected to enforce it. Today the size is controlled by the SKU used for the namespace. I do not know of any plans for that to change, but I can't speak authoritatively to what the service may or may not do in the future.
I understand the "changes not reflected" argument, but isn't this the same situation?
I didn't know identity was the issue. My initial understanding was: on the receiving side you could rely on the Event Hubs namespace + name + partition + sequence number combination, but having identity on the producer side requires a custom implementation.
Thank you very much!
I understand. I just wanted to know whether our "send the batch to a different Event Hub than the one that created it" scenario may stop working in future versions.
I have to admit that after letting this spin in my head for a while, I'm more convinced that opening this up is likely to create more problems and confusion than it solves. The requested functionality can be added to an application in a fairly straightforward way, which has the added benefit of giving the application's developers more explicit awareness of potentially unclear behavior. The usage pattern that I'm thinking of would look something like:

```csharp
var connectionString = "<< SOME CONNECTION STRING >>";
var hub = "<< SOME HUB >>";

await using var producer = new EventHubProducerClient(connectionString, hub);

// Create a batch and use it as the source for an application-specific wrapper that
// tracks the source events as they're added.
using var batch = new ObservableEventDataBatch(await producer.CreateBatchAsync());

// Add some events to the batch.
foreach (var body in new[] { "One", "Two", "Three" })
{
    if (!batch.TryAdd(new EventData(new BinaryData(body))))
    {
        throw new Exception($"Could not add all events to the batch. Failed at: { body }.");
    }
}

// Iterate through the events that were added.
foreach (var eventData in batch.Events)
{
    Debug.WriteLine($"Event Body: { eventData.EventBody.ToString() }");
}

Debug.WriteLine($"There are { batch.Count } events in the batch.");
Debug.WriteLine($"The total size of the batch, in bytes, is { batch.SizeInBytes }");

// Thanks to implicit conversion, the observable batch can be sent just like any
// EventDataBatch instance.
await producer.SendAsync(batch);
```

Where `ObservableEventDataBatch` is:

```csharp
public class ObservableEventDataBatch : IDisposable
{
    private readonly List<EventData> _events = new();
    private readonly EventDataBatch _batch;

    public IReadOnlyList<EventData> Events { get; }
    public int Count => _batch.Count;
    public long SizeInBytes => _batch.SizeInBytes;
    public long MaximumSizeInBytes => _batch.MaximumSizeInBytes;

    public ObservableEventDataBatch(EventDataBatch sourceBatch)
    {
        _batch = sourceBatch ?? throw new ArgumentNullException(nameof(sourceBatch));
        Events = _events.AsReadOnly();
    }

    public bool TryAdd(EventData eventData)
    {
        if (_batch.TryAdd(eventData))
        {
            _events.Add(eventData);
            return true;
        }

        return false;
    }

    public void Dispose() => _batch.Dispose();

    public static implicit operator EventDataBatch(ObservableEventDataBatch observable) => observable._batch;
}
```

I'm still committed to raising this with the team.
This subject of identity for an event comes up quite often. While, as you say, the sequence number + entity properties can be used to identify a specific instance of an event in the eyes of the broker, that's not quite the same thing as understanding the application's intent. To illustrate:

```csharp
var json = await GetDataFromSomeServiceAsync();

var first = new EventData(new BinaryData(json));
var second = new EventData(first.EventBody);

await producer.SendAsync(new[] { first });
await producer.SendAsync(new[] { second });
```

Those events have the same data and even share the same body instance. Does the application consider those "the same event?" The broker does not. Should the receiving application scrub duplicate data, or should it accept the broker's view that "there really are two of these?" What about:

```csharp
var eventData = new EventData(new BinaryData(await GetJsonFromServiceAsync()));

try
{
    await producer.SendAsync(new[] { eventData });
}
catch (TimeoutException)
{
    await producer.SendAsync(new[] { eventData });
}
```

The same scenario plays out when the publishing application crashes and needs to resume, as well as in other cases. In that same category, Event Hubs offers an "at least once" guarantee; a service failure can cause duplication (it's very rare, but not impossible). The guidance for this seems straightforward: use an application property that assigns some unique identifier to your data that your application(s) recognize. However, we see this come up consistently in questions and feedback where folks are confused or don't wish to own responsibility for identity. When designing the API for the SDK, we try our best to take this into account and not introduce additional areas of confusion.
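A minimal sketch of that guidance, stamping each event with an application-assigned identifier before publishing (the property name `app-message-id` is an arbitrary application choice here, not an SDK or service convention):

```csharp
var eventData = new EventData(new BinaryData("{ \"value\": 42 }"));

// Assign an application-level identity; neither the SDK nor the
// service interprets this property, so consumers must agree on it.
eventData.Properties["app-message-id"] = Guid.NewGuid().ToString();

await producer.SendAsync(new[] { eventData });

// Receiving applications can then de-duplicate on "app-message-id"
// instead of relying on broker-assigned sequence numbers.
```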
We went down the same route a few hours ago internally and stumbled upon one minor issue: your objects may get out of sync if somebody bypasses the wrapper and adds events to the underlying batch directly.

Do you plan to add some form of ID field to Event Hubs, then, or just provide guidance for a custom implementation in the docs?
Agreed; there's no absolute way to prevent this that I can see. Even if we hide the member, you could still cast to EventDataBatch and work with the underlying batch directly.
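The out-of-sync hazard can be sketched concretely (assuming the `ObservableEventDataBatch` with the implicit conversion shown earlier, and `producer` as an open `EventHubProducerClient`):

```csharp
using var batch = new ObservableEventDataBatch(await producer.CreateBatchAsync());

// Adding through the wrapper keeps Events and the batch in sync.
batch.TryAdd(new EventData(new BinaryData("tracked")));

// Casting exposes the underlying batch; this event is accepted by the
// batch but never recorded in batch.Events, so the two views disagree.
((EventDataBatch)batch).TryAdd(new EventData(new BinaryData("untracked")));

Debug.Assert(batch.Count == 2);        // the real batch holds both events
Debug.Assert(batch.Events.Count == 1); // the wrapper only saw the first
```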
AMQP offers a message-id property intended for this purpose, but it is assigned by the publishing application rather than by the service.
Well, I personally would avoid casting and go with one of these (probably the first):
[1] (usings omitted):

```csharp
public class ObservableEventDataBatch : IDisposable
{
    private readonly List<EventData> _events = new();

    internal EventDataBatch Batch { get; }

    public IReadOnlyList<EventData> Events { get; }
    public int Count => Batch.Count;
    public long SizeInBytes => Batch.SizeInBytes;
    public long MaximumSizeInBytes => Batch.MaximumSizeInBytes;

    public ObservableEventDataBatch(EventDataBatch sourceBatch)
    {
        Batch = sourceBatch ?? throw new ArgumentNullException(nameof(sourceBatch));
        Events = _events.AsReadOnly();
    }

    public bool TryAdd(EventData eventData)
    {
        if (Batch.TryAdd(eventData))
        {
            _events.Add(eventData);
            return true;
        }

        return false;
    }

    public void Dispose() => Batch.Dispose();
}

public static class EventHubProducerClientExtensions
{
    public static Task SendAsync(this EventHubProducerClient producer, ObservableEventDataBatch observableBatch)
    {
        return producer.SendAsync(observableBatch.Batch);
    }
}
```

[2]:

```csharp
public class ObservableEventDataBatch : IDisposable
{
    private readonly List<EventData> _events = new();
    private readonly EventDataBatch _batch;

    public IReadOnlyList<EventData> Events { get; }
    public SendEventOptions Options { get; }
    public int Count => _batch.Count;
    public long SizeInBytes => _batch.SizeInBytes;
    public long MaximumSizeInBytes => _batch.MaximumSizeInBytes;

    internal ObservableEventDataBatch(EventDataBatch sourceBatch, CreateBatchOptions options = null)
    {
        _batch = sourceBatch ?? throw new ArgumentNullException(nameof(sourceBatch));
        Events = _events.AsReadOnly();
        Options = options; // We do not have access to EventDataBatch.SendOptions
    }

    public bool TryAdd(EventData eventData)
    {
        if (_batch.TryAdd(eventData))
        {
            _events.Add(eventData);
            return true;
        }

        return false;
    }

    public void Dispose() => _batch.Dispose();
}

public static class EventHubProducerClientExtensions
{
    public static async ValueTask<ObservableEventDataBatch> CreateObservableBatchAsync(this EventHubProducerClient producer, CreateBatchOptions options = null)
    {
        return new ObservableEventDataBatch(await producer.CreateBatchAsync(options), options);
    }

    public static Task SendAsync(this EventHubProducerClient producer, ObservableEventDataBatch observableBatch)
    {
        return producer.SendAsync(observableBatch.Events, observableBatch.Options);
    }
}
```
Seems useful; I will follow it. It was a nice discussion, thanks for the detailed responses! Feel free to close the issue, as I do not have anything more to add.
After having some internal discussions, this is unlikely to receive enough support to be adopted officially into the client library and supported across languages. The best way forward for now will be to include this as a sample to illustrate the approach and capture additional feedback. I think this works out better than promoting it to a pure design discussion. I've opened #20779 and captured much of the discussion here to help drive that sample. Please feel free to use that issue for further discussion. I'm going to close this out, since we've got our workaround and next steps.
Query/Question

I am migrating code using Microsoft.Azure.EventHubs to the Azure.Messaging.EventHubs SDK, and we create and send EventDataBatch. Previously, this class had a public ToEnumerable method (docs); the current version of the library only has an internal AsEnumerable (source code). What is the reasoning behind this change? ToEnumerable was useful for verifying the events in a batch before sending. There is EventHubsModelFactory.EventDataBatch in the new library, but then you have to keep a reference between an instance of EventDataBatch and its items to verify it in the Send call.

[1] Is calling CreateBatchAsync() on one Event Hub and sending the resulting object to another Event Hub even supported? I tested that it works (as long as these Event Hubs have the same maximum batch size, e.g. Standard tier), but I am not sure whether this is intended.

Environment: Azure Cloud Service, .NET Framework 4.7.2