diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java index 2135951817..fe91ec2ba8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java @@ -18,10 +18,7 @@ import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector; import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; -import com.bakdata.conquery.models.identifiable.IdMutex; -import com.bakdata.conquery.models.identifiable.IdMutex.Locked; import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId; -import com.bakdata.conquery.models.identifiable.ids.specific.ConnectorId; import com.bakdata.conquery.models.jobs.CalculateCBlocksJob; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.entity.Entity; @@ -47,7 +44,6 @@ @RequiredArgsConstructor public class BucketManager { - private final IdMutex cBlockLocks = new IdMutex<>(); private final JobManager jobManager; private final WorkerStorage storage; @@ -123,9 +119,6 @@ private static void registerCBlock(CBlock cBlock, Map> futures = infos.stream() - .map(this::createInformationProcessor) - .map(executorService::submit) - .peek(f -> f.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) - .collect(Collectors.toList()); + .map(this::createInformationProcessor) + .map(executorService::submit) + .peek(f -> f.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) + .collect(Collectors.toList()); Futures.allAsList(futures).get(); } @@ -98,17 +97,15 @@ private static class CalculationInformationProcessor implements Runnable { @Override public void run() { try { - try(IdMutex.Locked ignored = bucketManager.acquireLock(info.connector)) { - if (bucketManager.hasCBlock(info.getCBlockId())) { - log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", info.getCBlockId()); - return; - } + if (bucketManager.hasCBlock(info.getCBlockId())) { + log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", info.getCBlockId()); + return; + } - CBlock cBlock = CBlock.createCBlock(info.getConnector(), info.getBucket(), bucketManager.getEntityBucketSize()); + CBlock cBlock = CBlock.createCBlock(info.getConnector(), info.getBucket(), bucketManager.getEntityBucketSize()); - bucketManager.addCalculatedCBlock(cBlock); - storage.addCBlock(cBlock); - } + bucketManager.addCalculatedCBlock(cBlock); + storage.addCBlock(cBlock); } catch (Exception e) { throw new RuntimeException(