diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 38a0c623e..a567bf606 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -759,24 +759,18 @@ private EbeanMetadataAspect buildMetadataAspectB // metadata_aspect schema and we don't take this route here to keep this change backward compatible. private static final String OPTIMISTIC_LOCKING_UPDATE_SQL = "UPDATE metadata_aspect " + "SET urn = :urn, aspect = :aspect, version = :version, metadata = :metadata, createdOn = :createdOn, createdBy = :createdBy " - + "WHERE urn = :urn and aspect = :aspect and version = :version"; + + "WHERE urn = :urn and aspect = :aspect and version = :version and createdOn = :oldTimestamp"; /** * Assembly SQL UPDATE script for old Schema. * @param aspect {@link EbeanMetadataAspect} - * @param oldTimestamp old timestamp. If provided, the generated SQL will use optimistic locking and do compare-and-set + * @param oldTimestamp old timestamp.The generated SQL will use optimistic locking and do compare-and-set * with oldTimestamp during the update. * @return {@link SqlUpdate} for SQL update execution */ - private SqlUpdate assembleOldSchemaSqlUpdate(@Nonnull EbeanMetadataAspect aspect, @Nullable Timestamp oldTimestamp) { - - final SqlUpdate oldSchemaSqlUpdate; - if (oldTimestamp == null) { - oldSchemaSqlUpdate = _server.createSqlUpdate(OPTIMISTIC_LOCKING_UPDATE_SQL); - } else { - oldSchemaSqlUpdate = _server.createSqlUpdate(OPTIMISTIC_LOCKING_UPDATE_SQL + " and createdOn = :oldTimestamp"); - oldSchemaSqlUpdate.setParameter("oldTimestamp", oldTimestamp); - } + private SqlUpdate assembleOldSchemaSqlUpdate(@Nonnull EbeanMetadataAspect aspect, @Nonnull Timestamp oldTimestamp) { + final SqlUpdate oldSchemaSqlUpdate = _server.createSqlUpdate(OPTIMISTIC_LOCKING_UPDATE_SQL); + oldSchemaSqlUpdate.setParameter("oldTimestamp", oldTimestamp); oldSchemaSqlUpdate.setParameter("urn", aspect.getKey().getUrn()); oldSchemaSqlUpdate.setParameter("aspect", aspect.getKey().getAspect()); oldSchemaSqlUpdate.setParameter("version", aspect.getKey().getVersion()); @@ -806,9 +800,14 @@ protected void updateWithOptimisticLocking(@Nonn final SqlUpdate oldSchemaSqlUpdate; if (_schemaConfig == SchemaConfig.NEW_SCHEMA_ONLY) { // In NEW_SCHEMA, the entity table is the SOT and getLatest (oldTimestamp) reads from the entity - // table. Therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking) - // aspect table will apply regular update over (urn, aspect, version) primary key combination. - oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null); + // table. Therefore, we will apply compare-and-set with oldTimestamp on entity table (addWithOptimisticLocking). + // We will also apply an optimistic locking update over (urn, aspect, version) primary key combination to avoid duplicate + // key exceptions when the primary key includes createdon. + EbeanMetadataAspect result = findLatestMetadataAspect(_server, urn, aspectClass); + if (result == null) { + throw new IllegalStateException("No entry from aspect table found even though one was expected. Urn: " + urn + ", Aspect class:" + aspectClass); + } + oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, result.getCreatedOn()); numOfUpdatedRows = runInTransactionWithRetry(() -> { // DUAL WRITE: 1) update aspect table, 2) update entity table. // Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called.