Skip to content

Commit

Permalink
capture prepared statement handle in Go client
Browse files Browse the repository at this point in the history
  • Loading branch information
erratic-pattern committed Apr 13, 2024
1 parent 297ae38 commit ea854e4
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions go/arrow/flight/flightsql/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,6 @@ func (p *PreparedStatement) Execute(ctx context.Context, opts ...grpc.CallOption
if err != nil {
return nil, err
}

wr, err := p.writeBindParameters(pstream, desc)
if err != nil {
return nil, err
Expand All @@ -1054,7 +1053,9 @@ func (p *PreparedStatement) Execute(ctx context.Context, opts ...grpc.CallOption
return nil, err
}
pstream.CloseSend()

if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return nil, err
}
// wait for the server to ack the result
if _, err = pstream.Recv(); err != nil && err != io.EOF {
return nil, err
Expand Down Expand Up @@ -1094,7 +1095,9 @@ func (p *PreparedStatement) ExecutePut(ctx context.Context, opts ...grpc.CallOpt
return err
}
pstream.CloseSend()

if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return err
}
// wait for the server to ack the result
if _, err = pstream.Recv(); err != nil && err != io.EOF {
return err
Expand Down Expand Up @@ -1140,7 +1143,9 @@ func (p *PreparedStatement) ExecutePoll(ctx context.Context, retryDescriptor *fl
return nil, err
}
pstream.CloseSend()

if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return nil, err
}
// wait for the server to ack the result
if _, err = pstream.Recv(); err != nil && err != io.EOF {
return nil, err
Expand Down Expand Up @@ -1180,6 +1185,9 @@ func (p *PreparedStatement) ExecuteUpdate(ctx context.Context, opts ...grpc.Call
if err != nil {
return
}
if err = p.captureDoPutPreparedStatementHandle(pstream); err != nil {
return
}
} else {
schema := arrow.NewSchema([]arrow.Field{}, nil)
wr = flight.NewRecordWriter(pstream, ipc.WithSchema(schema))
Expand Down Expand Up @@ -1234,6 +1242,22 @@ func (p *PreparedStatement) writeBindParameters(pstream pb.FlightService_DoPutCl
}
}

func (p *PreparedStatement) captureDoPutPreparedStatementHandle(pstream pb.FlightService_DoPutClient) error {
var (
result *pb.PutResult
preparedStatementResult pb.DoPutPreparedStatementResult
err error
)
if result, err = pstream.Recv(); err != nil && err != io.EOF {
return err
}
if err = proto.Unmarshal(result.GetAppMetadata(), &preparedStatementResult); err != nil {
return err
}
p.handle = preparedStatementResult.GetPreparedStatementHandle()
return nil
}

// DatasetSchema may be nil if the server did not return it when creating the
// Prepared Statement.
func (p *PreparedStatement) DatasetSchema() *arrow.Schema { return p.datasetSchema }
Expand Down

0 comments on commit ea854e4

Please sign in to comment.