Skip to content

Commit

Permalink
Merge pull request #5 from gopalldb/cloud
Browse files Browse the repository at this point in the history
initial skeleton for cloud fetch
  • Loading branch information
gopalldb authored Mar 15, 2024
2 parents 1949057 + 84b39b6 commit 590b920
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions csharp/src/Drivers/Apache/Spark/SparkStatement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
Expand All @@ -34,6 +35,7 @@ internal SparkStatement(SparkConnection connection)

protected override void SetStatementProperties(TExecuteStatementReq statement)
{

statement.CanReadArrowResult = true;
statement.CanDownloadResult = true;
statement.ConfOverlay = SparkConnection.timestampConfig;
Expand Down Expand Up @@ -126,4 +128,75 @@ public void Dispose()
}
}
}

/// <summary>
/// Starts background downloads for a set of <see cref="Chunk"/> instances, sharing a single
/// <see cref="HttpClient"/> across all of them.
/// </summary>
internal class ChunkDownloader
{
    // Link table handed to the constructor. Nothing in this class reads it yet.
    // NOTE(review): presumably maps a chunk URL to its request headers -- confirm the intended shape.
    private readonly Dictionary<string, Dictionary<string, string>> links;

    // Chunks to download, keyed by an integer id. Instantiated eagerly so that initialize() can
    // always enumerate it. TODO: populate from links once the link format is settled.
    private readonly Dictionary<int, Chunk> chunks = new Dictionary<int, Chunk>();

    int currentChunk = 0;

    // One client is reused for every chunk request.
    readonly HttpClient client;

    /// <summary>
    /// Creates a downloader for the given link table.
    /// </summary>
    /// <param name="links">The link table to keep for later use. Must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="links"/> is null.</exception>
    public ChunkDownloader(Dictionary<string, Dictionary<string, string>> links)
    {
        this.links = links ?? throw new ArgumentNullException(nameof(links));
        this.client = new HttpClient();
    }

    /// <summary>
    /// Schedules the download of every known chunk on the thread pool and returns without
    /// waiting for any of them to finish.
    /// </summary>
    void initialize()
    {
        // The thread pool limits are deliberately left untouched: ThreadPool.SetMinThreads and
        // ThreadPool.SetMaxThreads apply to the whole process, not only to this downloader.
        foreach (KeyValuePair<int, Chunk> entry in chunks)
        {
            Chunk chunk = entry.Value;

            // Fire-and-forget is safe here because DownloadChunkAsync observes every exception itself.
            _ = Task.Run(() => DownloadChunkAsync(chunk));
        }
    }

    /// <summary>
    /// Downloads a single chunk. Failures are recorded on the chunk and logged to the console
    /// instead of being propagated.
    /// </summary>
    /// <param name="chunk">The chunk to download.</param>
    private async Task DownloadChunkAsync(Chunk chunk)
    {
        try
        {
            await chunk.downloadData(this.client).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            // Mark the chunk so that consumers can tell a failed download from a pending one.
            chunk.isFailed = true;
            Console.WriteLine(e);
        }
    }
}

/// <summary>
/// A single downloadable result chunk: an id, the URL to fetch it from, the HTTP headers to send
/// with the request, and the Arrow reader created over the response body.
/// </summary>
public class Chunk
{
    int chunkId;
    string chunkUrl;
    Dictionary<string, string> headers;

    // Set to true once downloadData has created the reader.
    public bool isDownloaded = false;

    // Set to true when downloadData throws.
    public bool isFailed = false;

    // Reader over the response stream; null until downloadData succeeds.
    public IArrowReader reader;

    /// <summary>
    /// Creates a chunk description. No network activity happens here.
    /// </summary>
    /// <param name="chunkId">Identifier of the chunk.</param>
    /// <param name="chunkUrl">URL the chunk is fetched from with an HTTP GET.</param>
    /// <param name="headers">Headers added to the GET request; null is treated as "no headers".</param>
    public Chunk(int chunkId, string chunkUrl, Dictionary<string, string> headers)
    {
        this.chunkId = chunkId;
        this.chunkUrl = chunkUrl;
        this.headers = headers;
        this.reader = null;
    }

    /// <summary>
    /// Issues the GET request for this chunk and wraps the response body in an
    /// <see cref="ArrowStreamReader"/>.
    /// </summary>
    /// <param name="client">The HTTP client used to send the request.</param>
    /// <returns>A task that completes once <see cref="reader"/> has been assigned.</returns>
    /// <exception cref="HttpRequestException">
    /// The request failed or the server answered with a non-success status code.
    /// </exception>
    public async Task downloadData(HttpClient client)
    {
        HttpResponseMessage response = null;
        try
        {
            using (var request = new HttpRequestMessage(HttpMethod.Get, chunkUrl))
            {
                if (headers != null)
                {
                    foreach (KeyValuePair<string, string> pair in headers)
                    {
                        request.Headers.Add(pair.Key, pair.Value);
                    }
                }

                response = await client.SendAsync(request).ConfigureAwait(false);
            }

            // Do not hand an error page to the Arrow reader.
            response.EnsureSuccessStatusCode();

            var body = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
            this.reader = new ArrowStreamReader(body);
            isDownloaded = true;
        }
        catch
        {
            // The response is only kept alive on success, where the reader streams from it.
            response?.Dispose();
            isFailed = true;
            throw;
        }
    }
}
}

0 comments on commit 590b920

Please sign in to comment.