Skip to content

Commit

Permalink
drop CBlockLock, as it is not needed and causes severe performance de…
Browse files Browse the repository at this point in the history
…gradation
  • Loading branch information
awildturtok committed Jul 2, 2024
1 parent 49a1af7 commit ce4b91b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.IdMutex;
import com.bakdata.conquery.models.identifiable.IdMutex.Locked;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConnectorId;
import com.bakdata.conquery.models.jobs.CalculateCBlocksJob;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.entity.Entity;
Expand All @@ -47,7 +44,6 @@
@RequiredArgsConstructor
public class BucketManager {

private final IdMutex<ConnectorId> cBlockLocks = new IdMutex<>();
private final JobManager jobManager;
private final WorkerStorage storage;

Expand Down Expand Up @@ -123,9 +119,6 @@ private static void registerCBlock(CBlock cBlock, Map<Connector, Int2ObjectMap<M
.put(cBlock.getBucket(), cBlock);
}

public Locked acquireLock(Connector connector) {
return cBlockLocks.acquire(connector.getId());
}

@SneakyThrows
public void fullUpdate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.BucketManager;
import com.bakdata.conquery.models.events.CBlock;
import com.bakdata.conquery.models.identifiable.IdMutex;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -47,7 +46,7 @@ public void addCBlock(Bucket bucket, ConceptTreeConnector connector) {

@Override
public void execute() throws Exception {
if(infos.isEmpty()){
if (infos.isEmpty()) {
return;
}

Expand All @@ -56,10 +55,10 @@ public void execute() throws Exception {
final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(this.executorService);

final List<? extends ListenableFuture<?>> futures = infos.stream()
.map(this::createInformationProcessor)
.map(executorService::submit)
.peek(f -> f.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor()))
.collect(Collectors.toList());
.map(this::createInformationProcessor)
.map(executorService::submit)
.peek(f -> f.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor()))
.collect(Collectors.toList());

Futures.allAsList(futures).get();
}
Expand Down Expand Up @@ -98,17 +97,15 @@ private static class CalculationInformationProcessor implements Runnable {
@Override
public void run() {
try {
try(IdMutex.Locked ignored = bucketManager.acquireLock(info.connector)) {
if (bucketManager.hasCBlock(info.getCBlockId())) {
log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", info.getCBlockId());
return;
}
if (bucketManager.hasCBlock(info.getCBlockId())) {
log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", info.getCBlockId());
return;
}

CBlock cBlock = CBlock.createCBlock(info.getConnector(), info.getBucket(), bucketManager.getEntityBucketSize());
CBlock cBlock = CBlock.createCBlock(info.getConnector(), info.getBucket(), bucketManager.getEntityBucketSize());

bucketManager.addCalculatedCBlock(cBlock);
storage.addCBlock(cBlock);
}
bucketManager.addCalculatedCBlock(cBlock);
storage.addCBlock(cBlock);
}
catch (Exception e) {
throw new RuntimeException(
Expand Down

0 comments on commit ce4b91b

Please sign in to comment.