From 738c7999ebaafe1ee91cf7f20f6b568b4b81638d Mon Sep 17 00:00:00 2001 From: Sid Krishna Date: Mon, 7 Dec 2020 18:22:16 -0800 Subject: [PATCH] Merging dev to master to release version 4.2.0 (#82) * Add EventHub output logging. Fixes #60. * Unifying logging of exceptions. Fixes #59. * Fix nuget errors on pack (#73) * Increasing max message size and batch size. Fixes #29 * User configurable initial offset support (#79) * Instructions via Readme.md for setting up local environment to run integration tests Co-authored-by: Alexey Rodionov Co-authored-by: Pragna Gopa Co-authored-by: Sid Krishna --- release_notes.md | 9 + .../Config/EventHubExtensionConfigProvider.cs | 16 +- .../Config/EventHubOptions.cs | 16 +- .../EventHubWebJobsBuilderExtensions.cs | 37 +++- .../Config/InitialOffsetOptions.cs | 24 +++ .../Listeners/EventHubListener.cs | 15 +- .../Triggers/EventHubAsyncCollector.cs | 16 +- .../Utility.cs | 100 +++++++++++ .../WebJobs.Extensions.EventHubs.csproj | 9 +- .../webjobs.png | Bin 0 -> 14849 bytes .../EventHubApplicationInsightsTest.cs | 4 +- .../EventHubAsyncCollectorTests.cs | 14 +- .../EventHubConfigurationTests.cs | 56 ++++-- .../EventHubEndToEndTests.cs | 162 ++++++++++++++++-- .../EventHubListenerTests.cs | 10 +- .../PublicSurfaceTests.cs | 1 + .../WebJobs.Extensions.EventHubs.Tests.csproj | 4 +- test/README.md | 24 +++ 18 files changed, 442 insertions(+), 75 deletions(-) create mode 100644 release_notes.md create mode 100644 src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/InitialOffsetOptions.cs create mode 100644 src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Utility.cs create mode 100644 src/Microsoft.Azure.WebJobs.Extensions.EventHubs/webjobs.png create mode 100644 test/README.md diff --git a/release_notes.md b/release_notes.md new file mode 100644 index 0000000..16f9c61 --- /dev/null +++ b/release_notes.md @@ -0,0 +1,9 @@ +### Release notes + +#### Version 4.2.0 +- User configurable initial offset support [#79](https://github.com/Azure/azure-functions-eventhubs-extension/pull/79) + +**Release sprint:** Sprint 87 +[ [bugs](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+87%22+label%3Abug+is%3Aclosed) | [features](https://github.com/Azure/azure-functions-host/issues?q=is%3Aissue+milestone%3A%22Functions+Sprint+87%22+label%3Afeature+is%3Aclosed) ] diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs index e9fc388..f338cdc 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubExtensionConfigProvider.cs @@ -86,18 +86,10 @@ public void Initialize(ExtensionConfigContext context) internal static void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e, ILoggerFactory loggerFactory) { - try - { - var logger = loggerFactory?.CreateLogger(LogCategories.Executor); - string message = $"EventProcessorHost error (Action={e.Action}, HostName={e.Hostname}, PartitionId={e.PartitionId})"; + var logger = loggerFactory?.CreateLogger(LogCategories.Executor); + string message = $"EventProcessorHost error (Action='{e.Action}', HostName='{e.Hostname}', PartitionId='{e.PartitionId}')."; - var logLevel = GetLogLevel(e.Exception); - logger?.Log(logLevel, 0, message, e.Exception, (s, ex) => message); - } - catch - { - // best effort logging - } + Utility.LogException(e.Exception, message, logger); } private static LogLevel GetLogLevel(Exception ex) @@ -129,7 +121,7 @@ private static LogLevel GetLogLevel(Exception ex) private IAsyncCollector BuildFromAttribute(EventHubAttribute attribute) { EventHubClient client = _options.Value.GetEventHubClient(attribute.EventHubName, attribute.Connection); - return new EventHubAsyncCollector(client); + return new EventHubAsyncCollector(client, _loggerFactory); } private static string ConvertEventData2String(EventData x) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubOptions.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubOptions.cs index 0320a23..df73e0a 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubOptions.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubOptions.cs @@ -38,6 +38,7 @@ public EventHubOptions() { EventProcessorOptions = EventProcessorOptions.DefaultOptions; PartitionManagerOptions = new PartitionManagerOptions(); + InitialOffsetOptions = new InitialOffsetOptions(); } /// @@ -60,6 +61,8 @@ public int BatchCheckpointFrequency } } + public InitialOffsetOptions InitialOffsetOptions { get; set; } + public EventProcessorOptions EventProcessorOptions { get; } public PartitionManagerOptions PartitionManagerOptions { get; } @@ -390,11 +393,22 @@ public string Format() }; } + JObject initialOffsetOptions = null; + if (InitialOffsetOptions != null) + { + initialOffsetOptions = new JObject + { + { nameof(InitialOffsetOptions.Type), InitialOffsetOptions.Type }, + { nameof(InitialOffsetOptions.EnqueuedTimeUTC), InitialOffsetOptions.EnqueuedTimeUTC }, + }; + } + JObject options = new JObject { { nameof(BatchCheckpointFrequency), BatchCheckpointFrequency }, { nameof(EventProcessorOptions), eventProcessorOptions }, - { nameof(PartitionManagerOptions), partitionManagerOptions } + { nameof(PartitionManagerOptions), partitionManagerOptions }, + { nameof(InitialOffsetOptions), initialOffsetOptions } }; return options.ToString(Formatting.Indented); diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubWebJobsBuilderExtensions.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubWebJobsBuilderExtensions.cs index d4d0e4c..f611409 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubWebJobsBuilderExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/EventHubWebJobsBuilderExtensions.cs @@ -2,7 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using Microsoft.Azure.EventHubs.Processor; +using Microsoft.Azure.EventHubs; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.EventHubs; using Microsoft.Extensions.DependencyInjection; @@ -18,7 +18,7 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder) throw new ArgumentNullException(nameof(builder)); } - builder.AddEventHubs(p => {}); + builder.AddEventHubs(ConfigureOptions); return builder; } @@ -45,5 +45,38 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action< return builder; } + + internal static void ConfigureOptions(EventHubOptions options) + { + string offsetType = options?.InitialOffsetOptions?.Type?.ToLower() ?? String.Empty; + if (!offsetType.Equals(String.Empty)) + { + switch (offsetType) + { + case "fromstart": + options.EventProcessorOptions.InitialOffsetProvider = (s) => { return EventPosition.FromStart(); }; + break; + case "fromend": + options.EventProcessorOptions.InitialOffsetProvider = (s) => { return EventPosition.FromEnd(); }; + break; + case "fromenqueuedtime": + try + { + DateTime enqueuedTimeUTC = DateTime.Parse(options.InitialOffsetOptions.EnqueuedTimeUTC).ToUniversalTime(); + options.EventProcessorOptions.InitialOffsetProvider = (s) => { return EventPosition.FromEnqueuedTime(enqueuedTimeUTC); }; + } + catch (System.FormatException fe) + { + string message = $"{nameof(EventHubOptions)}:{nameof(InitialOffsetOptions)}:{nameof(InitialOffsetOptions.EnqueuedTimeUTC)} is configured with an invalid format. " + + "Please use a format supported by DateTime.Parse(). e.g. 'yyyy-MM-ddTHH:mm:ssZ'"; + throw new InvalidOperationException(message, fe); + } + break; + default: + throw new InvalidOperationException("An unsupported value was supplied for initialOffsetOptions.type"); + } + // If not specified, EventProcessor's default offset will apply + } + } } } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/InitialOffsetOptions.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/InitialOffsetOptions.cs new file mode 100644 index 0000000..877492f --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Config/InitialOffsetOptions.cs @@ -0,0 +1,24 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Globalization; +using System.Text; +using Microsoft.Azure.EventHubs; +using Microsoft.Azure.EventHubs.Processor; +using Microsoft.Azure.WebJobs.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Azure.WebJobs.EventHubs +{ + public class InitialOffsetOptions + { + public string Type { get; set; } = ""; + public string EnqueuedTimeUTC { get; set; } = ""; + } +} diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs index 5a78573..767b341 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Listeners/EventHubListener.cs @@ -145,20 +145,9 @@ public Task OpenAsync(PartitionContext context) public Task ProcessErrorAsync(PartitionContext context, Exception error) { - string errorDetails = $"Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}'"; + string errorDetails = $"Processing error (Partition Id: '{context.PartitionId}', Owner: '{context.Owner}', EventHubPath: '{context.EventHubPath}')."; - if (error is ReceiverDisconnectedException || - error is LeaseLostException) - { - // For EventProcessorHost these exceptions can happen as part - // of normal partition balancing across instances, so we want to - // trace them, but not treat them as errors. - _logger.LogInformation($"An Event Hub exception of type '{error.GetType().Name}' was thrown from {errorDetails}. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored."); - } - else - { - _logger.LogError(error, $"Error processing event from {errorDetails}"); - } + Utility.LogException(error, errorDetails, _logger); return Task.CompletedTask; } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Triggers/EventHubAsyncCollector.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Triggers/EventHubAsyncCollector.cs index 7057acd..faa3d36 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Triggers/EventHubAsyncCollector.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Triggers/EventHubAsyncCollector.cs @@ -7,13 +7,15 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.EventHubs; +using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.EventHubs { /// /// Core object to send events to EventHub. /// Any user parameter that sends EventHub events will eventually get bound to this object. - /// This will queue events and send in batches, also keeping under the 256kb event hub limit per batch. + /// This will queue events and send in batches, also keeping under the 1024kb event hub limit per batch. /// internal class EventHubAsyncCollector : IAsyncCollector { @@ -23,20 +25,23 @@ internal class EventHubAsyncCollector : IAsyncCollector private const int BatchSize = 100; - // Suggested to use 240k instead of 256k to leave padding room for headers. - private const int MaxByteSize = 240 * 1024; - + // Suggested to use 1008k instead of 1024k to leave padding room for headers. + private const int MaxByteSize = 1008 * 1024; + + private readonly ILogger _logger; + /// /// Create a sender around the given client. /// /// - public EventHubAsyncCollector(EventHubClient client) + public EventHubAsyncCollector(EventHubClient client, ILoggerFactory loggerFactory) { if (client == null) { throw new ArgumentNullException("client"); } _client = client; + _logger = loggerFactory?.CreateLogger(LogCategories.Executor); } /// @@ -96,6 +101,7 @@ public EventHubAsyncCollector(EventHubClient client) /// the set of events to send protected virtual async Task SendBatchAsync(IEnumerable batch) { + _logger?.LogDebug("Sending events to EventHub"); await _client.SendAsync(batch); } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Utility.cs b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Utility.cs new file mode 100644 index 0000000..5172754 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/Utility.cs @@ -0,0 +1,100 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Microsoft.Azure.EventHubs; +using Microsoft.Azure.EventHubs.Processor; +using Microsoft.Extensions.Logging; +using Microsoft.WindowsAzure.Storage; +using LogLevel = Microsoft.Extensions.Logging.LogLevel; + +namespace Microsoft.Azure.WebJobs.EventHubs +{ + internal class Utility + { + public static void LogException(Exception ex, string message, ILogger logger) + { + try + { + // Sometimes EventHub SDK aggregates an exception + AggregateException ae = ex as AggregateException; + if (ae != null && ae.InnerExceptions != null && ae.InnerExceptions.Count == 1) + { + ex = ae.InnerExceptions[0]; + } + + LogLevel logLevel = GetLevel(ex); + if (logLevel == LogLevel.Information) + { + message = $"{message} An exception of type '{ex.GetType().Name}' was thrown. This exception type is typically a result of Event Hub processor rebalancing or a transient error and can be safely ignored."; + } + logger?.Log(logLevel, 0, message, ex, (s, exc) => message); + } + catch + { + // best effort logging + } + } + + private static LogLevel GetLevel(Exception ex) + { + if (ex == null) + { + throw new ArgumentNullException("ex"); + } + + if (ex is ReceiverDisconnectedException || ex is LeaseLostException + || IsConflictLeaseIdMismatchWithLeaseOperation(ex)) + { + // For EventProcessorHost these exceptions can happen as part + // of normal partition balancing across instances, so we want to + // trace them, but not treat them as errors. + return LogLevel.Information; + } + + var ehex = ex as EventHubsException; + if (!(ex is OperationCanceledException) && (ehex == null || !ehex.IsTransient)) + { + // any non-transient exceptions or unknown exception types + // we want to log as errors + return LogLevel.Error; + } + else + { + // transient messaging errors we log as info so we have a record + // of them, but we don't treat them as actual errors + return LogLevel.Information; + } + } + + public static bool IsConflictLeaseIdMismatchWithLeaseOperation(Exception ex) + { + StorageException exception = ex as StorageException; + if (exception == null) + { + return false; + } + + RequestResult result = exception.RequestInformation; + + if (result == null) + { + return false; + } + + if (result.HttpStatusCode != 409) + { + return false; + } + + StorageExtendedErrorInformation extendedInformation = result.ExtendedErrorInformation; + + if (extendedInformation == null) + { + return false; + } + + return extendedInformation.ErrorCode == "LeaseIdMismatchWithLeaseOperation"; + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj index e2b5c3a..248e994 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj +++ b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/WebJobs.Extensions.EventHubs.csproj @@ -6,14 +6,15 @@ Microsoft.Azure.WebJobs.EventHubs Microsoft.Azure.WebJobs.Extensions.EventHubs Microsoft Azure WebJobs SDK EventHubs Extension - 4.1.1 + 4.2.0 N/A $(Version) Commit hash: $(CommitHash) Microsoft Microsoft © Microsoft Corporation. All rights reserved. - https://go.microsoft.com/fwlink/?linkid=2028464 - https://raw.githubusercontent.com/Azure/azure-webjobs-sdk/dev/webjobs.png + True + MIT + webjobs.png http://go.microsoft.com/fwlink/?LinkID=320972 git https://github.com/Azure/azure-functions-servicebus-extension @@ -38,7 +39,7 @@ - + all diff --git a/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/webjobs.png b/src/Microsoft.Azure.WebJobs.Extensions.EventHubs/webjobs.png new file mode 100644 index 0000000000000000000000000000000000000000..831f064bfbe73617e373629cb208266368242cbc GIT binary patch literal 14849 zcmd^m^b|{6nN07-*`k4%CgYZ;`&h+|_QV0YKw@^qCVJ0HA=efw^WlsYFMIN5@4+M+e8n z1;-@?$EEovWyX=(Ntym>Io_ECNl8imnFZcCpsi-o;H$BIpGC0X}P)n#dWUb9cj4*(FK)hxy6~exp4*M z!Nql+Vzhkbvrp~i# zDYL07xMM1_sn@$}DW|C`sdYHKW2&a6#=mPWt#!b^YbmZ{EVyefu4A&GsVlr|&cAo5 zuC6Y)ceS#qHN1Bzymz&{wX3PADW_{JdSEqfV70ZiwWgytX?Qhhcs*@+y`!Teb8Nk< zt1D-0J$G!qx3{--U@UiXy;Yw zlaozLJFQDQ!*fehQ&SzQI|EDW9jkkDb923<>(bJ|`rgvg((wA;>gwv)_ruBWhwJO> zQ#*%qJBQ!De_z@=T-rO?+1Xj$J6R{y!;`(ez3+!7JBKHShle{SCwu?u#o@`t$-i=O za`CT_ruY9_`BM4k7fJJHkg1{B|9blW)!{Ye{`L+LR`(! z)lf4Jv;SHA*V%S9eEPNj1BGfEIi!uL{fJGRdKr%A%G=jLv)QvZio9Ago5!B_|G13k z-)?~or=y|XBDBBEaXiM^=(>;jy!yQ}OO1Z=|g?ucP;mH)#kN3_bih(G}Nu7c{MFE1uMF_nOl%1L<5{)ZdK zR7U8JNCMlXC9c0pGQX5=rmNXPJpS&<&E2JV+{NdfSHk&;TM&$TcQ>ydF9~zKvn5hM z{*(sad9lGpG&>cgPLF!tyAcAi%Z!wWsVVl zkaCm>Xu`$$gH@4vmXuR2@bw3ygDlAgKsHwQ-Ly5eFUlo99B|l1@>c<}ANGT9tt@Hx zex~cBh0)_B?mla5BTNnUhOPbAf=)m>?%N%gXN0%!e>9Hfm4n8?8azYZv|(mrx(cHA zOm)-|Wv>sK^RqwHnKP!Mn~mng2oAug*6lTM{DsFX1sj9t%HG3o?0e$4soy(QPXhwE zDkL(ql7u`JH?4q6%GBdlkE~w9>a#MwuRi1h>miooKel~(y0@%w7K2)^b}RU^!szin zV+}8#KAybAVb962_JPQd!tM+4xQbd@WI0YMc^+okaP^asMrMu#_(S46`WC?=;`Ti> zLpW_nLJTb1?(B$VQ2d!cYl7V!eWE}L6BlHNBz3vx7o}3FsI;Zy9m=k64ZIS#xcA-E z(!(7T689g@uGo|J!-?4DH#)0Q6;R0t(wL%9%Zr_l^W_0tjamZcOoO7$!Ci1R*Cfr8mS1! zJo>)7_|A;}$HKLa5u1-eBq;Xj@wTW!4ddMqZ1P;gmuw=sOml3bM1#prV~0vd&B((a ze;LPtPUm1@(CcUDR4lt5UG*6B0c)+S?s~8(M5Gv}>^cq8J2B#18Qm$1?{M_?b^7pnCi|fEkVDm%lea{0~ymPYg>rbvlQ+_dl-U24- z!Co;QcXY489h=XN@V>=S?XVYatd|V;kB4K~9@jc)fivP4%0&AsAD6(j=B(?~UVqQB zKWFbE9p8J~66b+N_QsPh3|&q2>mz>r>5WMIn^f{{N{k=szJvAZQb!wO3a`qL08X?0 zAypYltFyrknKizgQ&5*z*GDbY$A_D8Z7(pylrYzc^~}|}m`!~7u*(J&gLv$P~y#@-mz2AJTRd{bHaTBXt!~S=wQutkOE5xI& z#m28xfIBhy1HGR}>&>>`%g*MwO2GQRThVkoEr@olr}#3+opc(*_EF|%eUBlQ2K{0- zwYmW_zUvSBZ+m~8)3CrF_Zb4!fo83@n)M2Mnmyxbj&>K{pj7)4asgJ)yIjd)k{oLl zx={|Gf1CeOh9M{m3438fCb8JDE#}GJiAwECKf2@fPE#D#K&Dj~R(-jyIOx6x*f%j4 zE7El0-vNZgFs{4(xnAr+DaT1MC`FY6Px%kxQT&rZ$@q5TV~SXbbv52VL=IfC)3+R* z6d%CNrc1>z`hj?z-Wcxpj;k(3indb0i^D?l#~I`d1JwX)4StS0bV_P9O)P-BP3U)U z(+;;F_6b;n%F=Sr4?G5GmVPDUo>TOzkL}j=)FB4%kKWU!@Z%PDGZLmA0t13;6wkhU z)Gd0BOa)&Znc@6zFB>NL@VE0HTHqrO{o~@-Ht67yoqS`oI9e0XfAhR#35FRTooqTS zle#{XomyEftOjg5d{Dl34wICndjlzKY5cYNTBVx(El_Cohw_a!NMJ*rxVyXd)X6d~ z3fM5d1>Deof>q!cPp0)z#eJ$BfQ_ny zZOUI1Zc;^~ggjkLoaQ!>j2k||>V@030v>fZnY2E33uaV-o1S!HbilK5wZ;=h?A7Gf zyEc?leYGON+3EyXLkxM$&_rT7QA<3?(lIdSukbz*&87#u@;v4tw@pxuS4fg$64>MF zosSm=Z7l@d|7_1K@XZvyxh#Fc;o<2EaU=!v`H1W9Y#MV%r6Jyi$n@y-ag2g=1_paq z?`JE2F~xhe)W>&UxvnfsPxz}P0(JK@MY=28!9@AOz`qZome>vfZ=Ml|rO}lAvDE!# zzIQL)2_^ryawZnw#KEow1{F3|kKU|LX%7#5QkR;ie85cook|*2_KD2?XX?hCnGDCt z=1)OKkB#V3%+ceSxv+Mz-6?G@E859HPg-n*#ODvc5)M9mSPMOx ze#_^1P<;(u$^l6LrY0q-42Nd*Vui+9j4qa^1nRKVX*dSs2ry`VwT)dY?7sOY zK>I3K43emMf22bzQ{GH&?D)=JEOZ;SX9&FlLOOl2o9t??5618Z`*wCuJ%}9-dF+^D z_zEEKmo+iywQK4;RdSfck0xIX-70ZJ#w&mabGTdq$>q3Zw8>3=p45iU&1Vy{*eWsq zVj#jSs(|j+6xBg6+0mGKc4lnOF7`DwEBNESZvls3A-(KRcjvcja!<>7CeqeOWTCF! z3u@lu&10+cHI{Ri_h@I-W3(efSH2b?_U5DLnb<{Vx{1NLa&LZoCB);W&HFW?M1sz2 z+*Piz-Ssb3$*P~}jYV-WRWC+pgc+$6wtN1}ducT6OJH13!gSb|zRX5Fs+7E+E=SnM z{P7ikFLxIA948IBaP9qlrfsXajY)F3QA6s zm;4U?`%x5sL0`s*9qW6xJ>L5rRk)jX!t@eyW(4YiU)@9OC}rI5w3F}G8y%+R*%rPZ>ExJSW5ZFn;j}pL{55hWLd-OPe_~u_vSmKF^N>nRYQyId0&6ymQz5rIh6kV zeKjti`y~Ymq~~~R4H7>aCvKs-8K0@ngx<+k2i}{jz5*;|!Scdx9gf(Y#0}9LHIPF7 z_%(zef7MaiQu|n#fpFx>OpnKASV-s|S)+V+xZqNvc;D>?lHhU39U}Vb*a{H)=0a3< z4NY+pOh$<)v!Tqtez&k=9hSHQ>!Ne;ns_N7&kbl6=BcQ!9U_u`!MI8xZ>{4s!HT~C zYE$N*=xvWw5e!obi8@3Jqp+_lwiOFRrYNrX7r|(XAyJ%Smh`wg>TMrinl+mN`$nd}1zqt7X_2%SJN&};fxa*!{MZ^Q4ViEW zWf>JYBeNF12()NJ<=Cpfgh}bIdQuJ{u+O5MKawekYj~VZA1vvRbRGox8{Nff0jw7J z{!FP$WW!%Co`n8-G@i+4x9~OA^{E7~sNk~zzm0ycInbSE!u4K&Uc08M)OM>2Xu0lj zE?t$Ht+TZzI7XxEkJKy&RHatBfJZq5&UJ}TN4#0n?yQ-wUw8d^*W5a3S!I;@AN#{_ z@=g~sTXiXZWI=p`(gD0II&t*+^bJAw?iSmU&u=efZ%*4((J=Kor$=jQ+~q4r4H^k^hYEbVTiX8J7|2v zL#m1HVT!+ah#pJChodcj&9}_)%3yxc8y+LsOL0wSM<$$!_UfB3kn7SNBVeGGQ4;?9B6ck~ST_h^U>tE-tl|-cnFlTOCM|{FAB?K=Plv4}ZvIVN&ed-qn zh9BLr0ICu>eXGAK{CKOXhp3M4fk5}do$SD}jN-z)%}1P5qD!nQ%HrmZ*qF613_u;f z;OE`e=+=A-=d<0!qz5t%L!I#*5d5fL4BLzdp=}5;|ARZ&d9SX_92jaj=N>F{wPYmz zhnY-_H>XVd1u+91A9g5*TsGm}N6>yxR&P&w-|6uJGQ>es!Q*1kE)Z-ezLI|e_&Dvl zUuFR2TK1;jRHQocNyIzMbekE!b+#1+TD%#=B!*Ou)4^D)1-De1U0pc_zZ!4@W7&M+L5B1;KEtf9qKVB~nXHwc(`_wmbI}Z0aShur>!9I^>?!%+uf%q8=AW zy5XqN?4`VAGnplJQQ4)ep;M3BZBtHE-tCZjxC&AFQbUiKKWD;6!UGL^{?orCu#-cc zPT{oOxJ{g{*hCOWoupYKw6nYADO6@&m);jD^$dOk$Q zw-{>V=guN(ZCoB9n7;Bu@K?gn4=6W95M?fU11C*GBD9Av0SJzIl17k=c{a*4xo|`a z8!h^fG3*6~uj&&g31Ao{x#h$jmE@@S0}=+G$yX3mGjIBD`^2MHR{lM5J^iV zxq2v04E`m zJ}n6vkjVDmiLME;T#4ftv^cPPVlK}3Q3iROrxw-6GJB)l3UE?PTt!<5&}?QUss^aG z02*Tv*Apb{@-%MqW!Xd3(cFFs#-HOJP3rU~PTa+MfUq4}pC^XnX+y{u4@K@4Aie`! z^&|W6qp$Hls&Y6+lKwLfnWFJJ3;v|N1j#KgU8z3v=4Sz*&KG1$D?9(~U&{rdic|Gf zw+5b9ZWaKE6i&4)-1Gbc?37W00FNSK8~rE?o)5!3mm{dqAh5#XZSdG74U**$m?k$+ z{i%sC{v?|bCn6wbeiMO-11eTmbR^omv>salD-U|jjt!B`WL-!_!BaG%m|k?TdIGz- zS_V57{DCi(<0)qJxkJCbhPX>f!w>J3b0R-~n?^|D=|T0G#s{Om^RJu6b1l6{Hd$q&I62$+A_$+XUxf`%D1 zt&{&z+4CVu2j+Oj3qAuM@#;yri1IZA*Y8C`GDl}1CCry%9oflfj1aBsx?$+Y2)ymf z%blpLR30Hq56XV~Lcib`ULX+kkv9UQ?RhhXE@L|3nATo8Fn=`1@r%a0H zZz5d)Eyz8VNpX{hGl8ir0ndutp%;v@bl?RNo5jt8jrsqo@lOFk`8G2MBN#5<9h#du zXu?)P=pgf2<|4#iJ=$YBQ_{k+ zIn2o~QOnS75kw|uF?5}IQ~JDofH%7&N)duRo`+kzarQ0k9i~DoFVESgllk+j|3deu=M`8X!hs0OOhks&b}JqUr_Ky zyF_XORTbz9xVn@^{HeSFNJnB8ogEQH5_EjKN#@>HzBK}+zCv6Zv(gGJaC+aCAMb=# zDh7wbaF0UXv&a1|T1bOVkXxj|%MpbGQzyLL;C zz)R6s5^GI+%(xu+0)iD>o<92tZCmw+xK+(vG1PtqEn*=M)N*X3diH5Z&LU6p3RG z8&TZ<{LA_N;G6faIzV!a@W}IiW$!i(Dhg^fN{z z`W;uu=$~P*Du*6t2a4vVjeW35GLQ;i*0%r>A?)|(aXNC4ia^-MOW4W))#$#2A8&;w zpDR%#aKzehN{af`+%1|YOQ0KKF3QyD>*;pZ#Hkm61s>Vk2A2w@`S48Up2;Ml6Q0z&$%kW4NS&O*`t%p2TZ~Sa<;%k68jCg?;YRYBLUQ178pR_uVHKWF1NnP zT!IX9o3JZs?3eMKv_8T0vwhFVAZ0LzXjX{Gwm*F;L)D^mJ@gR+w-xk|;El8?F2>#0 zekBC-#>~f7I?5Oh`n_^`0C67C5u1!nj?(bu1lXLN8wxY%FF43{@LB_j)h83Ko4eoO1(tVo0KVB>+mWmzo zW{Ae!^{X^LtYc?oxkh<$owUz2n-AljU<8Ql91+{U^}Y$%ASWzvhSsT+TSOu3l{gIt zV4$SOF5VmL2+O33{ua1Z6}7{Brb?Mk3U)YRT6bk&CFqzgAPdGG4RTZV_>oLW5 zscQiP2~dT^MM%&t9k0qcYX%aVi~E9_&_}D8GM^_tGdJVr1I&YgX-3EG@3qmnYEu4s z@vFx|QM>ztD|yVdw{Uu{RCP2x?@X9~i+%QX(E{>sNh#UjiYv#oZz!C-y6QRJaQlp| z>-bMS{#WMAE1;kSLw9tuuM~R)-UPw6&oQ?GouLs=GW}B?4L!eKi9hZ#Jh}Ta9SXjv(a&lub1;@UM+!Mi=@FV${b=nyK74#O z&Dp0}Gb0bZZU^~~eh7*8BF`@5zq0-N#EWmm^j%P7BjLfZm@B4>0%|h+U!E#jf`gDB zu#a6I`lIcaquEMH@$hjoFmIq6CNo$bO$ zh(d%Lac=>GtML__EH{44$iIHNWK_Psy6?A7^Y(Erp4$4?seX8j%U6Q|se`Z`?u>et zpgX$u!1EZVigI8i|=uVvOG-oTq~VOV~n-(y*e|wzt8!$p^GZ0Z=V~WA?T+839 zcxr*8y4jYT|K^?^Qw(NDIVsOJBZi|u=$zjFm@bQ85ElfB}vk+@KJ}e z#PF@_f8Ttvn4StSNqME%ta$|9rz=HIi#swki%1IN_EIsGMiuW-*{fffcgp-wBlR^$ zdE0FPW<5-#WDvNxV|IH1L4|T`JD9C3Dslh$q*@_#(C&j~v5@bUVD94kkuO?T;dr3q z_6Q|2J@(HGP0`iMkHAz@zIUUrEn>bT^*IpACi%XI^En1POOKs}D%{%Z;JVpaAogp2hQn3jgM5jI zBlqHb7N1`nt2snCJWAxowykrEp(n51Es>l{L(*ApD91Sn@80#B5MfV7HF;(FW#0-4 zix-$&pU44^=KwQSo2u{qmN5$ZlY*BSOw77oDesE5&JKB&?t=28f(G@_vx>sYF4OS1 z5Sxlllmud|wMq=-I&_``CE^4g1FWSuPJlUqJu$=a`9O@tUqguEf77kCn`=p)=+v4k zx6s%EM%;2XHQkE7Dz{4f3nE178$ zyrCmw{x6h6?3o5|PXw1u>+_TR79o^&rdCT5PV@q)3Eva+U}gCb#1{7L%}9HpIPxz< zGa#dkuN_GOp9yAw%B}I}oz7`pf*Mf-CqiL-%lcNC0g$k3`<9`qg{O#k4LNcSP-qB; zwc=*%Whx=ee&nmlFU9O}N}NJ*#ay{nyZF8qc>h#t9=dFEwC`viQ$2GahZLi|*9RjY zp=D2Qai8FiyD7Y=`4SW2f9>mF@ z_qOrH&XSy6D~+Iuza#b%&T9~Jz*)q{qG2?8EpS2OC94OD1m}v@DF6&0o4=qRW4K>F#Y%5xI4DjH#2bK0vfMRdZ&TEOpYZmT<8|)0 z<#^M?8@<-q|CCZQpin{Z$Khcd@j@%{42QZysrHNLl`4XB|G&Hg^~lW~$?e7INT(=W z1Bkw}L$b*(LZ2BY3+BUtb8OHPV(!7WF3IV0s|Q{&XD?FwFdcDJ&|oRFP8RtU%dCN+U* zV<2?n7KM!T&Ps};WXV6rjY{7m9A&Iv6Twb3_aNbJ0*1}=Y)R$2P$NNvKXocC*4=I} zBz4~%zMdmLwj-F4A>;_N<0(-hakj-282gK9GBnkJ7fvo`cyr7>QREno7TbhyTqDN3 z!mB)5hwCW$g832g?d+_T<#CIiA1a|+YEP=eF5`!e`E z#h}ma^1&u6MlIMXnM!ePG~INvjNYPkm;RQ9G7g>{}U8dVVZ#U7c;6a3AlwKJ5PP*)jNyf*S_QhvR2BQW3t^_^YI4 z=$=q4>zmQ9R1M14KC>)uaYqZk`!U*(2x?t*KYS$qf=%fmp658wEgy~Thw1P(4b5~-a=Px&SDW0L9O=DQY;$e~lIqc`h>nzNgk%n8+(hWC9 zm8Qz9pl=}vMzkIehfm#C*_6K)BFOU!Ldiv<^5r(-CZ+fMF#=AE7P=Qj_U5(U6UdGO z0SP?Pj8$kEVZ(E>8IWrW7yHF?XoeRW{tzMFN)}F)79DGiw7Q&h!I2bf2#$|~(1tG_*$pM+30Cdk2jvR% zaoSfqeQa;Jl5y&D8<{|<^l|E7Ys!H8G4sNIO0jG)cN0HLuKK8oKS5`rvz5--snp=) zhRY*iNV`KI0p*Cn-T4OqkA(=+f_FBhS55;VxmrgW!gN$1PO7d<&w z`bGytpBv>ALQJXbbi3-F0QXRSO!&~AS7LhuMm^|}qA#Mj@K2>LT6p^c&gR)9Z>h6) ztcv2|Aq#NhCK9%~0PJy&5q^P?AB!%#zaN{fLmN0QLL%vt^-%8MS5kf)*YppC>Z(_S zKQ{n*;ib`IJ%jX(To;E|nT16b;Mr!lW?-1K#s2nHH7?Il*bLVIoPTyvc4`I1>Vy5a z91bjf*z$Y1akY>lxN*lniwOJp+j<;Kw2b)DB%^VYVEtS4_E33Pwh8VfF#Hc`4(cQg zez76u1`MGH!N)(?v)0BR^4U_Q11Bgy2D~rMruwS`K0W(0Lp4(qBn+-{O@H)t`Zr}} z5uPwuLp?BUiF5lGb5{GrO4*qiy<0#mL`A-@J@D~l$qzxCE}#sT*MfGF?^G0ad+w-( zpUL%7+pVWFJZpj+i$!^risb#}!cQyf!Seth6RTLwS@#z_v%+%U8BCwFUjm( zBDgyWszTFI1pX*Du6O#VG!31G`*Pshr>r(v9e4iRZGI~<9IZ= z{1V%FA~kzT?1-dF=cAGcK~Wr6JT;W4{px$pRcKfqZmQF77jFM|)bGB++g+hp`tvub zPNblnSI#l`o$>uj`k?#A?RW$7CF0p`ATdQl9^`ryzw_#DqNga z|HF&pLQacUZ`nzau+haWElBtNQE&Hc`IF*q@}J&0v9a5L6hmsv%h*1_I=}nUKmU2QBpdu8Q)#%}gs{rPHw!Q2o`^RiW zqEbkjS}RQjPM2vVxJ7Y?C(|*C3+yU{^e5qljYtdw*7GKgO-WlXkLMCDWLp*g&IuZ; zs6vrYqInAlU8%7Or^w?=YyjpWwO7{t;>NPgZUDUy{E>%{AGA&z;R*U_6LygSHAt)_ z@HwH=q$qBiM%@;O^b5?#`+~W<+*Y9ZaHRiiUwBCo#2;+&#g96HzR(ynML4GuIAQox zlV_X!fCrd@GVk8+8+01lN1)4U}_<|rtJ>1GZ%zV2$|S+`U3+} zJ#pbE$9l9<*z?Ug(m3lxfXlrZArPP|Xf^n_xz?C!N49tFvVs%uJP$3A7btm{|Ir42 z)WiJbxc9>u>%{AO0(}Z#hP~NI^;<;Q&lr?8WR6Tsf`SF-0G(TV>LK|&UMaRx%zR55 zU^fF!{KI^84a1Nltb`|!!}cg3MZ$xM=70~#5&@zW5p)2)CJycQ-t=?$G_iQ20)DwX&DG3qR27`0ky z2&3jA(-HQhiC?!oQfwz|VqE+{G6j_`SRq5$Uas?-%muEq=w*HBwA?>s^vo0xA^1Xw z9AMZ}D!r-+ussqd=npzFH$@$_;giaRub}z&!;eKT%hy8Kf5v^B^|Xiifj+4C`kLhK zzCl$)UNU};7PhoH#CFZu6aAw&`K8|q=1fXP1%?}^;CwghQ#X=#kZ?A0y7GO*u{W;L z;BxI)&yoLaNGHX{jAI(GqE8=%Ah0gb@cDhPzLb~ymte1H1mQ{yohXRu{6vToin?~k zL^No5@#3K$pn7_Y0c1spQG)hwBCNi!p7i;}Y3T((erGt?Z+!`s;SdYX+{Dau4iPSH z$Wz8r$B}~zDVRMpkml8K!)1?kr#>C{4N5d>wh9;CEmAVhwSoNwEqE3!>Rc zj0-s0I+>SYp^1{Qtw^Jes;IScEICj$W>#sh9ay(^~MTO}( z*mgJ1RfXR4J9|P99esOo{UqGcTKQ09zK9(Cgem^C=xSMtup+e@sMMwF2i#V#vSWy< znPIimE+Bi(v?LX@@2cdq=OM&qAw#LufwuwgQ9>(Hkp$T-s@3kmz@Hlgg&b0+Ru3I( zx+W|Kf&$hXVP+kDk$$W9)W5OGjS;hAAk}7R8NiNR*=SjXhC)Q$!l}yH?$*g|=inRu z*_I+v+7#{vozGsZ*DXSQ#)>(=HJ{m)7qtsNn= zf~pA<_ddUYH~5eC;&|OAt)GA3Ea8hD;hdJue&3d|AsH3RUkl;(K9x4+b8lT~f>ul_ zKgI!4xpClaC&&w+cG2Cu^iA+6zi0I<&UXW05u!?~&di-;2DH9h7rvG>7P4kAf0t+( zPyEOddlLXYSl7W^7GfC-{tFhFCaP&-^8(BX$u*cc3*xK?+7edGfqULcN z9wAs5q`|Y<=gn(gB+eP)c(m`#{rXY{7Pzt;oM+O0i2Cpbfh z-cZP+Yrkh{jUgXkgliNH)IIc3tw^yr;}KK6RB=aC4}FFjAXOMACgP;liW3PllOzNC zHxajy@ZJ|>fG=u*8+f`et+v20A`QakAkWKg^f1FPPLipv5QH-nV);LLCMvfA;4B4f za%6*%2JVaGNonNjCn+GHE^3!+O%Oo0-1hBU+FKqE>k?jw!c^qw$Jf?x8iFlyJ`N42 za=OzREB+*^R^90yQzphSGML|U?w5dqu|CAqgT^4hy1nK`2U*GRL>Vc6LvC`Pp%b9p zWX%?%5b=Ci2QE&iYtN888SZ$1!S?oDn={kUrFdZgeDj|CutG`;CXt3EtE>*uDf0xl zdKR>dfRu5Y;4P;DApRfAK#R5sSr;l8ncUl5i)~JrwuK zN6iYdF`gsOA;CLAM*-B2k=!km2yO#4PmrIxRJce6cmp=*N^kYbfQb}P7pW%}oG{#P zY9}wXAD$cc~2fToF z!O!vpcb|=}gT=rrZTH?$4#|N1&2WQ{zNu#j9KFNOs4eKT7aQw7%bkjHp>B`7OlKA2 z!US=aURXl*6#W4oPPsRSXAHf$Pb?1#i-2m;kPSUur2Rt!RF3q9gRk4b@QB`i644Lq zj*6hg^BvF=-r_jRk}r-(Pi2)fl{N$Vl}R~|4uw-=omUS6{mc_I-Js9eKPuCfz;HVq zLk&3VNKm$jT7CyJ%Qzu<-UY+SLYWySptn4^Ippavi;Tvq=Oj_z&f!Mf+$qRRk1=89 zelPitRsw`E8b@m5iXmDuzvAoh_ZL6kW{p*a;JMg`nsK~$dCe__9fN#2|2DX*p}axr zlE}|X@tkG>C`Oa{{8E>xJ^O# z;VU6ewqbag%6EY~JSq^ma}^lw5Nxh_Ak7q*k6I743cQt?4rg6oFg9ID6 zpN%oe2K{S@AY$-IHm#M6@tto+bs{Bfy~pWjKoumO8vnQs5kbL=o)&#r>_RRlpc?(x zM^}rVWpuX#L^>dreK8y3T~UQIoA!wH8}~8fD>=*ele3UfV~zC$2|_@ce8+ViZh;rL zkC$AOXN!?W%396WpZmnio=^UZ(})e8-u4MUWi&j)zI<`|Q72K^17iUYXsR;h&q>=< zQvTXMXmUkF6blxo)>F!sP;S>A{xodQSNO+evL5RHSj{0!z9ozgIsO^Jsu;B~82IdK zI0-qj*_-u{`EgxjL26S}tqNDtjr{^0vT2r}jtty1+U_i|+M9}OryYWdiEu|#Jn87l z%%buA?eK&3qH{Z%jBZT}=jCNUkvyJK#x8Gbi@4kfbgd0kn7$dPh&=GBwH7TarqLC38WA%>#sf^S|F?H=a>&n z_?_x@R_$IC;Q*c?xa)CdmG(EE&DzGq2Hd2h$yDul?aFkqyl@|oMolPuxxof5{+(T_ zFHH43OY`JevH%<)n<9?prQgHAo$D+or_h^pCYiBp^NR}aFHnRVKsAJ-&-coc(W8xl zm6?IkM~t+D&PY0Jr1R>%+3LDbr1Q>wea%!vWTFAnDOt+YF&%(;4_cf z25)H@cT;h1Eg*%$ddtq2!dao~E!ievmA<_6xJfS+ST~#LNbYL+*#b$xRkkxICIfV~ z)~QnV$$%Iaj2hZ4^AcojwOPnlf|t@v0MX`fc{A|j$KoeCvHJpgpi&>x&%+Dyl$EY; VU)Tpo|Iz{gT}?xcI(5hR{{xlXV}1Yt literal 0 HcmV?d00001 diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubApplicationInsightsTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubApplicationInsightsTest.cs index cdb11e9..efbda46 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubApplicationInsightsTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubApplicationInsightsTest.cs @@ -216,7 +216,7 @@ public async Task EventHub_MultipleDispatch_IndependentMessages() expectedLinks[i] = new TestLink { operation_Id = operationId, - id = $"|{operationId}.{spanId}." + id = $"{spanId}" }; messages[i] = new EventData(Encoding.UTF8.GetBytes(_testPrefix + i)) @@ -388,7 +388,7 @@ private void ValidateEventHubDependency( string parentId, string category) { - Assert.Equal($"{endpoint} | {entityName}", dependency.Target); + Assert.Equal($"{endpoint}{entityName}", dependency.Target); Assert.Equal("Azure Event Hubs", dependency.Type); Assert.Equal(name, dependency.Name); Assert.True(dependency.Success); diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubAsyncCollectorTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubAsyncCollectorTests.cs index 73877a3..fa0e36f 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubAsyncCollectorTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubAsyncCollectorTests.cs @@ -20,7 +20,7 @@ public class EventHubAsyncCollectorTests [Fact] public void NullArgumentCheck() { - Assert.Throws(() => new EventHubAsyncCollector(null)); + Assert.Throws(() => new EventHubAsyncCollector(null, null)); } public EventData CreateEvent(byte[] body, string partitionKey) @@ -95,8 +95,8 @@ public async Task FlushAfterSizeThreshold() { var collector = new TestEventHubAsyncCollector(); - // Trip the 256k EventHub limit. - for (int i = 0; i < 10; i++) + // Trip the 1024k EventHub limit. + for (int i = 0; i < 50; i++) { var e1 = new EventData(new byte[10 * 1024]); await collector.AddAsync(e1); @@ -106,7 +106,7 @@ public async Task FlushAfterSizeThreshold() Assert.Empty(collector.SentEvents); // This will push it over the theshold - for (int i = 0; i < 20; i++) + for (int i = 0; i < 60; i++) { var e1 = new EventData(new byte[10 * 1024]); await collector.AddAsync(e1); @@ -120,8 +120,8 @@ public async Task CantSentGiantEvent() { var collector = new TestEventHubAsyncCollector(); - // event hub max is 256k payload. - var hugePayload = new byte[300 * 1024]; + // event hub max is 1024k payload. + var hugePayload = new byte[1200 * 1024]; var e1 = new EventData(hugePayload); try @@ -232,7 +232,7 @@ internal class TestEventHubAsyncCollector : EventHubAsyncCollector private const string FakeConnectionString = "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2"; public TestEventHubAsyncCollector() - : base(TestClient) + : base(TestClient, null) { } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubConfigurationTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubConfigurationTests.cs index 1c36f28..a888040 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubConfigurationTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubConfigurationTests.cs @@ -15,8 +15,10 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Microsoft.WindowsAzure.Storage; using Newtonsoft.Json.Linq; using Xunit; +using LogLevel = Microsoft.Extensions.Logging.LogLevel; namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests { @@ -24,6 +26,9 @@ public class EventHubConfigurationTests { private readonly ILoggerFactory _loggerFactory; private readonly TestLoggerProvider _loggerProvider; + private readonly string _template = " An exception of type '{0}' was thrown. This exception type is typically a result of Event Hub processor rebalancing or a transient error and can be safely ignored."; + private readonly string _optionStringInitialOffsetType = "FromEnqueuedTime"; + private readonly string _optionStringInitialOffsetQneueuedTimeUTC = "2020-09-13T12:00Z"; public EventHubConfigurationTests() { @@ -46,6 +51,8 @@ public void ConfigureOptions_AppliesValuesCorrectly() Assert.Equal(5, options.BatchCheckpointFrequency); Assert.Equal(31, options.PartitionManagerOptions.LeaseDuration.TotalSeconds); Assert.Equal(21, options.PartitionManagerOptions.RenewInterval.TotalSeconds); + Assert.Equal(_optionStringInitialOffsetType, options.InitialOffsetOptions.Type); + Assert.Equal(_optionStringInitialOffsetQneueuedTimeUTC, options.InitialOffsetOptions.EnqueuedTimeUTC); } [Fact] @@ -65,6 +72,8 @@ public void ConfigureOptions_Format_Returns_Expected() Assert.Equal(result.EventProcessorOptions.ReceiveTimeout, TimeSpan.FromSeconds(33)); Assert.Equal(result.PartitionManagerOptions.LeaseDuration, TimeSpan.FromSeconds(31)); Assert.Equal(result.PartitionManagerOptions.RenewInterval, TimeSpan.FromSeconds(21)); + Assert.Equal(_optionStringInitialOffsetType, options.InitialOffsetOptions.Type); + Assert.Equal(_optionStringInitialOffsetQneueuedTimeUTC, options.InitialOffsetOptions.EnqueuedTimeUTC); } private EventHubOptions CreateOptions() @@ -79,7 +88,9 @@ private EventHubOptions CreateOptions() { $"{extensionPath}:EventProcessorOptions:InvokeProcessorAfterReceiveTimeout", "true" }, { $"{extensionPath}:BatchCheckpointFrequency", "5" }, { $"{extensionPath}:PartitionManagerOptions:LeaseDuration", "00:00:31" }, - { $"{extensionPath}:PartitionManagerOptions:RenewInterval", "00:00:21" } + { $"{extensionPath}:PartitionManagerOptions:RenewInterval", "00:00:21" }, + { $"{extensionPath}:InitialOffsetOptions:Type", _optionStringInitialOffsetType }, + { $"{extensionPath}:InitialOffsetOptions:EnqueuedTimeUTC", _optionStringInitialOffsetQneueuedTimeUTC } }; return TestHelpers.GetConfiguredOptions(b => @@ -118,7 +129,7 @@ public void Initialize_PerformsExpectedRegistrations() var handler = (Action)eventProcessorOptions.GetType().GetField("exceptionHandler", BindingFlags.Instance | BindingFlags.NonPublic).GetValue(eventProcessorOptions); handler.Method.Invoke(handler.Target, new object[] { args }); - string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)"; + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; var logMessage = host.GetTestLoggerProvider().GetAllLogMessages().Single(); Assert.Equal(LogLevel.Error, logMessage.Level); Assert.Equal(expectedMessage, logMessage.FormattedMessage); @@ -134,7 +145,7 @@ public void LogExceptionReceivedEvent_NonTransientEvent_LoggedAsError() var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" }); EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory); - string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)"; + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; var logMessage = _loggerProvider.GetAllLogMessages().Single(); Assert.Equal(LogLevel.Error, logMessage.Level); Assert.Same(ex, logMessage.Exception); @@ -150,11 +161,11 @@ public void LogExceptionReceivedEvent_TransientEvent_LoggedAsVerbose() var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" }); EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory); - string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)"; + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; var logMessage = _loggerProvider.GetAllLogMessages().Single(); Assert.Equal(LogLevel.Information, logMessage.Level); Assert.Same(ex, logMessage.Exception); - Assert.Equal(expectedMessage, logMessage.FormattedMessage); + Assert.Equal(expectedMessage + string.Format(_template, typeof(EventHubsException).Name), logMessage.FormattedMessage); } [Fact] @@ -165,11 +176,11 @@ public void LogExceptionReceivedEvent_OperationCanceledException_LoggedAsVerbose var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" }); EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory); - string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)"; + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; var logMessage = _loggerProvider.GetAllLogMessages().Single(); Assert.Equal(LogLevel.Information, logMessage.Level); Assert.Same(ex, logMessage.Exception); - Assert.Equal(expectedMessage, logMessage.FormattedMessage); + Assert.Equal(expectedMessage + string.Format(_template, typeof(OperationCanceledException).Name), logMessage.FormattedMessage); } [Fact] @@ -180,7 +191,7 @@ public void LogExceptionReceivedEvent_NonMessagingException_LoggedAsError() var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" }); EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory); - string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)"; + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; var logMessage = _loggerProvider.GetAllLogMessages().Single(); Assert.Equal(LogLevel.Error, logMessage.Level); Assert.Same(ex, logMessage.Exception); @@ -196,11 +207,36 @@ public void LogExceptionReceivedEvent_PartitionExceptions_LoggedAsInfo() var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" }); EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory); - string expectedMessage = "EventProcessorHost error (Action=TestAction, HostName=TestHostName, PartitionId=TestPartitionId)"; + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; var logMessage = _loggerProvider.GetAllLogMessages().Single(); Assert.Equal(LogLevel.Information, logMessage.Level); Assert.Same(ex, logMessage.Exception); - Assert.Equal(expectedMessage, logMessage.FormattedMessage); + Assert.Equal(expectedMessage + string.Format(_template, typeof(ReceiverDisconnectedException).Name), logMessage.FormattedMessage); + } + + [Fact] + public void LogExceptionReceivedEvent_AggregateExceptions_LoggedAsInfo() + { + var ctor = typeof(AggregateException).GetConstructor(BindingFlags.Public | BindingFlags.Instance, null, new Type[] { typeof(IEnumerable) }, null); + var request = new RequestResult() + { + HttpStatusCode = 409 + }; + var information = new StorageExtendedErrorInformation(); + typeof(StorageExtendedErrorInformation).GetProperty("ErrorCode").SetValue(information, "LeaseIdMismatchWithLeaseOperation"); + typeof(RequestResult).GetProperty("ExtendedErrorInformation").SetValue(request, information); + var storageException = new StorageException(request, "The lease ID specified did not match the lease ID for the blob.", null); + + var ex = (AggregateException)ctor.Invoke(new object[] { new Exception[] { storageException } }); + ctor = typeof(ExceptionReceivedEventArgs).GetConstructors(BindingFlags.NonPublic | BindingFlags.Instance).Single(); + var e = (ExceptionReceivedEventArgs)ctor.Invoke(new object[] { "TestHostName", "TestPartitionId", ex, "TestAction" }); + EventHubExtensionConfigProvider.LogExceptionReceivedEvent(e, _loggerFactory); + + string expectedMessage = "EventProcessorHost error (Action='TestAction', HostName='TestHostName', PartitionId='TestPartitionId')."; + var logMessage = _loggerProvider.GetAllLogMessages().Single(); + Assert.Equal(LogLevel.Information, logMessage.Level); + Assert.Same(storageException, logMessage.Exception); + Assert.Equal(expectedMessage + string.Format(_template, typeof(WindowsAzure.Storage.StorageException).Name), logMessage.FormattedMessage); } } } \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubEndToEndTests.cs index dcc8829..cbdb66a 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubEndToEndTests.cs @@ -8,29 +8,50 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Blob; using Microsoft.Azure.EventHubs; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; -using Newtonsoft.Json.Linq; +using Microsoft.Azure.WebJobs.EventHubs; using Xunit; +// Disambiguage between Microsoft.WindowsAzure.Storage.LogLevel +using LogLevel = Microsoft.Extensions.Logging.LogLevel; namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { public class EventHubEndToEndTests { private const string TestHubName = "webjobstesthub"; + private const string TestHubConnectionName = "AzureWebJobsTestHubConnection"; + private const string StorageConnectionName = "AzureWebJobsStorage"; + // The container name created by this extension to save snapshots of the Event Hubs stream positions + private const string SnapshotStorageContainerName = "azure-webjobs-eventhub"; private const int Timeout = 30000; + private static EventWaitHandle _eventWait; private static string _testId; private static List _results; + private static string _testHubConnectionString; + private static string _storageConnectionString; + private static DateTime _initialOffsetEnqueuedTimeUTC; + private static DateTime? _earliestReceivedMessageEnqueuedTimeUTC = null; public EventHubEndToEndTests() { _results = new List(); _testId = Guid.NewGuid().ToString(); _eventWait = new ManualResetEvent(initialState: false); + + var config = new ConfigurationBuilder() + .AddEnvironmentVariables() + .AddTestSettings() + .Build(); + + _testHubConnectionString = config.GetConnectionStringOrSetting(TestHubConnectionName); + _storageConnectionString = config.GetConnectionStringOrSetting(StorageConnectionName); } [Fact] @@ -96,6 +117,9 @@ public async Task EventHub_SingleDispatch() Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); } } @@ -129,6 +153,9 @@ public async Task EventHub_MultipleDispatch() Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0); + + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); } } @@ -148,6 +175,59 @@ public async Task EventHub_PartitionKey() } } + [Fact] + public async Task EventHub_InitialOffsetFromEnd() + { + // Send a message to ensure the stream is not empty as we are trying to validate that no messages are delivered in this case + using (var host = BuildHost().Item1) + { + var method = typeof(EventHubTestSendOnlyJobs).GetMethod(nameof(EventHubTestSendOnlyJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public); + await host.CallAsync(method, new { input = _testId }); + } + // Clear out existing checkpoints as the InitialOffset config only applies when checkpoints don't exist (first run on a host) + await DeleteStorageCheckpoints(); + var initialOffsetOptions = new InitialOffsetOptions() + { + Type = "FromEnd" + }; + using (var host = BuildHost(initialOffsetOptions).Item1) + { + // We don't expect to get signalled as there will be messages recieved with a FromEnd initial offset + bool result = _eventWait.WaitOne(Timeout); + Assert.False(result, "An event was received while none were expected."); + } + } + + [Fact] + public async Task EventHub_InitialOffsetFromEnqueuedTime() + { + // Mark the time now and send a message which should be the only one that is picked up when we run the actual test host + _initialOffsetEnqueuedTimeUTC = DateTime.UtcNow; + using (var host = BuildHost().Item1) + { + var method = typeof(EventHubTestSendOnlyJobs).GetMethod(nameof(EventHubTestSendOnlyJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public); + await host.CallAsync(method, new { input = _testId }); + } + // Clear out existing checkpoints as the InitialOffset config only applies when checkpoints don't exist (first run on a host) + await DeleteStorageCheckpoints(); + _earliestReceivedMessageEnqueuedTimeUTC = null; + var initialOffsetOptions = new InitialOffsetOptions() + { + Type = "FromEnqueuedTime", + EnqueuedTimeUTC = _initialOffsetEnqueuedTimeUTC.ToString("yyyy-MM-ddTHH:mm:ssZ") + }; + using (var host = BuildHost(initialOffsetOptions).Item1) + { + // Validation that we only got messages after the configured FromEnqueuedTime is done in the JobHost + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result, $"No event was received within the timeout period of {Timeout}. " + + $"Expected event sent shortly after {_initialOffsetEnqueuedTimeUTC.ToString("yyyy-MM-ddTHH:mm:ssZ")} with content {_testId}"); + Assert.True(_earliestReceivedMessageEnqueuedTimeUTC > _initialOffsetEnqueuedTimeUTC, + "A message was received that was enqueued before the configured Initial Offset Enqueued Time. " + + $"Received message enqueued time: {_earliestReceivedMessageEnqueuedTimeUTC?.ToString("yyyy-MM-ddTHH:mm:ssZ")}" + + $", initial offset enqueued time: {_initialOffsetEnqueuedTimeUTC.ToString("yyyy-MM-ddTHH:mm:ssZ")}"); + } + } public class EventHubTestSingleDispatchJobs { @@ -175,6 +255,38 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, } } + public class EventHubTestSendOnlyJobs + { + public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt) + { + evt = new EventData(Encoding.UTF8.GetBytes(input)); + } + } + + public class EventHubTestInitialOffsetFromEndJobs + { + public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, + string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, + IDictionary systemProperties) + { + _eventWait.Set(); + } + } + + public class EventHubTestInitialOffsetFromEnqueuedTimeJobs + { + public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, + string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, + IDictionary systemProperties) + { + if (_earliestReceivedMessageEnqueuedTimeUTC == null) + { + _earliestReceivedMessageEnqueuedTimeUTC = enqueuedTimeUtc; + _eventWait.Set(); + } + } + } + public class EventHubTestBindToPocoJobs { public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt) @@ -290,18 +402,11 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] } } - private Tuple BuildHost() + private Tuple BuildHost(InitialOffsetOptions initialOffsetOptions = null) { JobHost jobHost = null; - var config = new ConfigurationBuilder() - .AddEnvironmentVariables() - .AddTestSettings() - .Build(); - - const string connectionName = "AzureWebJobsTestHubConnection"; - string connection = config.GetConnectionStringOrSetting(connectionName); - Assert.True(!string.IsNullOrEmpty(connection), $"Required test connection string '{connectionName}' is missing."); + Assert.True(!string.IsNullOrEmpty(_testHubConnectionString), $"Required test connection string '{TestHubConnectionName}' is missing."); IHost host = new HostBuilder() .ConfigureDefaultTestHost(b => @@ -309,8 +414,14 @@ private Tuple BuildHost() b.AddEventHubs(options => { options.EventProcessorOptions.EnableReceiverRuntimeMetric = true; - options.AddSender(TestHubName, connection); - options.AddReceiver(TestHubName, connection); + if (initialOffsetOptions != null) + { + options.InitialOffsetOptions = initialOffsetOptions; + } + // We want to validate the default options configuration logic for setting initial offset and not implemente it here + EventHubWebJobsBuilderExtensions.ConfigureOptions(options); + options.AddSender(TestHubName, _testHubConnectionString); + options.AddReceiver(TestHubName, _testHubConnectionString); }); }) .ConfigureLogging(b => @@ -324,6 +435,33 @@ private Tuple BuildHost() return new Tuple(jobHost, host); } + + // Deletes all checkpoints that were saved in storage so InitialOffset can be validated + private async Task DeleteStorageCheckpoints() + { + CloudStorageAccount cloudStorageAccount = CloudStorageAccount.Parse(_storageConnectionString); + CloudBlobClient blobClient = cloudStorageAccount.CreateCloudBlobClient(); + var container = blobClient.GetContainerReference(SnapshotStorageContainerName); + BlobContinuationToken continuationToken = null; + do + { + var response = await container.ListBlobsSegmentedAsync(string.Empty, true, BlobListingDetails.None, null, continuationToken, null, null); + continuationToken = response.ContinuationToken; + foreach (var blob in response.Results.OfType()) + { + try + { + await blob.BreakLeaseAsync(TimeSpan.Zero); + } + catch (StorageException) + { + // Ignore as this will be thrown if the blob has no lease on it + } + await blob.DeleteAsync(); + } + } while (continuationToken != null); + } + public class TestPoco { public string Name { get; set; } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs index ca8e16c..4075f98 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/EventHubListenerTests.cs @@ -152,7 +152,7 @@ public async Task ProcessErrorsAsync_LoggedAsError() await eventProcessor.ProcessErrorAsync(partitionContext, ex); var msg = testLogger.GetLogMessages().Single(); - Assert.Equal("Error processing event from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'", msg.FormattedMessage); + Assert.Equal("Processing error (Partition Id: '123', Owner: 'def', EventHubPath: 'abc').", msg.FormattedMessage); Assert.IsType(msg.Exception); Assert.Equal(LogLevel.Error, msg.Level); } @@ -174,8 +174,8 @@ public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation() await eventProcessor.ProcessErrorAsync(partitionContext, disconnectedEx); var msg = testLogger.GetLogMessages().Single(); - Assert.Equal("An Event Hub exception of type 'ReceiverDisconnectedException' was thrown from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.", msg.FormattedMessage); - Assert.Null(msg.Exception); + Assert.Equal("Processing error (Partition Id: '123', Owner: 'def', EventHubPath: 'abc'). An exception of type 'ReceiverDisconnectedException' was thrown. This exception type is typically a result of Event Hub processor rebalancing or a transient error and can be safely ignored.", msg.FormattedMessage); + Assert.NotNull(msg.Exception); Assert.Equal(LogLevel.Information, msg.Level); testLogger.ClearLogMessages(); @@ -187,8 +187,8 @@ public async Task ProcessErrorsAsync_RebalancingExceptions_LoggedAsInformation() await eventProcessor.ProcessErrorAsync(partitionContext, leaseLostEx); msg = testLogger.GetLogMessages().Single(); - Assert.Equal("An Event Hub exception of type 'LeaseLostException' was thrown from Partition Id: '123', Owner: 'def', EventHubPath: 'abc'. This exception type is typically a result of Event Hub processor rebalancing and can be safely ignored.", msg.FormattedMessage); - Assert.Null(msg.Exception); + Assert.Equal("Processing error (Partition Id: '123', Owner: 'def', EventHubPath: 'abc'). An exception of type 'LeaseLostException' was thrown. This exception type is typically a result of Event Hub processor rebalancing or a transient error and can be safely ignored.", msg.FormattedMessage); + Assert.NotNull(msg.Exception); Assert.Equal(LogLevel.Information, msg.Level); } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/PublicSurfaceTests.cs index 24bb3ba..d603929 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/PublicSurfaceTests.cs @@ -18,6 +18,7 @@ public void WebJobs_Extensions_EventHubs_VerifyPublicSurfaceArea() "EventHubAttribute", "EventHubTriggerAttribute", "EventHubOptions", + "InitialOffsetOptions", "EventHubWebJobsBuilderExtensions", "EventHubsWebJobsStartup" }; diff --git a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/WebJobs.Extensions.EventHubs.Tests.csproj b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/WebJobs.Extensions.EventHubs.Tests.csproj index 8e39bd9..8a9d67f 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/WebJobs.Extensions.EventHubs.Tests.csproj +++ b/test/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests/WebJobs.Extensions.EventHubs.Tests.csproj @@ -26,8 +26,8 @@ - - + + all diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000..4c371b1 --- /dev/null +++ b/test/README.md @@ -0,0 +1,24 @@ +# Event Hubs Extension for Azure Functions guide to running integration tests locally +Integration tests are implemented in the `EventHubsEndToEndTests` and `EventHubApplicationInsightsTest` classes and require special configuration to execute locally in Visual Studio or via dotnet test. + +All configuration is done via a json file called `appsettings.tests` which on windows should be located in the `%USERPROFILE%\.azurefunctions` folder (e.g. `C:\Users\user123\.azurefunctions`) + +**Note:** *The specifics of the configuration will change when the validation code is modified so check the code for the latest configuration if the tests do not pass as this readme file may not have been updated with each code change.* + +Create the appropriate Azure resources if needed as explained below and create or update the `appsettings.tests` file in the location specified above by copying the configuration below and replacing all the `PLACEHOLDER` values + +appsettings.tests contents +``` +{ + "ConnectionStrings": { + "AzureWebJobsTestHubConnection": "PLACEHOLDER" + }, + "AzureWebJobsStorage": "PLACEHOLDER" +} +``` +## Create Azure resources and configure test environment +1. Create a storage account and configure its connection string into `AzureWebJobsStorage`. This will be used by the webjobs hosts created by the tests as well as the Event Hubs extension for checkpointing. +2. Create anEvent Hubs namespace namespaces and within it, create an Event Hubs resource named `webjobstesthub`. +3. Navigate to the Event Hubs resource you created called `webjobstesthub` and create a "Shared access policy" with any name and "Manage, Send, Listen" claims. +4. Navigate to the policy above and copy either the primary or secondary connection string and set the value of the `AzureWebJobsTestHubConnection` element of the appsettings.tests file to the connection string you copied. +5. If you will keep this setup around for more than a day, set the "Message Retention" on the Event Hubs resource to the minimum value of 1 day to make tests run faster. \ No newline at end of file