@@ -15,11 +15,55 @@ using namespace NYql::NNodes;
1515
1616namespace {
1717
18+ TVector<TString> GetMissingInputColumnsForReturning (
19+ const TKiWriteTable& write, const TCoAtomList& inputColumns)
20+ {
21+ auto returnColumns = write.ReturningColumns ().Cast <TCoAtomList>();
22+ if (returnColumns.Ref ().ChildrenSize () == 0 ) {
23+ return {};
24+ }
25+
26+ THashSet<TStringBuf> currentInput;
27+ for (const auto & name : inputColumns) {
28+ currentInput.insert (name.Value ());
29+ }
30+
31+ TVector<TString> result;
32+ for (const auto & returnCol : returnColumns) {
33+ if (!currentInput.contains (returnCol.Value ())) {
34+ result.push_back (TString (returnCol.Value ()));
35+ }
36+ }
37+
38+ return result;
39+ }
40+
1841// Replace absent input columns to NULL to perform REPLACE via UPSERT
19- std::pair<TExprBase, TCoAtomList> CreateRowsToReplace ( const TExprBase& input,
42+ std::pair<TExprBase, TCoAtomList> ExtendInputRowsWithAbsentNullColumns ( const TKiWriteTable& write, const TExprBase& input,
2043 const TCoAtomList& inputColumns, const TKikimrTableDescription& tableDesc,
2144 TPositionHandle pos, TExprContext& ctx)
2245{
46+ TVector<TString> maybeMissingColumnsToReplace;
47+ const auto op = GetTableOp (write);
48+ if (op == TYdbOperation::Replace) {
49+ for (const auto &[name, _]: tableDesc.Metadata ->Columns ) {
50+ maybeMissingColumnsToReplace.push_back (name);
51+ }
52+ }
53+
54+ if (op == TYdbOperation::InsertAbort || op == TYdbOperation::InsertRevert || op == TYdbOperation::Upsert) {
55+ maybeMissingColumnsToReplace = GetMissingInputColumnsForReturning (write, inputColumns);
56+ if (maybeMissingColumnsToReplace.size () > 0 ) {
57+ for (const auto & inputCol: inputColumns) {
58+ maybeMissingColumnsToReplace.push_back (TString (inputCol.Value ()));
59+ }
60+ }
61+ }
62+
63+ if (maybeMissingColumnsToReplace.size () == 0 ) {
64+ return {input, inputColumns};
65+ }
66+
2367 THashSet<TStringBuf> inputColumnsSet;
2468 for (const auto & name : inputColumns) {
2569 inputColumnsSet.insert (name.Value ());
@@ -32,7 +76,7 @@ std::pair<TExprBase, TCoAtomList> CreateRowsToReplace(const TExprBase& input,
3276 TVector<TCoAtom> writeColumns;
3377 TVector<TExprBase> writeMembers;
3478
35- for (const auto & [ name, _] : tableDesc. Metadata -> Columns ) {
79+ for (const auto & name : maybeMissingColumnsToReplace ) {
3680 TMaybeNode<TExprBase> memberValue;
3781 if (tableDesc.GetKeyColumnIndex (name) || inputColumnsSet.contains (name)) {
3882 memberValue = Build<TCoMember>(ctx, pos)
@@ -214,21 +258,18 @@ TCoAtomList BuildUpsertInputColumns(const TCoAtomList& inputColumns,
214258}
215259
216260std::pair<TExprBase, TCoAtomList> BuildWriteInput (const TKiWriteTable& write, const TKikimrTableDescription& table,
217- const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool isSink,
261+ const TCoAtomList& inputColumns, const TCoAtomList& autoIncrement, const bool /* isSink*/ ,
218262 TPositionHandle pos, TExprContext& ctx)
219263{
220264 auto input = write.Input ();
221- const bool isWriteReplace = (GetTableOp (write) == TYdbOperation::Replace) && !isSink;
222265
223266 TCoAtomList inputCols = BuildUpsertInputColumns (inputColumns, autoIncrement, pos, ctx);
224267
225268 if (autoIncrement.Ref ().ChildrenSize () > 0 ) {
226269 input = BuildKqlSequencer (input, table, inputCols, autoIncrement, pos, ctx);
227270 }
228271
229- if (isWriteReplace) {
230- std::tie (input, inputCols) = CreateRowsToReplace (input, inputCols, table, write.Pos (), ctx);
231- }
272+ std::tie (input, inputCols) = ExtendInputRowsWithAbsentNullColumns (write, input, inputCols, table, write.Pos (), ctx);
232273
233274 auto baseInput = Build<TKqpWriteConstraint>(ctx, pos)
234275 .Input (input)
@@ -238,15 +279,35 @@ std::pair<TExprBase, TCoAtomList> BuildWriteInput(const TKiWriteTable& write, co
238279 return {baseInput, inputCols};
239280}
240281
282+
283+ TCoAtomList ExtendGenerateOnInsertColumnsList (const TKiWriteTable& write, TCoAtomList& generateColumnsIfInsert, const TCoAtomList& inputColumns, const TCoAtomList& autoincrement, TExprContext& ctx) {
284+ auto inputCols = BuildUpsertInputColumns (inputColumns, autoincrement, write.Pos (), ctx);
285+ auto maybeMissingColumnsToReplace = GetMissingInputColumnsForReturning (write, inputCols);
286+ TVector<TExprNode::TPtr> result;
287+ result.reserve (generateColumnsIfInsert.Ref ().ChildrenSize () + maybeMissingColumnsToReplace.size ());
288+ for (const auto & item: generateColumnsIfInsert) {
289+ result.push_back (item.Ptr ());
290+ }
291+
292+ for (auto & name: maybeMissingColumnsToReplace) {
293+ auto atom = TCoAtom (ctx.NewAtom (write.Pos (), name));
294+ result.push_back (atom.Ptr ());
295+ }
296+
297+ return Build<TCoAtomList>(ctx, write.Pos ()).Add (result).Done ();
298+ }
299+
241300TExprBase BuildUpsertTable (const TKiWriteTable& write, const TCoAtomList& inputColumns,
242301 const TCoAtomList& autoincrement, const bool isSink,
243302 const TKikimrTableDescription& table, TExprContext& ctx)
244303{
245304 auto generateColumnsIfInsertNode = GetSetting (write.Settings ().Ref (), " generate_columns_if_insert" );
246305 YQL_ENSURE (generateColumnsIfInsertNode);
247306 TCoAtomList generateColumnsIfInsert = TCoNameValueTuple (generateColumnsIfInsertNode).Value ().Cast <TCoAtomList>();
248-
249307 auto settings = FilterSettings (write.Settings ().Ref (), {" AllowInconsistentWrites" }, ctx);
308+
309+ generateColumnsIfInsert = ExtendGenerateOnInsertColumnsList (write, generateColumnsIfInsert, inputColumns, autoincrement, ctx);
310+
250311 settings = AddSetting (*settings, write.Pos (), " Mode" , Build<TCoAtom>(ctx, write.Pos ()).Value (" upsert" ).Done ().Ptr (), ctx);
251312 const auto [input, columns] = BuildWriteInput (write, table, inputColumns, autoincrement, isSink, write.Pos (), ctx);
252313 if (generateColumnsIfInsert.Ref ().ChildrenSize () > 0 ) {
@@ -282,6 +343,8 @@ TExprBase BuildUpsertTableWithIndex(const TKiWriteTable& write, const TCoAtomLis
282343 YQL_ENSURE (generateColumnsIfInsertNode);
283344 TCoAtomList generateColumnsIfInsert = TCoNameValueTuple (generateColumnsIfInsertNode).Value ().Cast <TCoAtomList>();
284345
346+ generateColumnsIfInsert = ExtendGenerateOnInsertColumnsList (write, generateColumnsIfInsert, inputColumns, autoincrement, ctx);
347+
285348 auto effect = Build<TKqlUpsertRowsIndex>(ctx, write.Pos ())
286349 .Table (BuildTableMeta (table, write.Pos (), ctx))
287350 .Input (input.Ptr ())
0 commit comments