Skip to content

Commit a94d470

Browse files
committed
address review comments
1 parent 3ad7638 commit a94d470

File tree

6 files changed

+47
-55
lines changed

6 files changed

+47
-55
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,6 @@
3030
* this class per writing program, so that the memory segment/data buffer can be reused. Note that
3131
* for each incoming record, we should call `reset` of BufferHolder instance before writing the record
3232
* and reuse the data buffer.
33-
*
34-
* Generally we should call `UnsafeRowWriter.setTotalSize` using `BufferHolder.totalSize` to update
35-
* the size of the result row, after writing a record to the buffer. However, we can skip this step
36-
* if the fields of row are all fixed-length, as the size of result row is also fixed.
3733
*/
3834
final class BufferHolder {
3935

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,18 +73,14 @@ public void initialize(int numElements) {
7373
incrementCursor(headerInBytes + fixedPartInBytes);
7474
}
7575

76-
protected long getOffset(int ordinal, int elementSize) {
77-
return getElementOffset(ordinal, elementSize);
78-
}
79-
80-
private long getElementOffset(int ordinal, int elementSize) {
76+
private long getElementOffset(int ordinal) {
8177
return startingOffset + headerInBytes + ordinal * elementSize;
8278
}
8379

8480
@Override
85-
public void setOffsetAndSizeFromMark(int ordinal, int mark) {
81+
public void setOffsetAndSizeFromPreviousCursor(int ordinal, int mark) {
8682
assertIndexIsValid(ordinal);
87-
_setOffsetAndSizeFromMark(ordinal, mark);
83+
_setOffsetAndSizeFromPreviousCursor(ordinal, mark);
8884
}
8985

9086
private void setNullBit(int ordinal) {
@@ -95,62 +91,62 @@ private void setNullBit(int ordinal) {
9591
public void setNull1Bytes(int ordinal) {
9692
setNullBit(ordinal);
9793
// put zero into the corresponding field when set null
98-
Platform.putByte(buffer(), getElementOffset(ordinal, 1), (byte)0);
94+
Platform.putByte(buffer(), getElementOffset(ordinal), (byte)0);
9995
}
10096

10197
public void setNull2Bytes(int ordinal) {
10298
setNullBit(ordinal);
10399
// put zero into the corresponding field when set null
104-
Platform.putShort(buffer(), getElementOffset(ordinal, 2), (short)0);
100+
Platform.putShort(buffer(), getElementOffset(ordinal), (short)0);
105101
}
106102

107103
public void setNull4Bytes(int ordinal) {
108104
setNullBit(ordinal);
109105
// put zero into the corresponding field when set null
110-
Platform.putInt(buffer(), getElementOffset(ordinal, 4), 0);
106+
Platform.putInt(buffer(), getElementOffset(ordinal), 0);
111107
}
112108

113109
public void setNull8Bytes(int ordinal) {
114110
setNullBit(ordinal);
115111
// put zero into the corresponding field when set null
116-
Platform.putLong(buffer(), getElementOffset(ordinal, 8), (long)0);
112+
Platform.putLong(buffer(), getElementOffset(ordinal), (long)0);
117113
}
118114

119115
public void setNull(int ordinal) { setNull8Bytes(ordinal); }
120116

121117
public void write(int ordinal, boolean value) {
122118
assertIndexIsValid(ordinal);
123-
writeBoolean(getElementOffset(ordinal, 1), value);
119+
writeBoolean(getElementOffset(ordinal), value);
124120
}
125121

126122
public void write(int ordinal, byte value) {
127123
assertIndexIsValid(ordinal);
128-
writeByte(getElementOffset(ordinal, 1), value);
124+
writeByte(getElementOffset(ordinal), value);
129125
}
130126

131127
public void write(int ordinal, short value) {
132128
assertIndexIsValid(ordinal);
133-
writeShort(getElementOffset(ordinal, 2), value);
129+
writeShort(getElementOffset(ordinal), value);
134130
}
135131

136132
public void write(int ordinal, int value) {
137133
assertIndexIsValid(ordinal);
138-
writeInt(getElementOffset(ordinal, 4), value);
134+
writeInt(getElementOffset(ordinal), value);
139135
}
140136

141137
public void write(int ordinal, long value) {
142138
assertIndexIsValid(ordinal);
143-
writeLong(getElementOffset(ordinal, 8), value);
139+
writeLong(getElementOffset(ordinal), value);
144140
}
145141

146142
public void write(int ordinal, float value) {
147143
assertIndexIsValid(ordinal);
148-
writeFloat(getElementOffset(ordinal, 4), value);
144+
writeFloat(getElementOffset(ordinal), value);
149145
}
150146

151147
public void write(int ordinal, double value) {
152148
assertIndexIsValid(ordinal);
153-
writeDouble(getElementOffset(ordinal, 8), value);
149+
writeDouble(getElementOffset(ordinal), value);
154150
}
155151

156152
public void write(int ordinal, Decimal input, int precision, int scale) {

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
* Note that if this is the outermost writer, which means we will always write from the very
3535
* beginning of the global row buffer, we don't need to update `startingOffset` and can just call
3636
* `zeroOutNullBytes` before writing new data.
37+
*
38+
* Generally we should call `UnsafeRowWriter.setTotalSize` to update the size of the result row,
39+
* after writing a record to the buffer. However, we can skip this step if the fields of the row
40+
* are all fixed-length, as the size of the result row is also fixed.
3741
*/
3842
public final class UnsafeRowWriter extends UnsafeWriter {
3943

@@ -130,18 +134,13 @@ public void setNull8Bytes(int ordinal) {
130134
setNullAt(ordinal);
131135
}
132136

133-
@Override
134-
protected final long getOffset(int ordinal, int elementSize) {
135-
return getFieldOffset(ordinal);
136-
}
137-
138137
public long getFieldOffset(int ordinal) {
139138
return startingOffset + nullBitsSize + 8 * ordinal;
140139
}
141140

142141
@Override
143-
public void setOffsetAndSizeFromMark(int ordinal, int mark) {
144-
_setOffsetAndSizeFromMark(ordinal, mark);
142+
public void setOffsetAndSizeFromPreviousCursor(int ordinal, int previousCursor) {
143+
_setOffsetAndSizeFromPreviousCursor(ordinal, previousCursor);
145144
}
146145

147146
public void write(int ordinal, boolean value) {

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ public final void incrementCursor(int val) {
6767
holder.incrementCursor(val);
6868
}
6969

70-
public abstract void setOffsetAndSizeFromMark(int ordinal, int mark);
70+
public abstract void setOffsetAndSizeFromPreviousCursor(int ordinal, int previousCursor);
7171

72-
protected void _setOffsetAndSizeFromMark(int ordinal, int mark) {
73-
setOffsetAndSize(ordinal, mark, cursor() - mark);
72+
protected void _setOffsetAndSizeFromPreviousCursor(int ordinal, int previousCursor) {
73+
setOffsetAndSize(ordinal, previousCursor, cursor() - previousCursor);
7474
}
7575

7676
protected void setOffsetAndSize(int ordinal, int size) {
@@ -90,8 +90,6 @@ protected final void zeroOutPaddingBytes(int numBytes) {
9090
}
9191
}
9292

93-
protected abstract long getOffset(int ordinal, int elementSize);
94-
9593
public abstract void setNull1Bytes(int ordinal);
9694
public abstract void setNull2Bytes(int ordinal);
9795
public abstract void setNull4Bytes(int ordinal);

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
174174
val rowWriter = new UnsafeRowWriter(writer, numFields)
175175
val structWriter = generateStructWriter(rowWriter, fields)
176176
(v, i) => {
177-
val markCursor = writer.cursor()
177+
val previousCursor = writer.cursor()
178178
v.getStruct(i, fields.length) match {
179179
case row: UnsafeRow =>
180180
writeUnsafeData(
@@ -188,7 +188,7 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
188188
rowWriter.resetRowWriter()
189189
structWriter.apply(row)
190190
}
191-
writer.setOffsetAndSizeFromMark(i, markCursor)
191+
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
192192
}
193193

194194
case ArrayType(elementType, containsNull) =>
@@ -198,9 +198,9 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
198198
elementType,
199199
containsNull)
200200
(v, i) => {
201-
val markCursor = writer.cursor()
201+
val previousCursor = writer.cursor()
202202
writeArray(arrayWriter, elementWriter, v.getArray(i))
203-
writer.setOffsetAndSizeFromMark(i, markCursor)
203+
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
204204
}
205205

206206
case MapType(keyType, valueType, valueContainsNull) =>
@@ -215,7 +215,7 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
215215
valueType,
216216
valueContainsNull)
217217
(v, i) => {
218-
val markCursor = writer.cursor()
218+
val previousCursor = writer.cursor()
219219
v.getMap(i) match {
220220
case map: UnsafeMapData =>
221221
writeUnsafeData(
@@ -231,12 +231,15 @@ object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
231231
// Write the keys, then write the numBytes of the key array into the first 8 bytes.
232232
writeArray(keyArrayWriter, keyWriter, map.keyArray())
233233
Platform.putLong(
234-
valueArrayWriter.buffer, markCursor, valueArrayWriter.cursor - markCursor - 8)
234+
valueArrayWriter.buffer,
235+
previousCursor,
236+
valueArrayWriter.cursor - previousCursor - 8
237+
)
235238

236239
// Write the values.
237240
writeArray(valueArrayWriter, valueWriter, map.valueArray())
238241
}
239-
writer.setOffsetAndSizeFromMark(i, markCursor)
242+
writer.setOffsetAndSizeFromPreviousCursor(i, previousCursor)
240243
}
241244

242245
case udt: UserDefinedType[_] =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -104,34 +104,34 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
104104
s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
105105
case _ => s"$rowWriter.setNullAt($index);"
106106
}
107-
val markCursor = ctx.freshName("markCursor")
107+
val previousCursor = ctx.freshName("previousCursor")
108108

109109
val writeField = dt match {
110110
case t: StructType =>
111111
s"""
112112
// Remember the current cursor so that we can calculate how many bytes are
113113
// written later.
114-
final int $markCursor = $rowWriter.cursor();
114+
final int $previousCursor = $rowWriter.cursor();
115115
${writeStructToBuffer(ctx, input.value, t.map(_.dataType), rowWriter)}
116-
$rowWriter.setOffsetAndSizeFromMark($index, $markCursor);
116+
$rowWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
117117
"""
118118

119119
case a @ ArrayType(et, _) =>
120120
s"""
121121
// Remember the current cursor so that we can calculate how many bytes are
122122
// written later.
123-
final int $markCursor = $rowWriter.cursor();
123+
final int $previousCursor = $rowWriter.cursor();
124124
${writeArrayToBuffer(ctx, input.value, et, rowWriter)}
125-
$rowWriter.setOffsetAndSizeFromMark($index, $markCursor);
125+
$rowWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
126126
"""
127127

128128
case m @ MapType(kt, vt, _) =>
129129
s"""
130130
// Remember the current cursor so that we can calculate how many bytes are
131131
// written later.
132-
final int $markCursor = $rowWriter.cursor();
132+
final int $previousCursor = $rowWriter.cursor();
133133
${writeMapToBuffer(ctx, input.value, kt, vt, rowWriter)}
134-
$rowWriter.setOffsetAndSizeFromMark($index, $markCursor);
134+
$rowWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
135135
"""
136136

137137
case t: DecimalType =>
@@ -203,29 +203,29 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
203203
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
204204
val arrayWriter = ctx.addMutableState(arrayWriterClass, "arrayWriter",
205205
v => s"$v = new $arrayWriterClass($rowWriter, $elementOrOffsetSize);")
206-
val markCursor = ctx.freshName("markCursor")
206+
val previousCursor = ctx.freshName("previousCursor")
207207

208208
val element = CodeGenerator.getValue(tmpInput, et, index)
209209
val writeElement = et match {
210210
case t: StructType =>
211211
s"""
212-
final int $markCursor = $arrayWriter.cursor();
212+
final int $previousCursor = $arrayWriter.cursor();
213213
${writeStructToBuffer(ctx, element, t.map(_.dataType), arrayWriter)}
214-
$arrayWriter.setOffsetAndSizeFromMark($index, $markCursor);
214+
$arrayWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
215215
"""
216216

217217
case a @ ArrayType(et, _) =>
218218
s"""
219-
final int $markCursor = $arrayWriter.cursor();
219+
final int $previousCursor = $arrayWriter.cursor();
220220
${writeArrayToBuffer(ctx, element, et, arrayWriter)}
221-
$arrayWriter.setOffsetAndSizeFromMark($index, $markCursor);
221+
$arrayWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
222222
"""
223223

224224
case m @ MapType(kt, vt, _) =>
225225
s"""
226-
final int $markCursor = $arrayWriter.cursor();
226+
final int $previousCursor = $arrayWriter.cursor();
227227
${writeMapToBuffer(ctx, element, kt, vt, arrayWriter)}
228-
$arrayWriter.setOffsetAndSizeFromMark($index, $markCursor);
228+
$arrayWriter.setOffsetAndSizeFromPreviousCursor($index, $previousCursor);
229229
"""
230230

231231
case t: DecimalType =>

0 commit comments

Comments
 (0)