Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle updates to table schema when using Storage API writes. #24145

Merged
merged 1 commit into from
Jan 20, 2023

Conversation

reuvenlax
Copy link
Contributor

No description provided.

@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @kennknowles @ahmedabu98

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @lukecwik for label java.
R: @pabloem for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lukecwik @pabloem

@kennknowles
Copy link
Member

There's no description. Is this just a working draft?

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Nov 30, 2022 via email

@reuvenlax reuvenlax force-pushed the schema_update_push_notification branch from 7d77609 to 235400c Compare December 1, 2022 00:28
@github-actions github-actions bot added hcatalog and removed hcatalog labels Dec 1, 2022
@lukecwik lukecwik changed the title Schema update push notification [WIP] Schema update push notification Dec 1, 2022
@github-actions github-actions bot added hcatalog and removed hcatalog labels Dec 2, 2022
@reuvenlax reuvenlax force-pushed the schema_update_push_notification branch from 299b96a to 283f6e1 Compare December 2, 2022 04:31
@github-actions github-actions bot added hcatalog and removed hcatalog labels Dec 2, 2022
@reuvenlax reuvenlax changed the title [WIP] Schema update push notification Handle updates to table schema when using Storage API writes. Dec 2, 2022
@reuvenlax
Copy link
Contributor Author

R: @prodriguezdefino
R: @yirutang

@github-actions
Copy link
Contributor

github-actions bot commented Dec 2, 2022

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@yirutang
Copy link
Contributor

yirutang commented Dec 7, 2022

R: @GaoleMeng FYI

@yirutang
Copy link
Contributor

yirutang commented Dec 7, 2022

R: @agrawal-siddharth FYI

@reuvenlax
Copy link
Contributor Author

friendly ping!

@reuvenlax
Copy link
Contributor Author

friendly ping

Copy link
Contributor

@prodriguezdefino prodriguezdefino left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left few comment nits, but overall looks good to me

.encodeUnknownFields(unknownFields, ignoreUnknownValues));
} catch (TableRowToStorageApiProto.SchemaConversionException e) {
TableRow tableRow = appendClientInfo.toTableRow(payloadBytes);
// TODO(reuvenlax): We need to merge the unknown fields in!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just include the information of the unknown fields to the error message returned would be sufficient for debugging purposes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now this codepath is disabled. will revisit later.

@@ -488,6 +542,14 @@ long flush(
return RetryType.RETRY_ALL_OPERATIONS;
},
c -> {
AppendRowsResponse response = Preconditions.checkStateNotNull(c.getResult());
if (autoUpdateSchema && response.hasUpdatedSchema()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we get a similar information piece on schema updates when the insert fails? if so that info added to the failed insert rows could help on debugging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe we do.

appendClientInfo.set(
AppendClientInfo.of(
updatedSchema.read(), appendClientInfo.get().getCloseAppendClient()));
// TODO: invalidate?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems that the unsharded version of the writes do invalidate when schema has changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that put() will invalidate the old value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added invalidate

appendClientInfo.set(
AppendClientInfo.of(
updatedSchemaReturned.get(), appendClientInfo.get().getCloseAppendClient()));
// TODO: invalidate?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as in line 434

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same - I believe that put() invalidates the old value

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added invalidate


@Override
public StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) {
throw new RuntimeException("Not supported");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I misunderstood the logic, but would it be possible to add a validation in BigQueryIO that schema update is not available for Beam Row payloads?
If I'm not mistaken someone could configure the writes with useBeamSchema() and set also auto update of schemas and then their pipeline will fail with this runtime exception, when in fact we could have captured that in validation time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@reuvenlax reuvenlax force-pushed the schema_update_push_notification branch from 5b25910 to 3223f4e Compare January 6, 2023 20:38
@github-actions github-actions bot removed the hcatalog label Jan 6, 2023
@reuvenlax reuvenlax force-pushed the schema_update_push_notification branch 2 times, most recently from e1616c3 to 3f1394c Compare January 11, 2023 02:39
@reuvenlax
Copy link
Contributor Author

@prodriguezdefino comments addressed

@prodriguezdefino
Copy link
Contributor

LGTM

@reuvenlax
Copy link
Contributor Author

Run Java_GCP_IO_Direct PreCommit

return updatedTableSchema.hashCode() != getTableSchema().hashCode();
}

public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of ignoreUnknonwValues to be false, this will be a void operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - these are fields that are unknown to the prior step. They make actually end up being known to the current step due to the updated schema.

if (appendClientInfo == null) {
appendClientInfo = getAppendClientInfo(true, null);
}
@Nullable TableRow unknownFields = payload.getUnknownFields();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ignoreUnknownValues is false, could we avoid doing all the following (to save some process time).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unknownFields are values that were unknown to the prior conversion step. These fields may be known to the writing step (since it gets schema updates back from Vortex) so we can't ignore them here.

invalidateWriteStream();
appendClientInfo =
Preconditions.checkStateNotNull(getAppendClientInfo(false, updatedTableSchema));
updatedTableSchema = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if there will be races regarding this updatedSchema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what race do you envision?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some responses are coming back and the updatedTableSchema is being updated L568, which in race with the postFlush here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postFlush is called only after all futures have completed (in flushAll), so we would not expect any more callbacks. Also note that RetryManager calls these response callbacks in the primary thread (RetryManager.await() calls the callbacks), so the callbacks here are not being called asynchronously.

Preconditions.checkStateNotNull(info.getStreamAppendClient()).pin();
return info;
}));
TableSchema updatedSchemaValue = updatedSchema.read();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updatedSchema can be null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updatedSchema is a state variable. If it has never been set, then it will return null.

