Skip to content

Commit

Permalink
apacheGH-43672: [C#] Schema should be optional on FlightInfo (apache#…
Browse files Browse the repository at this point in the history
…43673)

### Rationale for this change

Schema is not required on a FlightInfo message and sometimes needs to be lazily evaluated on the server. This PR allows schema to be null on the FlightInfo since it will be picked up later when requests with those tickets are made.

### What changes are included in this PR?

### Are these changes tested?
Yes, added a test to confirm this behaviour

### Are there any user-facing changes?

* GitHub Issue: apache#43672

Authored-by: neilglover <neilglover@gmail.com>
Signed-off-by: Curt Hagenlocher <curt@hagenlocher.org>
  • Loading branch information
ndglover authored and zanmato1984 committed Sep 6, 2024
1 parent ad1b522 commit 3212f42
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
5 changes: 3 additions & 2 deletions csharp/src/Apache.Arrow.Flight/FlightInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
using System.Text;
using Apache.Arrow.Flight.Internal;
using Apache.Arrow.Ipc;
using Google.Protobuf;

namespace Apache.Arrow.Flight
{
public class FlightInfo
{
internal FlightInfo(Protocol.FlightInfo flightInfo)
{
Schema = FlightMessageSerializer.DecodeSchema(flightInfo.Schema.Memory);
Schema = flightInfo.Schema?.Length > 0 ? FlightMessageSerializer.DecodeSchema(flightInfo.Schema.Memory) : null;
Descriptor = new FlightDescriptor(flightInfo.FlightDescriptor);

var endpoints = new List<FlightEndpoint>();
Expand Down Expand Up @@ -60,7 +61,7 @@ public FlightInfo(Schema schema, FlightDescriptor descriptor, IReadOnlyList<Flig

internal Protocol.FlightInfo ToProtocol()
{
var serializedSchema = SchemaWriter.SerializeSchema(Schema);
var serializedSchema = Schema != null ? SchemaWriter.SerializeSchema(Schema) : ByteString.Empty;
var response = new Protocol.FlightInfo()
{
Schema = serializedSchema,
Expand Down
29 changes: 27 additions & 2 deletions csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ private FlightInfo GivenStoreBatches(FlightDescriptor flightDescriptor, params R
{
var initialBatch = batches.FirstOrDefault();

var flightHolder = new FlightHolder(flightDescriptor, initialBatch.RecordBatch.Schema, _testWebFactory.GetAddress());
var flightHolder = new FlightHolder(flightDescriptor, initialBatch?.RecordBatch.Schema, _testWebFactory.GetAddress());

foreach (var batch in batches)
{
flightHolder.AddBatch(batch);
}

_flightStore.Flights.Add(flightDescriptor, flightHolder);
_flightStore.Flights[flightDescriptor] = flightHolder;

return flightHolder.GetFlightInfo();
}
Expand Down Expand Up @@ -124,6 +124,31 @@ public async Task TestPutTwoRecordBatches()
ArrowReaderVerifier.CompareBatches(expectedBatch2, actualBatches[1].RecordBatch);
}

[Fact]
public async Task TestGetRecordBatchWithDelayedSchema()
{
var flightDescriptor = FlightDescriptor.CreatePathDescriptor("test");
var expectedBatch = CreateTestBatch(0, 100);

//Add flight info only to the in memory store without schema or batch
GivenStoreBatches(flightDescriptor);

//Get the flight info for the ticket and verify the schema is null
var flightInfo = await _flightClient.GetInfo(flightDescriptor);
Assert.Single(flightInfo.Endpoints);
Assert.Null(flightInfo.Schema);

var endpoint = flightInfo.Endpoints.FirstOrDefault();

//Update the store with the batch and schema
GivenStoreBatches(flightDescriptor, new RecordBatchWithMetadata(expectedBatch));
var getStream = _flightClient.GetStream(endpoint.Ticket);
var resultList = await getStream.ResponseStream.ToListAsync();

Assert.Single(resultList);
ArrowReaderVerifier.CompareBatches(expectedBatch, resultList[0]);
}

[Fact]
public async Task TestGetSingleRecordBatch()
{
Expand Down

0 comments on commit 3212f42

Please sign in to comment.