Skip to content

Commit 2acf6e0

Browse files
authored
Async multi packet fixes (#3534)
* remove spare packet * fix read error with reused uncleared packet fix 0 length read at the start of a packe in plp stream returning 0 when continuing handle char array sizing better change existing test to use multiple packet sizes * additinal fix for 0 length case * fix pending read counters force process sni compatibility mode by default * use division not shift * Fix cached buffer attempting to use storage when continue is not occuring * fix and clarify RequiredLength vs CurrentLength in ProcessSniPacket * sync assertions * prevent multiple appends to an open snapshot * add continue capability back and make it opt-in for supporting callee functions * reduce memory usage when reading strings in continue mode * avoid 0 length array allocations * fix ReqeustContinue to be sync safe * fix 3527 shotren new test duration by using larger increments * fix rebase merge error * review feedback, add local for char size
1 parent cfaa576 commit 2acf6e0

File tree

9 files changed

+196
-118
lines changed

9 files changed

+196
-118
lines changed

src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/TdsParser.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13055,6 +13055,8 @@ internal TdsOperationStatus TryReadPlpUnicodeCharsWithContinue(TdsParserStateObj
1305513055
char[] temp = null;
1305613056
bool buffIsRented = false;
1305713057
int startOffset = 0;
13058+
13059+
stateObj.RequestContinue(true);
1305813060
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
1305913061

1306013062
if (canContinue)
@@ -13149,7 +13151,7 @@ bool writeDataSizeToSnapshot
1314913151
if (stateObj._longlen == 0)
1315013152
{
1315113153
Debug.Assert(stateObj._longlenleft == 0);
13152-
totalCharsRead = 0;
13154+
totalCharsRead = startOffsetByteCount / 2;
1315313155
return TdsOperationStatus.Done; // No data
1315413156
}
1315513157

@@ -13169,14 +13171,15 @@ bool writeDataSizeToSnapshot
1316913171
// later needing to repeatedly allocate new target buffers and copy data as we discover new data
1317013172
if (buff == null && stateObj._longlen != TdsEnums.SQL_PLP_UNKNOWNLEN && stateObj._longlen < (int.MaxValue >> 1))
1317113173
{
13172-
if (supportRentedBuff && stateObj._longlen < 1073741824) // 1 Gib
13174+
int stateLen = (int)stateObj._longlen >> 1;
13175+
if (supportRentedBuff && stateLen < 1073741824) // 1 Gib
1317313176
{
13174-
buff = ArrayPool<char>.Shared.Rent((int)Math.Min((int)stateObj._longlen, len));
13177+
buff = ArrayPool<char>.Shared.Rent(Math.Min(stateLen, len));
1317513178
rentedBuff = true;
1317613179
}
1317713180
else
1317813181
{
13179-
buff = new char[(int)Math.Min((int)stateObj._longlen, len)];
13182+
buff = new char[Math.Min(stateLen, len)];
1318013183
rentedBuff = false;
1318113184
}
1318213185
}

src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/TdsParser.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13312,7 +13312,7 @@ bool writeDataSizeToSnapshot
1331213312
if (stateObj._longlen == 0)
1331313313
{
1331413314
Debug.Assert(stateObj._longlenleft == 0);
13315-
totalCharsRead = 0;
13315+
totalCharsRead = startOffsetByteCount / 2;
1331613316
return TdsOperationStatus.Done; // No data
1331713317
}
1331813318

@@ -13327,19 +13327,20 @@ bool writeDataSizeToSnapshot
1332713327
);
1332813328
charsLeft = len;
1332913329

13330-
// If total length is known up front, the length isn't specified as unknown
13331-
// and the caller doesn't pass int.max/2 indicating that it doesn't know the length
13332-
// allocate the whole buffer in one shot instead of realloc'ing and copying over each time
13330+
// If total data length is known up front from the plp header by being not SQL_PLP_UNKNOWNLEN
13331+
// and the number of chars required is less than int.max/2 allocate the entire buffer now to avoid
13332+
// later needing to repeatedly allocate new target buffers and copy data as we discover new data
1333313333
if (buff == null && stateObj._longlen != TdsEnums.SQL_PLP_UNKNOWNLEN && stateObj._longlen < (int.MaxValue >> 1))
1333413334
{
13335-
if (supportRentedBuff && stateObj._longlen < 1073741824) // 1 Gib
13335+
int stateLen = (int)stateObj._longlen >> 1;
13336+
if (supportRentedBuff && stateLen < 1073741824) // 1 Gib
1333613337
{
13337-
buff = ArrayPool<char>.Shared.Rent((int)Math.Min((int)stateObj._longlen, len));
13338+
buff = ArrayPool<char>.Shared.Rent(Math.Min(stateLen, len));
1333813339
rentedBuff = true;
1333913340
}
1334013341
else
1334113342
{
13342-
buff = new char[(int)Math.Min((int)stateObj._longlen, len)];
13343+
buff = new char[Math.Min(stateLen, len)];
1334313344
rentedBuff = false;
1334413345
}
1334513346
}

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlCachedBuffer.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
3939
{
4040
buffer = null;
4141

42-
(bool canContinue, bool isStarting, _) = stateObj.GetSnapshotStatuses();
42+
(bool canContinue, bool isStarting, bool isContinuing) = stateObj.GetSnapshotStatuses();
4343

4444
List<byte[]> cachedBytes = null;
45-
if (canContinue)
45+
if (canContinue && isContinuing)
4646
{
4747
cachedBytes = stateObj.TryTakeSnapshotStorage() as List<byte[]>;
4848
if (isStarting)
@@ -81,7 +81,7 @@ internal static TdsOperationStatus TryCreate(SqlMetaDataPriv metadata, TdsParser
8181
result = stateObj.TryReadPlpBytes(ref byteArr, 0, cb, out cb, canContinue, writeDataSizeToSnapshot: false, compatibilityMode: false);
8282
if (result != TdsOperationStatus.Done)
8383
{
84-
if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length)
84+
if (result == TdsOperationStatus.NeedMoreData && canContinue && cb == byteArr.Length && (isContinuing || !isStarting))
8585
{
8686
// succeeded in getting the data but failed to find the next plp length
8787
returnAfterAdd = true;

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlDataReader.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4238,9 +4238,7 @@ private TdsOperationStatus TryResetBlobState()
42384238
else
42394239
{
42404240
Debug.Assert(
4241-
(_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1)
4242-
&&
4243-
(_stateObj._longlen == 0 || _stateObj.IsSnapshotContinuing()),
4241+
(_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1),
42444242
"Haven't read header yet, but column is partially read?"
42454243
);
42464244
}
@@ -5395,6 +5393,10 @@ private static Task<T> GetFieldValueAsyncExecute<T>(Task task, object state)
53955393
{
53965394
return Task.FromResult<T>(reader.GetFieldValueFromSqlBufferInternal<T>(reader._data[columnIndex], reader._metaData[columnIndex], isAsync: true));
53975395
}
5396+
else
5397+
{
5398+
return reader.ExecuteAsyncCall(context);
5399+
}
53985400
}
53995401
}
54005402

src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/TdsParserStateObject.Multiplexer.cs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ public void ProcessSniPacket(PacketHandle packet, uint error)
4141
if (PartialPacketContainsCompletePacket())
4242
{
4343
Packet partialPacket = _partialPacket;
44+
// the partial packet can contain more than a single packet worth of data so to consume the
45+
// partial packet we must use the CurrentLength not just the RequiredLength and then later
46+
// the multiplexer will split out the complete packet for consumption and maintain the
47+
// additional data
4448
SetBuffer(partialPacket.Buffer, 0, partialPacket.CurrentLength);
4549
ClearPartialPacket();
4650
getDataError = TdsEnums.SNI_SUCCESS;
@@ -50,7 +54,7 @@ public void ProcessSniPacket(PacketHandle packet, uint error)
5054
{
5155
if (_inBytesRead != 0)
5256
{
53-
SetBuffer(new byte[_inBuff.Length], 0, 0);
57+
NewBuffer(_inBuff.Length);
5458
}
5559
getDataError = GetSniPacket(packet, ref dataSize);
5660
}
@@ -76,7 +80,7 @@ public void ProcessSniPacket(PacketHandle packet, uint error)
7680
{
7781
if (recurse && appended)
7882
{
79-
SetBuffer(new byte[_inBuff.Length], 0, 0);
83+
NewBuffer(_inBuff.Length);
8084
appended = false;
8185
}
8286
MultiplexPackets(
@@ -95,16 +99,19 @@ out recurse
9599
// if a partial packet was reconstructed it must be handled first
96100
if (consumePartialPacket)
97101
{
102+
// the partial packet has been processed by the multiplexer and should now have
103+
// only data from a single packet in it so we should use RequiredLength which
104+
// is defined by the packet header here not CurrentLength
105+
Debug.Assert(PartialPacket != null && PartialPacket.RequiredLength == PartialPacket.CurrentLength);
98106
if (_snapshot != null)
99107
{
100-
_snapshot.AppendPacketData(PartialPacket.Buffer, PartialPacket.CurrentLength);
101-
SetBuffer(new byte[_inBuff.Length], 0, 0);
108+
_snapshot.AppendPacketData(PartialPacket.Buffer, PartialPacket.RequiredLength);
109+
NewBuffer(_inBuff.Length);
102110
appended = true;
103111
}
104112
else
105113
{
106-
SetBuffer(PartialPacket.Buffer, 0, PartialPacket.CurrentLength);
107-
114+
SetBuffer(PartialPacket.Buffer, 0, PartialPacket.RequiredLength);
108115
}
109116
bufferIsPartialCompleted = true;
110117
ClearPartialPacket();
@@ -125,7 +132,7 @@ out recurse
125132
// if we SetBuffer here to clear the packet buffer we will break the attention handling which relies
126133
// on the attention containing packet remaining in the active buffer even if we're appending to the
127134
// snapshot so we will have to use the appended variable to prevent the same buffer being added again
128-
//// SetBuffer(new byte[_inBuff.Length], 0, 0);
135+
//// NewBuffer(_inBuff.Length);
129136
appended = true;
130137
}
131138
else
@@ -141,19 +148,28 @@ out recurse
141148
// we don't process it
142149
if (!bufferIsPartialCompleted)
143150
{
144-
SetBuffer(_inBuff, 0, 0);
151+
NewBuffer(_inBuff.Length);
145152
}
146153
}
147154

148155
// if there is a remainder it must be last
149156
if (remainderPacketProduced)
150157
{
151158
SetPartialPacket(remainderPacket);
159+
if (appended && recurse)
160+
{
161+
// if we've appended to the snapshot already we can't recurse and append to it again because the
162+
// snapshot might be cleared by the async cleanup functions
163+
// the only way to get a recurse output from the multiplexer is if it has produced a remainder packet so
164+
// assert that this is the case and the put the remainder packet in the partial packet so that it
165+
// can be picked up in another call.
166+
recurse = false;
167+
}
152168
if (!bufferIsPartialCompleted)
153169
{
154170
// we are keeping the partial packet buffer so replace it with a new one
155171
// unless we have already set the buffer to the partial packet buffer
156-
SetBuffer(new byte[_inBuff.Length], 0, 0);
172+
NewBuffer(_inBuff.Length);
157173
}
158174
}
159175

@@ -301,7 +317,7 @@ out bool recurse
301317
remainderPacket = new Packet
302318
{
303319
Buffer = new byte[dataBuffer.Length],
304-
CurrentLength = remainderLength,
320+
CurrentLength = remainderLength
305321
};
306322
remainderPacket.SetCreatedBy(1);
307323

0 commit comments

Comments
 (0)