@@ -479,15 +502,41 @@ public static DynamicMessage messageFromTableRow(
}
}

if (unknownFields != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file the conversion stage?

If user has input ABC and the original schema is AB but later on the schema is updated to ABC, will L496 fail the check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, because the caller (StorageApiDynamicDestinationsTableRow.java:151) passes in true if autoSchemaUpdates==true

@Test
public void testIgnoreUnknownNestedField() throws Exception {
TableRow rowNoF = new TableRow();
rowNoF.putAll(BASE_TABLE_ROW_NO_F);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just try to understand the test, so Beam accepts two format of TableRow? One is "F"->List of values and the other is List(field_name, field_value), and they can be mixed up as nested field value to the same TableRow struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. This is the unfortunate history of TableRow (which long predated Beam)

assertEquals(1, ((TableRow) unknown.get("nestedvaluenof1")).size());
assertEquals(
"foobar",
((TableRow) unknown.get("nestedvalue1")).getF().get(BASE_TABLE_ROW.getF().size()).getV());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, the unknown value is at the last offset, is there elements ahead of it? What if the unknown field is in the middle of the array list?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unordered map, so there should be no offsets.

.to(tableRef)
.withMethod(method)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.ignoreUnknownValues()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add a test for ignoreUnknownValue(false) and withAutoSchemaUpdate(true)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

.map(tr -> filterUnknownValues(tr, tableSchema.getFields()))
.collect(Collectors.toList());
Iterable<TableRow> expectedFullValues =
LongStream.range(6, 10).mapToObj(getRowSet).collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be 6,9 and above 0,5?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(6,10) is open on the upper bound (i.e. it's [6, 10) )


/**
* Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
* parameter controls how many rows are batched into a single ProtoRows object before we move on to
* the next one.
*/
class SplittingIterable implements Iterable<ProtoRows> {
interface ConvertUnknownFields {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I somehow couldn't find the implementation of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a Java functional interface - any matching lambda will conform. e.g. you can pass in (tableRow, ignore) -> {} and this will conform to the interface

// into a proto and concatenate to the existing proto.
try {
byteString =
byteString.concat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I missed something, so every time we will concat the unknown fields with the existing byte string? Then what's the difference between this and passing down the entire message? Maybe the unknownFieldsToMessage will filter something out? But I don't see it has a schema.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The prior convert message only includes fields known to it in the proto it generates. It can't include fields it doesn't know about as they would have to be in the proto descriptor (and it can't use the proto's unknown field set as that requires field ids, which is not known yet).

Therefore the incoming byteString contains only fields that were known to the convert stage, and all other fields are put into the json unknownFields object. What we are doing here is taking advantage of the fact that the write step has a more up-to-date view on the schema, so we walk over the unknownFields json and extract whatever fields are now known (which might still be only a subset of the remaining fields). We then convert those unknownFields to a proto, and concatenate the two protos.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not an expert in modern Java... Where is the implementation of unknownFieldsToMessage.convert? How could only convert the "known" unknown fields?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, found it, should be encodeUnknownFields

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, code looks good here.

@@ -35,6 +35,8 @@

StorageApiWritePayload toMessage(T element) throws Exception;

StorageApiWritePayload toMessage(TableRow tableRow, boolean respectRequired) throws Exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this relevant to this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't simply take the unknownFields and convert them to a proto, as there may be missing required fields (because those fields are in the original proto). We need a way to do the conversion without enforcing nullability.

// If we got a response indicating an updated schema, recreate the client.
if (updatedTableSchema != null
&& this.appendClientInfo != null
&& this.appendClientInfo.hasSchemaChanged(updatedTableSchema)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is expensive to perform, theoretically it is not needed since when setting updatedTableSchema it is already checked.

We also provided something here:
https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java#L157

If that field is not null, it means a new schema appeared. We do the comparison based on timestamp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this

@@ -652,6 +692,18 @@ public void process(
}
appendSplitDistribution.update(numAppends);

if (updatedSchemaReturned.get() != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe - is there an advantage to using that over the one in the response?

Copy link
Contributor

@yirutang yirutang Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the logic can be simplified down to:
https://screenshot.googleplex.com/9o5fvs3UgNEWa9E

No extra comparison needed.

However it can only capture the first schema update event, which is enough since we will refresh the whole writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@reuvenlax reuvenlax force-pushed the schema_update_push_notification branch from e0dd65d to 7ad44c8 Compare January 19, 2023 21:50
@reuvenlax reuvenlax merged commit f5020e7 into apache:master Jan 20, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants