diff --git a/csharp/src/Drivers/Apache/Spark/SparkStatement.cs b/csharp/src/Drivers/Apache/Spark/SparkStatement.cs index 4df66f2778..60fa8d1f28 100644 --- a/csharp/src/Drivers/Apache/Spark/SparkStatement.cs +++ b/csharp/src/Drivers/Apache/Spark/SparkStatement.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Net.Http; using System.Threading; using System.Threading.Tasks; using Apache.Arrow.Adbc.Drivers.Apache.Hive2; @@ -34,6 +35,7 @@ internal SparkStatement(SparkConnection connection) protected override void SetStatementProperties(TExecuteStatementReq statement) { + statement.CanReadArrowResult = true; statement.CanDownloadResult = true; statement.ConfOverlay = SparkConnection.timestampConfig; @@ -126,4 +128,75 @@ public void Dispose() } } } + + internal class ChunkDownloader + { + private Dictionary chunks; + + int currentChunk = 0; + HttpClient client; + + public ChunkDownloader(Dictionary> links) + { + this.links = links; + this.client = new HttpClient(); + } + + void initialize() + { + int workerThreads, completionPortThreads; + ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads); + ThreadPool.SetMinThreads(5, completionPortThreads); + ThreadPool.SetMaxThreads(10, completionPortThreads); + foreach (KeyValuePair chunk in chunks) + { + ThreadPool.QueueUserWorkItem(async _ => + { + try + { + await chunk.Value.downloadData(this.client); + } + catch (Exception e) + { + Console.WriteLine(e); + } + }); + + } + + } + + + } + + public class Chunk + { + int chunkId; + string chunkUrl; + Dictionary headers; + public bool isDownloaded = false; + public bool isFailed = false; + public IArrowReader reader; + + public Chunk(int chunkId, string chunkUrl, Dictionary headers) + { + this.chunkId = chunkId; + this.chunkUrl = chunkUrl; + this.headers = headers; + this.reader = null; + } + + public async Task downloadData(HttpClient client) + { + var request = new HttpRequestMessage(HttpMethod.Get, chunkUrl); + foreach (KeyValuePair pair in headers) + { + request.Headers.Add(pair.Key, pair.Value); + } + HttpResponseMessage response = await client.SendAsync(request); + this.reader = new ArrowStreamReader(response.Content.ReadAsStream()); + isDownloaded = true; + + } + } }