Skip to content

Commit

Permalink
used implementation of delta serialization on distributed network, fi…
Browse files Browse the repository at this point in the history
…xed relative ridbag issues
  • Loading branch information
tglman committed Sep 30, 2019
1 parent cdd9224 commit bcdc5b3
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,41 +270,48 @@ public void checkAndConvert() {
if (isEmbedded() && ODatabaseRecordThreadLocal.instance().get().getSbTreeCollectionManager() != null
&& delegate.size() >= topThreshold) {
ORidBagDelegate oldDelegate = delegate;
boolean isTransactionModified = oldDelegate.isTransactionModified();
delegate = new OSBTreeRidBag();
boolean oldAutoConvert = oldDelegate.isAutoConvertToRecord();
oldDelegate.setAutoConvertToRecord(false);

final ORecordElement owner = oldDelegate.getOwner();
delegate.disableTracking(owner);
for (OIdentifiable identifiable : oldDelegate) {
delegate.add(identifiable);
}

final ORecordElement owner = oldDelegate.getOwner();
delegate.setOwner(owner);

delegate.setTracker(oldDelegate.getTracker());
oldDelegate.disableTracking(owner);

delegate.setDirty();
delegate.setTransactionModified(isTransactionModified);
delegate.enableTracking(owner);

oldDelegate.setAutoConvertToRecord(oldAutoConvert);
oldDelegate.requestDelete();
} else if (bottomThreshold >= 0 && !isEmbedded() && delegate.size() <= bottomThreshold) {
ORidBagDelegate oldDelegate = delegate;
boolean isTransactionModified = oldDelegate.isTransactionModified();
boolean oldAutoConvert = oldDelegate.isAutoConvertToRecord();
oldDelegate.setAutoConvertToRecord(false);
delegate = new OEmbeddedRidBag();

final ORecordElement owner = oldDelegate.getOwner();
delegate.disableTracking(owner);
for (OIdentifiable identifiable : oldDelegate) {
delegate.add(identifiable);
}

final ORecordElement owner = oldDelegate.getOwner();
delegate.setOwner(owner);

delegate.setTracker(oldDelegate.getTracker());
oldDelegate.disableTracking(owner);

delegate.setDirty();
delegate.setTransactionModified(isTransactionModified);
delegate.enableTracking(owner);

oldDelegate.setAutoConvertToRecord(oldAutoConvert);
oldDelegate.requestDelete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ public interface ORidBagDelegate
OSimpleMultiValueTracker<OIdentifiable, OIdentifiable> getTracker();

void setTracker(OSimpleMultiValueTracker<OIdentifiable, OIdentifiable> tracker);

void setTransactionModified(boolean transactionModified);
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public void remove(OIdentifiable identifiable) {
*
* @param index
* @param newValue
*
* @return
*/
public boolean swap(int index, OIdentifiable newValue) {
Expand Down Expand Up @@ -616,6 +615,10 @@ public <RET> RET setDirty() {
return (RET) this;
}

public void setTransactionModified(boolean transactionDirty) {
this.transactionDirty = transactionDirty;
}

@Override
public void setDirtyNoChanged() {
if (owner != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ public boolean isEnabled() {
return enabled;
}

public void sourceFrom(OSimpleMultiValueTracker<V, V> tracker) {
public void sourceFrom(OSimpleMultiValueTracker<K, V> tracker) {
this.timeLine = tracker.timeLine;
this.transactionTimeLine = tracker.transactionTimeLine;
}

public OMultiValueChangeTimeLine<Object, Object> getTimeLine() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ private void serializeDeltaLinkBag(BytesContainer bytes, ORidBag value) {
OUUIDSerializer.INSTANCE.serialize(uuid, bytes.bytes, uuidPos);

OMultiValueChangeTimeLine<OIdentifiable, OIdentifiable> timeline = value.getTransactionTimeLine();
assert timeline != null : "Collection timeline required for link* types serialization";
assert timeline != null : "Collection timeline required for link types serialization";
OVarIntSerializer.write(bytes, timeline.getMultiValueChangeEvents().size());
for (OMultiValueChangeEvent<OIdentifiable, OIdentifiable> event : timeline.getMultiValueChangeEvents()) {
switch (event.getChangeType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,10 @@ public <RET> RET setDirty() {
return (RET) this;
}

public void setTransactionModified(boolean transactionDirty) {
this.transactionDirty = transactionDirty;
}

@Override
public void setDirtyNoChanged() {
if (owner != null)
Expand All @@ -1089,4 +1093,5 @@ public void setTracker(OSimpleMultiValueTracker<OIdentifiable, OIdentifiable> tr
public OMultiValueChangeTimeLine<OIdentifiable, OIdentifiable> getTransactionTimeLine() {
return this.tracker.getTransactionTimeLine();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,23 @@ private void resolveTracking(ORecordOperation change, boolean onlyExecutorCase)
break;
case ORecordOperation.UPDATED: {
OIdentifiable updateRecord = change.getRecord();

ODocument original = database.load(updateRecord.getIdentity());
if (original == null) {
throw new ORecordNotFoundException(updateRecord.getIdentity());
}

original.merge((ODocument) updateRecord, false, false);

ODocument updateDoc = original;
OLiveQueryHook.addOp(updateDoc, ORecordOperation.UPDATED, database);
OLiveQueryHookV2.addOp(updateDoc, ORecordOperation.UPDATED, database);
OImmutableClass clazz = ODocumentInternal.getImmutableSchemaClass(updateDoc);
if (clazz != null) {
if (onlyExecutorCase) {
OClassIndexManager.processIndexOnUpdate(database, updateDoc, changes);
}
if (clazz.isFunction()) {
database.getSharedContext().getFunctionLibrary().updatedFunction(updateDoc);
Orient.instance().getScriptManager().close(database.getName());
}
if (clazz.isSequence()) {
((OSequenceLibraryProxy) database.getMetadata().getSequenceLibrary()).getDelegate()
.onSequenceUpdated(database, updateDoc);
if (updateRecord instanceof ODocument) {
ODocument updateDoc = (ODocument) updateRecord;
OLiveQueryHook.addOp(updateDoc, ORecordOperation.UPDATED, database);
OLiveQueryHookV2.addOp(updateDoc, ORecordOperation.UPDATED, database);
OImmutableClass clazz = ODocumentInternal.getImmutableSchemaClass(updateDoc);
if (clazz != null) {
if (onlyExecutorCase) {
OClassIndexManager.processIndexOnUpdate(database, updateDoc, changes);
}
if (clazz.isFunction()) {
database.getSharedContext().getFunctionLibrary().updatedFunction(updateDoc);
Orient.instance().getScriptManager().close(database.getName());
}
if (clazz.isSequence()) {
((OSequenceLibraryProxy) database.getMetadata().getSequenceLibrary()).getDelegate()
.onSequenceUpdated(database, updateDoc);
}
}
}
updatedRecords.put(change.getRID(), change.getRecord());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.orientechnologies.orient.core.id.ORecordId;
import com.orientechnologies.orient.core.record.ORecord;
import com.orientechnologies.orient.core.record.ORecordInternal;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.core.serialization.serializer.record.binary.ODocumentSerializerDelta;
import com.orientechnologies.orient.core.serialization.serializer.record.binary.ORecordSerializerNetworkDistributed;
import com.orientechnologies.orient.core.storage.ORecordDuplicatedException;
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
Expand Down Expand Up @@ -69,10 +71,17 @@ public void genOps(List<ORecordOperation> ops) {
request.setRecordType(ORecordInternal.getRecordType(txEntry.getRecord()));
switch (txEntry.type) {
case ORecordOperation.CREATED:
case ORecordOperation.UPDATED:
request.setRecord(ORecordSerializerNetworkDistributed.INSTANCE.toStream(txEntry.getRecord()));
request.setContentChanged(ORecordInternal.isContentChanged(txEntry.getRecord()));
break;
case ORecordOperation.UPDATED:
if (request.getRecordType() == ODocument.RECORD_TYPE) {
request.setRecord(ODocumentSerializerDelta.instance().serializeDelta((ODocument) txEntry.getRecord()));
} else {
request.setRecord(ORecordSerializerNetworkDistributed.INSTANCE.toStream(txEntry.getRecord()));
}
request.setContentChanged(ORecordInternal.isContentChanged(txEntry.getRecord()));
break;
case ORecordOperation.DELETED:
break;
}
Expand Down Expand Up @@ -162,20 +171,30 @@ private void convert(ODatabaseDocumentInternal database) {

ORecord record = null;
switch (type) {
case ORecordOperation.CREATED:
case ORecordOperation.UPDATED: {
case ORecordOperation.CREATED: {
record = ORecordSerializerNetworkDistributed.INSTANCE.fromStream(req.getRecord(), null);
ORecordInternal.setRecordSerializer(record, database.getSerializer());
break;
}
case ORecordOperation.UPDATED: {
if (req.getRecordType() == ODocument.RECORD_TYPE) {
record = database.load(req.getId());
ODocumentSerializerDelta.instance().deserializeDelta(req.getRecord(), (ODocument) record);
} else {
record = ORecordSerializerNetworkDistributed.INSTANCE.fromStream(req.getRecord(), null);
ORecordInternal.setRecordSerializer(record, database.getSerializer());
}
break;
case ORecordOperation.DELETED:
record = database.getRecord(req.getId());
}
case ORecordOperation.DELETED: {
record = database.load(req.getId());
if (record == null) {
record = Orient.instance().getRecordFactoryManager()
.newInstance(req.getRecordType(), req.getId().getClusterId(), database);
}
break;
}
}
ORecordInternal.setIdentity(record, (ORecordId) req.getId());
ORecordInternal.setVersion(record, req.getVersion());
ORecordOperation op = new ORecordOperation(record, type);
Expand Down

0 comments on commit bcdc5b3

Please sign in to comment.