-
-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG]: Problem subscribing many assets at once #724
Comments
The root cause of this problem is in the underlying WS sending logic. Buffering logic sometimes sends two (or more) messages in one packet and the server cannot handle this packet as a separate messages. The current workaround is to send all subscriptions in chunks not one-by-one with small delays between calls. See this code snipped for reference: using Alpaca.Markets;
HashSet<String> trades = [];
HashSet<String> quotes = [];
try
{
var key = new SecretKey("...", "...");
using var client = Environments.Live.GetAlpacaTradingClient(key);
var assets = await client.ListAssetsAsync(new AssetsRequest { AssetStatus = AssetStatus.Active });
var selected = assets.Where(asset => asset is { IsTradable: true, Shortable: true }).ToList();
using var streaming = Environments.Live.GetAlpacaDataStreamingClient(key);
Console.WriteLine(await streaming.ConnectAndAuthenticateAsync());
streaming.OnWarning += HandleWarning;
streaming.OnError += HandleError;
List<IAlpacaDataSubscription> subscriptions = [];
foreach (var asset in selected)
{
var tradeSubscription = streaming.GetTradeSubscription(asset.Symbol);
tradeSubscription.Received += HandleTradeSubscription;
subscriptions.Add(tradeSubscription);
var quoteSubscription = streaming.GetQuoteSubscription(asset.Symbol);
quoteSubscription.Received += HandleQuoteSubscription;
subscriptions.Add(quoteSubscription);
}
foreach (var chunk in subscriptions.Chunk(50))
{
await streaming.SubscribeAsync(chunk);
await Task.Delay(TimeSpan.FromSeconds(1));
Console.WriteLine($"T: {trades.Count}\tQ: {quotes.Count}");
}
while (Console.ReadLine() != "q")
{
Console.WriteLine($"T: {trades.Count}\tQ: {quotes.Count}");
}
streaming.OnError -= HandleError;
streaming.OnWarning -= HandleWarning;
}
catch (Exception e)
{
Console.Error.WriteLine(e);
}
void HandleTradeSubscription(ITrade trade) => trades.Add(trade.Symbol);
void HandleQuoteSubscription(IQuote quote) => quotes.Add(quote.Symbol);
void HandleWarning(String message) => Console.Error.WriteLine($"WRN: {message}");
void HandleError(Exception exception) => Console.Error.WriteLine($"ERR: {exception}"); I'm still working on this - maybe I'll find some low-level solution for this problem. |
Is there an existing issue for this?
Current Behavior
Expected Behavior
All initiated subscriptions should work as expected and provide actual data.
Steps To Reproduce
Environment
The text was updated successfully, but these errors were encountered: