Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Improve EventPosition class #3182

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { ReceiveOptions } from "./eventHubClient";
import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { EventPosition } from "./eventPosition";
import { getEventPositionFilter } from "./util/utils";

interface CreateReceiverOptions {
onMessage: OnAmqpEvent;
Expand Down Expand Up @@ -181,9 +182,7 @@ export class EventHubReceiver extends LinkEntity {
this.epoch = options.epoch;
this.options = options;
this.receiverRuntimeMetricEnabled = options.enableReceiverRuntimeMetric || false;
this.runtimeInfo = {

};
this.runtimeInfo = {};
this._checkpoint = {
enqueuedTimeUtc: new Date(),
offset: "0",
Expand Down Expand Up @@ -611,7 +610,7 @@ export class EventHubReceiver extends LinkEntity {
const eventPosition = options.eventPosition || this.options.eventPosition;
if (eventPosition) {
// Set filter on the receiver if event position is specified.
const filterClause = eventPosition.getExpression();
const filterClause = getEventPositionFilter(eventPosition);
if (filterClause) {
(rcvrOptions.source as any).filter = {
"apache.org:selector-filter:string": types.wrap_described(filterClause, 0x468c00000004)
Expand Down
63 changes: 7 additions & 56 deletions sdk/eventhub/event-hubs/src/eventPosition.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { translate, Constants, ErrorNameConditionMapper } from "@azure/amqp-common";

/**
* Describes the options that can be set while creating an EventPosition.
* @ignore
Expand Down Expand Up @@ -44,17 +42,20 @@ export interface EventPositionOptions {
*/
export class EventPosition {
/**
* @property {string} startOfStream The offset from which events would be received: `"-1"`.
* @property {EventPosition} firstAvailableEvent Returns the position for the start of a stream.
* Provide this position in receiver creation to start receiving from the first available event in the partition.
* @static
* @readonly
*/
private static readonly startOfStream: string = "-1";
static readonly firstAvailableEvent: EventPosition = EventPosition.fromOffset("-1");
/**
* @property {string} endOfStream The offset from which events would be received: `"@latest"`.
* @property {EventPosition} newEventsOnly Returns the position for the end of a stream.
* Provide this position in receiver creation to start receiving from the next available event in the partition
* after the receiver is created.
* @static
* @readonly
*/
private static readonly endOfStream: string = "@latest";
static readonly newEventsOnly: EventPosition = EventPosition.fromOffset("@latest");
/**
* @property {string} [offset] The offset of the event at the position. It can be undefined
* if the position is just created from a sequence number or an enqueued time.
Expand Down Expand Up @@ -96,38 +97,6 @@ export class EventPosition {
}
}

/**
* @internal
* Gets the expression (filter clause) that needs to be set on the source.
* @return {string} filterExpression
*/
getExpression(): string {
let result;
// order of preference
if (this.offset != undefined) {
result = this.isInclusive
? `${Constants.offsetAnnotation} >= '${this.offset}'`
: `${Constants.offsetAnnotation} > '${this.offset}'`;
} else if (this.sequenceNumber != undefined) {
result = this.isInclusive
? `${Constants.sequenceNumberAnnotation} >= '${this.sequenceNumber}'`
: `${Constants.sequenceNumberAnnotation} > '${this.sequenceNumber}'`;
} else if (this.enqueuedTime != undefined) {
const time = this.enqueuedTime instanceof Date ? this.enqueuedTime.getTime() : this.enqueuedTime;
result = `${Constants.enqueuedTimeAnnotation} > '${time}'`;
// } else if (this.customFilter != undefined) {
// result = this.customFilter;
}

if (!result) {
throw translate({
condition: ErrorNameConditionMapper.ArgumentError,
description: "No starting position was set in the EventPosition."
});
}
return result;
}

/**
* Creates a position at the given offset.
* @param {string} offset The offset of the data relative to the Event Hub partition stream.
Expand Down Expand Up @@ -183,22 +152,4 @@ export class EventPosition {
// }
// return new EventPosition({ customFilter: customFilter });
// }

/**
* Returns the position for the start of a stream. Provide this position in receiver creation to
* start receiving from the first available event in the partition.
* @return {EventPosition} EventPosition
*/
static fromFirstAvailable(): EventPosition {
return EventPosition.fromOffset(EventPosition.startOfStream);
}

/**
* Returns the position for the end of a stream. Provide this position in receiver creation to
* start receiving from the next available event in the partition after the receiver is created.
* @return {EventPosition} EventPosition
*/
static fromLastAvailable(): EventPosition {
return EventPosition.fromOffset(EventPosition.endOfStream);
}
}
33 changes: 33 additions & 0 deletions sdk/eventhub/event-hubs/src/util/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { EventPosition } from "../eventPosition";
import { translate, Constants, ErrorNameConditionMapper } from "@azure/amqp-common";

/**
* @internal
* Gets the expression to be set as the filter clause when creating the receiver
* @return {string} filterExpression
*/
export function getEventPositionFilter(eventPosition: EventPosition): string {
let result;
// order of preference
if (eventPosition.offset != undefined) {
result = eventPosition.isInclusive
? `${Constants.offsetAnnotation} >= '${eventPosition.offset}'`
: `${Constants.offsetAnnotation} > '${eventPosition.offset}'`;
} else if (eventPosition.sequenceNumber != undefined) {
result = eventPosition.isInclusive
? `${Constants.sequenceNumberAnnotation} >= '${eventPosition.sequenceNumber}'`
: `${Constants.sequenceNumberAnnotation} > '${eventPosition.sequenceNumber}'`;
} else if (eventPosition.enqueuedTime != undefined) {
const time =
eventPosition.enqueuedTime instanceof Date ? eventPosition.enqueuedTime.getTime() : eventPosition.enqueuedTime;
result = `${Constants.enqueuedTimeAnnotation} > '${time}'`;
}

if (!result) {
throw translate({
condition: ErrorNameConditionMapper.ArgumentError,
description: "No starting position was set in the EventPosition."
});
}
return result;
}