From 9c22147c6b757f1427f294b5ae894f84e8f0657e Mon Sep 17 00:00:00 2001 From: sanpwc Date: Wed, 16 Dec 2020 13:21:53 +0300 Subject: [PATCH] IGNITE-13870 --- .../commandline/DefragmentationCommand.java | 1 - .../apache/ignite/IgniteSystemProperties.java | 3 + .../ignite/internal/GridKernalContext.java | 8 + .../internal/GridKernalContextImpl.java | 13 + .../managers/IgniteMBeansManager.java | 6 + .../processors/cache/GridCacheAdapter.java | 137 ------- .../processors/cache/GridCacheUtils.java | 16 - .../dht/atomic/GridDhtAtomicCache.java | 27 -- .../dht/colocated/GridDhtColocatedCache.java | 6 - .../distributed/near/GridNearAtomicCache.java | 3 - .../near/GridNearCacheAdapter.java | 7 - .../near/GridNearTransactionalCache.java | 3 - .../local/atomic/GridLocalAtomicCache.java | 18 - .../CachePartitionDefragmentationManager.java | 308 ++++++++++------ .../DefragmentationMXBeanImpl.java | 85 +++++ .../IgniteDefragmentation.java | 341 ++++++++++++++++++ .../IgniteDefragmentationImpl.java | 223 ++++++++++++ .../reader/StandaloneGridKernalContext.java | 6 + .../VisorDefragmentationTask.java | 124 +++---- .../VisorDefragmentationTaskArg.java | 14 - .../ignite/mxbean/DefragmentationMXBean.java | 73 ++++ .../GridCacheKeyCheckNearEnabledSelfTest.java | 30 -- .../cache/GridCacheKeyCheckSelfTest.java | 209 ----------- .../cache/GridCacheUtilsSelfTest.java | 199 ---------- .../DefragmentationMXBeanTest.java | 322 +++++++++++++++++ .../testsuites/IgniteCacheTestSuite.java | 5 - .../testsuites/IgniteUtilSelfTestSuite.java | 2 - 27 files changed, 1320 insertions(+), 869 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java create mode 100644 modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java delete mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheUtilsSelfTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java index ec5c1f0a02992..e42186395edac 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java @@ -226,7 +226,6 @@ private void printResult(VisorDefragmentationTaskResult res, Logger log) { private VisorDefragmentationTaskArg convertArguments() { return new VisorDefragmentationTaskArg( convertSubcommand(args.subcommand()), - args.nodeIds(), args.cacheNames() ); } diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 0035f1b8f4fc0..94f15366c986c 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -677,7 +677,10 @@ public final class IgniteSystemProperties { /** * Flag indicating whether validation of keys put to cache should be disabled. + * + * @deprecated Since 2.10 Obsolete because of common use of binary marshaller. */ + @Deprecated @SystemProperty("Disables validation of keys put to cache") public static final String IGNITE_CACHE_KEY_VALIDATION_DISABLED = "IGNITE_CACHE_KEY_VALIDATION_DISABLED"; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index a799f6053a9de..2e2fae6ea1090 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -452,6 +453,13 @@ public interface GridKernalContext extends Iterable { */ public GridEncryptionManager encryption(); + /** + * Gets defragmentation manager. + * + * @return Defragmentation manager. + */ + public IgniteDefragmentation defragmentation(); + /** * Gets workers registry. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 092cf1ab6ed41..a9f80be3ac94f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -57,6 +57,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentationImpl; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -174,6 +176,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable @GridToStringExclude private GridEncryptionManager encryptionMgr; + /** */ + @GridToStringExclude + private IgniteDefragmentation defragMgr; + /** */ @GridToStringExclude private GridTracingManager tracingMgr; @@ -561,6 +567,8 @@ protected GridKernalContextImpl( marshCtx = new MarshallerContextImpl(plugins, clsFilter); + defragMgr = new IgniteDefragmentationImpl(this); + try { spring = SPRING.create(false); } @@ -912,6 +920,11 @@ public void addHelper(Object helper) { return encryptionMgr; } + /** {@inheritDoc} */ + @Override public IgniteDefragmentation defragmentation() { + return defragMgr; + } + /** {@inheritDoc} */ @Override public WorkersRegistry workersRegistry() { return workersRegistry; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java index 5c502e97033d6..a35bbd147c27e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.TransactionsMXBeanImpl; import org.apache.ignite.internal.managers.encryption.EncryptionMXBeanImpl; import org.apache.ignite.internal.processors.cache.persistence.DataStorageMXBeanImpl; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.DefragmentationMXBeanImpl; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMXBeanImpl; import org.apache.ignite.internal.processors.cache.warmup.WarmUpMXBeanImpl; import org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl; @@ -52,6 +53,7 @@ import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.mxbean.ComputeMXBean; import org.apache.ignite.mxbean.DataStorageMXBean; +import org.apache.ignite.mxbean.DefragmentationMXBean; import org.apache.ignite.mxbean.EncryptionMXBean; import org.apache.ignite.mxbean.FailureHandlingMxBean; import org.apache.ignite.mxbean.IgniteMXBean; @@ -187,6 +189,10 @@ public void registerMBeansAfterNodeStarted( SnapshotMXBean snpMXBean = new SnapshotMXBeanImpl(ctx); registerMBean("Snapshot", snpMXBean.getClass().getSimpleName(), snpMXBean, SnapshotMXBean.class); + // Defragmentation. + DefragmentationMXBean defragMXBean = new DefragmentationMXBeanImpl(ctx); + registerMBean("Defragmentation", defragMXBean.getClass().getSimpleName(), defragMXBean, DefragmentationMXBean.class); + // Metrics configuration MetricsMxBean metricsMxBean = new MetricsMxBeanImpl(ctx.metric(), log); registerMBean("Metrics", metricsMxBean.getClass().getSimpleName(), metricsMxBean, MetricsMxBean.class); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 0fe7e44c108df..ff80d4338153f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -160,7 +160,6 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_KEY_VALIDATION_DISABLED; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; import static org.apache.ignite.internal.processors.cache.CacheOperationContext.DFLT_ALLOW_ATOMIC_OPS_IN_TX; @@ -255,9 +254,6 @@ public abstract class GridCacheAdapter implements IgniteInternalCache lastFut = new ThreadLocal() { @Override protected FutureHolder initialValue() { @@ -362,10 +358,6 @@ protected GridCacheAdapter(final GridCacheContext ctx, @Nullable GridCache init(); aff = new GridCacheAffinityImpl<>(ctx); - - // The check of methods 'equals' and 'hashCode' that they had been overrode isn't required, since BinaryMarshaller doesn't use them. - if (keyCheck && ctx.binaryMarshaller()) - keyCheck = false; } /** @@ -843,9 +835,6 @@ else if (modes.heap) { throws IgniteCheckedException { A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - ctx.checkSecurity(SecurityPermission.CACHE_READ); PeekModes modes = parsePeekModes(peekModes, false); @@ -2004,9 +1993,6 @@ public final IgniteInternalFuture> getAllAsync(@Nullable final Collect ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.GET); return getAllAsync0(ctx.cacheKeysView(keys), @@ -2496,9 +2482,6 @@ private void clearReservationsIfNeeded( A.notNull(key, "key", val, "val"); - if (keyCheck) - validateCacheKey(key); - V prevVal = getAndPut0(key, val, filter); if (statsEnabled) @@ -2552,9 +2535,6 @@ protected final IgniteInternalFuture getAndPutAsync(K key, V val, @Nullable C A.notNull(key, "key", val, "val"); - if (keyCheck) - validateCacheKey(key); - IgniteInternalFuture fut = getAndPutAsync0(key, val, filter); if (statsEnabled) @@ -2613,9 +2593,6 @@ public boolean put(final K key, final V val, final CacheEntryPredicate filter) A.notNull(key, "key", val, "val"); - if (keyCheck) - validateCacheKey(key); - boolean stored = put0(key, val, filter); if (statsEnabled && stored) @@ -2725,9 +2702,6 @@ private EntryProcessorResult invoke0( throws IgniteCheckedException { A.notNull(key, "key", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKey(key); - return syncOp(new SyncOp>(true) { @Override public EntryProcessorResult op(GridNearTxLocal tx) throws IgniteCheckedException { @@ -2774,9 +2748,6 @@ private EntryProcessorResult invoke0( final Object... args) throws IgniteCheckedException { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -2817,9 +2788,6 @@ private EntryProcessorResult invoke0( throws EntryProcessorException { A.notNull(key, "key", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKey(key); - final boolean statsEnabled = ctx.statisticsEnabled(); final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); @@ -2873,9 +2841,6 @@ private EntryProcessorResult invoke0( final Object... args) { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -2929,9 +2894,6 @@ private EntryProcessorResult invoke0( final Object... args) { A.notNull(map, "map"); - if (keyCheck) - validateCacheKeys(map.keySet()); - warnIfUnordered(map, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -2980,9 +2942,6 @@ private EntryProcessorResult invoke0( final Object... args) throws IgniteCheckedException { A.notNull(map, "map"); - if (keyCheck) - validateCacheKeys(map.keySet()); - warnIfUnordered(map, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -3023,9 +2982,6 @@ private EntryProcessorResult invoke0( public final IgniteInternalFuture putAsync(K key, V val, @Nullable CacheEntryPredicate filter) { A.notNull(key, "key", val, "val"); - if (keyCheck) - validateCacheKey(key); - final boolean statsEnabled = ctx.statisticsEnabled(); final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); @@ -3136,9 +3092,6 @@ public IgniteInternalFuture putAsync0(final K key, final V val, long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; - if (keyCheck) - validateCacheKeys(m.keySet()); - warnIfUnordered(m, BulkOperation.PUT); putAll0(m); @@ -3177,9 +3130,6 @@ protected void putAll0(final Map m) throws IgniteCheck long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; - if (keyCheck) - validateCacheKeys(m.keySet()); - warnIfUnordered(m, BulkOperation.PUT); IgniteInternalFuture fut = putAllAsync0(m); @@ -3222,9 +3172,6 @@ protected IgniteInternalFuture putAllAsync0(final Map fut = getAndRemoveAsync0(key); if (statsEnabled) @@ -3353,9 +3297,6 @@ protected IgniteInternalFuture getAndRemoveAsync0(final K key) { if (F.isEmpty(keys)) return; - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.REMOVE); removeAll0(keys); @@ -3399,9 +3340,6 @@ protected void removeAll0(final Collection keys) throws IgniteCheck final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.REMOVE); IgniteInternalFuture fut = removeAllAsync0(keys); @@ -3456,9 +3394,6 @@ public boolean remove(final K key, @Nullable CacheEntryPredicate filter) throws A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - boolean rmv = remove0(key, filter); if (statsEnabled && rmv) @@ -3518,9 +3453,6 @@ public IgniteInternalFuture removeAsync(final K key, @Nullable final Ca A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - IgniteInternalFuture fut = removeAsync0(key, filter); if (statsEnabled) @@ -3683,9 +3615,6 @@ public CacheMetricsImpl metrics0() { if (F.isEmpty(keys)) return true; - if (keyCheck) - validateCacheKeys(keys); - //TODO: IGNITE-9324: add explicit locks support. MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); @@ -3730,9 +3659,6 @@ public CacheMetricsImpl metrics0() { @Override public IgniteInternalFuture lockAsync(K key, long timeout) { A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - //TODO: IGNITE-9324: add explicit locks support. MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); @@ -3744,9 +3670,6 @@ public CacheMetricsImpl metrics0() { throws IgniteCheckedException { A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - unlockAll(Collections.singletonList(key)); } @@ -3754,9 +3677,6 @@ public CacheMetricsImpl metrics0() { @Override public boolean isLocked(K key) { A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); while (true) { @@ -3775,9 +3695,6 @@ public CacheMetricsImpl metrics0() { @Override public boolean isLockedByThread(K key) { A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - try { KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); @@ -4855,9 +4772,6 @@ private boolean clearLocally0(K key, boolean readers) { ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); - if (keyCheck) - validateCacheKey(key); - GridCacheVersion obsoleteVer = nextVersion(); ctx.shared().database().checkpointReadLock(); @@ -4887,9 +4801,6 @@ private boolean clearLocally0(K key, boolean readers) { @Override public boolean evict(K key) { A.notNull(key, "key"); - if (keyCheck) - validateCacheKey(key); - //TODO IGNITE-7956 MvccUtils.verifyMvccOperationSupport(ctx, "Evict"); @@ -4903,9 +4814,6 @@ private boolean clearLocally0(K key, boolean readers) { if (F.isEmpty(keys)) return; - if (keyCheck) - validateCacheKey(keys); - //TODO IGNITE-7956 MvccUtils.verifyMvccOperationSupport(ctx, "Evict"); @@ -5266,51 +5174,6 @@ public void onReconnected() { // No-op. } - /** - * For tests only. - */ - public void forceKeyCheck() { - keyCheck = true; - } - - /** - * Validates that given cache key has overridden equals and hashCode methods and - * implements {@link Externalizable}. - * - * @param key Cache key. - * @throws IllegalArgumentException If validation fails. - */ - protected final void validateCacheKey(Object key) { - if (keyCheck) { - CU.validateCacheKey(key); - - keyCheck = false; - } - } - - /** - * Validates that given cache keys have overridden equals and hashCode methods and - * implement {@link Externalizable}. - * - * @param keys Cache keys. - * @throws IgniteException If validation fails. - */ - protected final void validateCacheKeys(Iterable keys) { - if (keys == null) - return; - - if (keyCheck) { - for (Object key : keys) { - if (key == null || key instanceof GridCacheInternal) - continue; - - CU.validateCacheKey(key); - - keyCheck = false; - } - } - } - /** * Checks that given map is sorted or otherwise constant order, or processed inside deadlock-detecting transaction. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index e260983fa7186..ff023d0d979b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1051,22 +1051,6 @@ private static String capitalize(String str) { return Character.toUpperCase(str.charAt(0)) + str.substring(1); } - /** - * Validates that cache key object has overridden equals and hashCode methods. - * Will also check that a BinaryObject has a hash code set. - * - * @param key Key. - * @throws IllegalArgumentException If equals or hashCode is not implemented. - */ - public static void validateCacheKey(@Nullable Object key) { - if (key == null) - return; - - if (!U.overridesEqualsAndHashCode(key)) - throw new IllegalArgumentException("Cache key must override hashCode() and equals() methods: " + - key.getClass().getName()); - } - /** * @param cacheName Cache name. * @return {@code True} if this is utility system cache. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 1bc0e7714fa01..60c0006e60c60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -473,9 +473,6 @@ public void near(GridNearAtomicCache near) { ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); - if (keyCheck) - validateCacheKey(key); - CacheOperationContext opCtx = ctx.operationContextPerCall(); subjId = ctx.subjectIdPerCall(null, opCtx); @@ -576,9 +573,6 @@ private IgniteInternalFuture> getAllAsyncInternal( if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.GET); CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -713,9 +707,6 @@ private IgniteInternalFuture> getAllAsyncInternal( @Override public IgniteInternalFuture putAllConflictAsync(Map conflictMap) { ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); - if (map != null && keyCheck) - validateCacheKeys(conflictMap.keySet()); - warnIfUnordered(conflictMap, BulkOperation.PUT); return updateAll0(null, @@ -828,8 +819,6 @@ private IgniteInternalFuture asyncOp(final CO> op @Override public Map> invokeAll(Set keys, EntryProcessor entryProcessor, Object... args) throws IgniteCheckedException { - if (map != null && keyCheck) - validateCacheKeys(keys); warnIfUnordered(keys, BulkOperation.INVOKE); @@ -857,9 +846,6 @@ private IgniteInternalFuture> invoke0( Object... args) { A.notNull(key, "key", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKey(key); - final boolean statsEnabled = ctx.statisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -911,8 +897,6 @@ private IgniteInternalFuture> invoke0( @Override public IgniteInternalFuture>> invokeAllAsync(Set keys, final EntryProcessor entryProcessor, Object... args) { - if (map != null && keyCheck) - validateCacheKeys(keys); warnIfUnordered(keys, BulkOperation.INVOKE); @@ -978,9 +962,6 @@ private IgniteInternalFuture>> invokeAll0( Object... args) throws IgniteCheckedException { A.notNull(map, "map"); - if (keyCheck) - validateCacheKeys(map.keySet()); - warnIfUnordered(map, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -1010,9 +991,6 @@ private IgniteInternalFuture>> invokeAll0( Object... args) { A.notNull(map, "map"); - if (keyCheck) - validateCacheKeys(map.keySet()); - warnIfUnordered(map, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -1173,8 +1151,6 @@ private IgniteInternalFuture update0( assert ctx.updatesAllowed(); - validateCacheKey(key); - ctx.checkSecurity(SecurityPermission.CACHE_PUT); final GridNearAtomicAbstractUpdateFuture updateFut = @@ -1361,9 +1337,6 @@ private IgniteInternalFuture removeAllAsync0( assert keys != null || conflictMap != null; - if (keyCheck) - validateCacheKeys(keys); - ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); final CacheOperationContext opCtx = ctx.operationContextPerCall(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index a548fe912c7aa..298224ae839f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -187,9 +187,6 @@ public GridDistributedCacheEntry entryExx( final boolean needVer) { ctx.checkSecurity(SecurityPermission.CACHE_READ); - if (keyCheck) - validateCacheKey(key); - GridNearTxLocal tx = checkCurrentTx(); final CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -328,9 +325,6 @@ else if (mvccTracker != null) if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.GET); GridNearTxLocal tx = checkCurrentTx(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 1509c5e4f04f5..8eb1e0c484815 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -428,9 +428,6 @@ private void processNearAtomicUpdateResponse( if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.GET); CacheOperationContext opCtx = ctx.operationContextPerCall(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 6e2768cd5c09f..11d7898035643 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -117,13 +117,6 @@ private GridCacheMapEntryFactory entryFactory() { */ public abstract GridDhtCacheAdapter dht(); - /** {@inheritDoc} */ - @Override public void forceKeyCheck() { - super.forceKeyCheck(); - - dht().forceKeyCheck(); - } - /** {@inheritDoc} */ @Override public void onReconnected() { map = new GridCacheLocalConcurrentMap( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 7fe969f3a6bb5..bb41ac2028c3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -130,9 +130,6 @@ public void dht(GridDhtCache dht) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.emptyMap()); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.GET); GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index cde847ba3207c..982e29605af3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -388,9 +388,6 @@ private Map getAllInternal(@Nullable Collection keys, Map vals = U.newHashMap(keys.size()); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.GET); final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null); @@ -575,9 +572,6 @@ private Map getAllInternal(@Nullable Collection keys, Object... args) throws IgniteCheckedException { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -620,9 +614,6 @@ private Map getAllInternal(@Nullable Collection keys, Object... args) throws EntryProcessorException { A.notNull(key, "key", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKey(key); - final boolean statsEnabled = ctx.statisticsEnabled(); final long start = statsEnabled ? System.nanoTime() : 0L; @@ -664,9 +655,6 @@ private Map getAllInternal(@Nullable Collection keys, Object... args) { A.notNull(keys, "keys", entryProcessor, "entryProcessor"); - if (keyCheck) - validateCacheKeys(keys); - warnIfUnordered(keys, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -698,9 +686,6 @@ private Map getAllInternal(@Nullable Collection keys, Object... args) throws IgniteCheckedException { A.notNull(map, "map"); - if (keyCheck) - validateCacheKeys(map.keySet()); - warnIfUnordered(map, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); @@ -735,9 +720,6 @@ private Map getAllInternal(@Nullable Collection keys, Object... args) { A.notNull(map, "map"); - if (keyCheck) - validateCacheKeys(map.keySet()); - warnIfUnordered(map, BulkOperation.INVOKE); final boolean statsEnabled = ctx.statisticsEnabled(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java index aa82a23cad935..48616b63f6bca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java @@ -19,12 +19,10 @@ import java.io.File; import java.nio.file.Path; -import java.text.DecimalFormat; -import java.text.DecimalFormatSymbols; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -46,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheType; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.CheckpointState; @@ -154,7 +153,7 @@ public class CachePartitionDefragmentationManager { private final AtomicBoolean cancel = new AtomicBoolean(); /** */ - private final DefragmentationStatus status = new DefragmentationStatus(); + private final Status status = new Status(); /** */ private final GridFutureAdapter completionFut = new GridFutureAdapter<>(); @@ -221,7 +220,30 @@ public void beforeDefragmentation() throws IgniteCheckedException { /** */ public void executeDefragmentation() throws IgniteCheckedException { - status.onStart(cacheGrpCtxsForDefragmentation); + Map> oldStores = new HashMap<>(); + + for (CacheGroupContext oldGrpCtx : cacheGrpCtxsForDefragmentation) { + int grpId = oldGrpCtx.groupId(); + + final IgniteCacheOffheapManager offheap = oldGrpCtx.offheap(); + + List oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) + .filter(store -> { + try { + return filePageStoreMgr.exists(grpId, store.partId()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + }) + .collect(Collectors.toList()); + + oldStores.put(grpId, oldCacheDataStores); + } + + int partitionCount = oldStores.values().stream().mapToInt(List::size).sum(); + + status.onStart(cacheGrpCtxsForDefragmentation, partitionCount); try { // Now the actual process starts. @@ -235,8 +257,10 @@ public void executeDefragmentation() throws IgniteCheckedException { File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName()); + List oldCacheDataStores = oldStores.get(grpId); + if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) { - status.onCacheGroupSkipped(oldGrpCtx); + status.onCacheGroupSkipped(oldGrpCtx, oldCacheDataStores.size()); continue; } @@ -244,17 +268,6 @@ public void executeDefragmentation() throws IgniteCheckedException { try { GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap(); - List oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) - .filter(store -> { - try { - return filePageStoreMgr.exists(grpId, store.partId()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - }) - .collect(Collectors.toList()); - status.onCacheGroupStart(oldGrpCtx, oldCacheDataStores.size()); if (workDir == null || oldCacheDataStores.isEmpty()) { @@ -609,8 +622,8 @@ private void checkCancellation() throws DefragmentationCancelledException { } /** */ - public String status() { - return status.toString(); + public Status status() { + return status; } /** @@ -979,58 +992,110 @@ private static class DefragmentationCancelledException extends RuntimeException private static final long serialVersionUID = 0L; } - /** */ - private class DefragmentationStatus { - /** */ + /** Defragmentation status. */ + class Status { + /** Defragmentation start timestamp. */ private long startTs; - /** */ + /** Defragmentation finish timestamp. */ private long finishTs; - /** */ - private final Set scheduledGroups = new TreeSet<>(); + /** Total count of partitions. */ + private int totalPartitionCount; - /** */ - private final Map progressGroups - = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + /** Partitions, that are already defragmented. */ + private int defragmentedPartitionCount; - /** */ - private final Map finishedGroups - = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + /** Cache groups scheduled for defragmentation. */ + private final Set scheduledGroups; - /** */ - private final Set skippedGroups = new TreeSet<>(); + /** Progress for cache group. */ + private final Map progressGroups; - /** */ - public synchronized void onStart(Set scheduledGroups) { + /** Finished cache groups. */ + private final Map finishedGroups; + + /** Skipped cache groups. */ + private final Set skippedGroups; + + /** Constructor. */ + public Status() { + scheduledGroups = new TreeSet<>(); + progressGroups = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + finishedGroups = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + skippedGroups = new TreeSet<>(); + } + + /** Copy constructor. */ + public Status( + long startTs, + long finishTs, + Set scheduledGroups, + Map progressGroups, + Map finishedGroups, + Set skippedGroups + ) { + this.startTs = startTs; + this.finishTs = finishTs; + this.scheduledGroups = scheduledGroups; + this.progressGroups = progressGroups; + this.finishedGroups = finishedGroups; + this.skippedGroups = skippedGroups; + } + + /** + * Mark the start of the defragmentation. + * @param scheduledGroups Groups scheduled for defragmentation. + * @param partitions Total partition count. + */ + public synchronized void onStart(Set scheduledGroups, int partitions) { startTs = System.currentTimeMillis(); + totalPartitionCount = partitions; - for (CacheGroupContext grp : scheduledGroups) { + for (CacheGroupContext grp : scheduledGroups) this.scheduledGroups.add(grp.cacheOrGroupName()); - } log.info("Defragmentation started."); } - /** */ - public synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) { + /** + * Mark the start of the cache group defragmentation. + * @param grpCtx Cache group context. + * @param parts Partition count. + */ + private synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) { scheduledGroups.remove(grpCtx.cacheOrGroupName()); progressGroups.put(grpCtx, new DefragmentationCacheGroupProgress(parts)); } - /** */ - public synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { + /** + * Mark the end of the partition defragmentation. + * @param grpCtx Cache group context. + * @param oldSize Old size. + * @param newSize New size; + */ + private synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { progressGroups.get(grpCtx).onPartitionDefragmented(oldSize, newSize); + + defragmentedPartitionCount++; } - /** */ - public synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { + /** + * Mark the end of the index partition defragmentation. + * @param grpCtx Cache group context. + * @param oldSize Old size. + * @param newSize New size; + */ + private synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { progressGroups.get(grpCtx).onIndexDefragmented(oldSize, newSize); } - /** */ - public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { + /** + * Mark the end of the cache group defragmentation. + * @param grpCtx Cache group context. + */ + private synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { DefragmentationCacheGroupProgress progress = progressGroups.remove(grpCtx); progress.onFinish(); @@ -1038,15 +1103,20 @@ public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { finishedGroups.put(grpCtx, progress); } - /** */ - public synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx) { + /** + * Mark that cache group defragmentation was skipped. + * @param grpCtx Cache group context. + */ + private synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx, int partitions) { scheduledGroups.remove(grpCtx.cacheOrGroupName()); skippedGroups.add(grpCtx.cacheOrGroupName()); + + defragmentedPartitionCount += partitions; } - /** */ - public synchronized void onFinish() { + /** Mark the end of the defragmentation. */ + private synchronized void onFinish() { finishTs = System.currentTimeMillis(); progressGroups.clear(); @@ -1056,67 +1126,80 @@ public synchronized void onFinish() { log.info("Defragmentation process completed. Time: " + (finishTs - startTs) * 1e-3 + "s."); } - /** {@inheritDoc} */ - @Override public synchronized String toString() { - StringBuilder sb = new StringBuilder(); - - if (!finishedGroups.isEmpty()) { - sb.append("Defragmentation is completed for cache groups:\n"); - - for (Map.Entry entry : finishedGroups.entrySet()) { - sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - "); - - sb.append(entry.getValue().toString()).append('\n'); - } - } + /** Copy object. */ + private synchronized Status copy() { + return new Status( + startTs, + finishTs, + new HashSet<>(scheduledGroups), + new HashMap<>(progressGroups), + new HashMap<>(finishedGroups), + new HashSet<>(skippedGroups) + ); + } - if (!progressGroups.isEmpty()) { - sb.append("Defragmentation is in progress for cache groups:\n"); + /** */ + public long getStartTs() { + return startTs; + } - for (Map.Entry entry : progressGroups.entrySet()) { - sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - "); + /** */ + public long getFinishTs() { + return finishTs; + } - sb.append(entry.getValue().toString()).append('\n'); - } - } + /** */ + public Set getScheduledGroups() { + return scheduledGroups; + } - if (!skippedGroups.isEmpty()) - sb.append("Skipped cache groups: ").append(String.join(", ", skippedGroups)).append('\n'); + /** */ + public Map getProgressGroups() { + return progressGroups; + } - if (!scheduledGroups.isEmpty()) - sb.append("Awaiting defragmentation: ").append(String.join(", ", scheduledGroups)).append('\n'); + /** */ + public Map getFinishedGroups() { + return finishedGroups; + } - return sb.toString(); + /** */ + public Set getSkippedGroups() { + return skippedGroups; } - } - /** */ - private static class DefragmentationCacheGroupProgress { /** */ - private static final DecimalFormat MB_FORMAT = new DecimalFormat( - "#.##", - DecimalFormatSymbols.getInstance(Locale.US) - ); + public int getTotalPartitionCount() { + return totalPartitionCount; + } /** */ + public int getDefragmentedPartitionCount() { + return defragmentedPartitionCount; + } + } + + /** Cache group defragmentation progress. */ + static class DefragmentationCacheGroupProgress { + /** Partition count. */ private final int partsTotal; - /** */ + /** Defragmented partitions. */ private int partsCompleted; - /** */ + /** Old cache group size. */ private long oldSize; - /** */ + /** New cache group size. */ private long newSize; - /** */ + /** Start timestamp. */ private final long startTs; - /** */ + /** Finish timestamp. */ private long finishTs; - /** */ + /** Constructor. */ public DefragmentationCacheGroupProgress(int parts) { partsTotal = parts; @@ -1144,43 +1227,38 @@ public void onIndexDefragmented(long oldSize, long newSize) { } /** */ - public void onFinish() { - finishTs = System.currentTimeMillis(); + public long getOldSize() { + return oldSize; } - /** {@inheritDoc} */ - @Override public String toString() { - StringBuilder sb = new StringBuilder(); - - if (finishTs == 0) { - sb.append("partitions processed/all: ").append(partsCompleted).append("/").append(partsTotal); - - sb.append(", time elapsed: "); - - appendDuration(sb, System.currentTimeMillis()); - } - else { - double mb = 1024 * 1024; - - sb.append("size before/after: ").append(MB_FORMAT.format(oldSize / mb)).append("MB/"); - sb.append(MB_FORMAT.format(newSize / mb)).append("MB"); - - sb.append(", time took: "); + /** */ + public long getNewSize() { + return newSize; + } - appendDuration(sb, finishTs); - } + /** */ + public long getStartTs() { + return startTs; + } - return sb.toString(); + /** */ + public long getFinishTs() { + return finishTs; } /** */ - private void appendDuration(StringBuilder sb, long end) { - long duration = Math.round((end - startTs) * 1e-3); + public int getPartsTotal() { + return partsTotal; + } - long mins = duration / 60; - long secs = duration % 60; + /** */ + public int getPartsCompleted() { + return partsCompleted; + } - sb.append(mins).append(" mins ").append(secs).append(" secs"); + /** */ + public void onFinish() { + finishTs = System.currentTimeMillis(); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java new file mode 100644 index 0000000000000..1e3becba51319 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.mxbean.DefragmentationMXBean; + +/** + * Defragmentation MX bean implementation. + */ +public class DefragmentationMXBeanImpl implements DefragmentationMXBean { + /** Defragmentation manager. */ + private final IgniteDefragmentation defragmentation; + + public DefragmentationMXBeanImpl(GridKernalContext ctx) { + this.defragmentation = ctx.defragmentation(); + } + + /** {@inheritDoc} */ + @Override public boolean schedule(String cacheNames) { + final List caches = Arrays.stream(cacheNames.split(",")) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + try { + defragmentation.schedule(caches); + + return true; + } + catch (IgniteCheckedException e) { + return false; + } + } + + /** {@inheritDoc} */ + @Override public boolean cancel() { + try { + defragmentation.cancel(); + + return true; + } + catch (IgniteCheckedException e) { + return false; + } + } + + /** {@inheritDoc} */ + @Override public boolean inProgress() { + return defragmentation.inProgress(); + } + + /** {@inheritDoc} */ + @Override public int processedPartitions() { + return defragmentation.processedPartitions(); + } + + /** {@inheritDoc} */ + @Override public int totalPartitions() { + return defragmentation.totalPartitions(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + return defragmentation.startTime(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java new file mode 100644 index 0000000000000..a5dc811f90880 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentation.java @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; + +/** + * Defragmentation operation service. + */ +public interface IgniteDefragmentation { + /** + * Schedule defragmentaton on next start of the node. + * + * @param cacheNames Names of caches to run defragmentation on. + * @return Result of the scheduling. + * @throws IgniteCheckedException If failed. + */ + ScheduleResult schedule(List cacheNames) throws IgniteCheckedException; + + /** + * Cancel scheduled or ongoing defragmentation. + * @return Result of the cancellation. + * @throws IgniteCheckedException If failed. + */ + CancelResult cancel() throws IgniteCheckedException; + + /** + * Get the status of the ongoing defragmentation. + * @return Defragmentation status. + * @throws IgniteCheckedException If failed. + */ + DefragmentationStatus status() throws IgniteCheckedException; + + /** + * @return {@code true} if there is an ongoing defragmentation. + */ + boolean inProgress(); + + /** + * @return Number of processed partitions, or 0 if there is no ongoing defragmentation. + */ + int processedPartitions(); + + /** + * @return Number of total partitions, or 0 if there is no ongoing defragmentation. + */ + int totalPartitions(); + + /** + * @return Timestamp of the beginning of the ongoing defragmentation or 0 if there is none. + */ + long startTime(); + + /** Result of the scheduling. */ + public enum ScheduleResult { + /** + * Successfully scheduled. + */ + SUCCESS, + + /** + * Successfuly scheduled, superseding previously scheduled defragmentation. + */ + SUCCESS_SUPERSEDED_PREVIOUS + } + + /** Result of the cancellation. */ + public enum CancelResult { + /** + * Cancelled scheduled defragmentation. + */ + CANCELLED_SCHEDULED, + + /** + * Nothing to cancel, no ongoing defragmentation. + */ + SCHEDULED_NOT_FOUND, + + /** + * Cancelled ongoing defragmentation. + */ + CANCELLED, + + /** + * Defragmentation is already completed or cancelled. + */ + COMPLETED_OR_CANCELLED + } + + /** */ + public static class DefragmentationStatus { + /** */ + private final Map completedCaches; + + /** */ + private final Map inProgressCaches; + + /** */ + private final Set awaitingCaches; + + /** */ + private final Set skippedCaches; + + /** */ + private final int totalPartitions; + + /** */ + private final int processedPartitions; + + /** */ + private final long startTs; + + /** */ + private final long totalElapsedTime; + + public DefragmentationStatus( + Map completedCaches, + Map inProgressCaches, + Set awaitingCaches, + Set skippedCaches, + int totalPartitions, + int processedPartitions, + long startTs, + long totalElapsedTime + ) { + this.completedCaches = completedCaches; + this.inProgressCaches = inProgressCaches; + this.awaitingCaches = awaitingCaches; + this.skippedCaches = skippedCaches; + this.totalPartitions = totalPartitions; + this.processedPartitions = processedPartitions; + this.startTs = startTs; + this.totalElapsedTime = totalElapsedTime; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + if (!completedCaches.isEmpty()) { + sb.append("Defragmentation is completed for cache groups:\n"); + + for (Map.Entry entry : completedCaches.entrySet()) { + sb.append(" ").append(entry.getKey()).append(" - "); + + sb.append(entry.getValue().toString()).append('\n'); + } + } + + if (!inProgressCaches.isEmpty()) { + sb.append("Defragmentation is in progress for cache groups:\n"); + + for (Map.Entry entry : inProgressCaches.entrySet()) { + sb.append(" ").append(entry.getKey()).append(" - "); + + sb.append(entry.getValue().toString()).append('\n'); + } + } + + if (!skippedCaches.isEmpty()) + sb.append("Skipped cache groups: ").append(String.join(", ", skippedCaches)).append('\n'); + + if (!awaitingCaches.isEmpty()) + sb.append("Awaiting defragmentation: ").append(String.join(", ", awaitingCaches)).append('\n'); + + return sb.toString(); + } + + /** */ + public Map getCompletedCaches() { + return completedCaches; + } + + /** */ + public Map getInProgressCaches() { + return inProgressCaches; + } + + /** */ + public Set getAwaitingCaches() { + return awaitingCaches; + } + + /** */ + public Set getSkippedCaches() { + return skippedCaches; + } + + /** */ + public long getTotalElapsedTime() { + return totalElapsedTime; + } + + /** */ + public int getTotalPartitions() { + return totalPartitions; + } + + /** */ + public int getProcessedPartitions() { + return processedPartitions; + } + + /** */ + public long getStartTs() { + return startTs; + } + } + + /** */ + abstract class DefragmentationInfo { + /** */ + long elapsedTime; + + public DefragmentationInfo(long elapsedTime) { + this.elapsedTime = elapsedTime; + } + + /** */ + void appendDuration(StringBuilder sb, long elapsedTime) { + long duration = Math.round(elapsedTime * 1e-3); + + long mins = duration / 60; + long secs = duration % 60; + + sb.append(mins).append(" mins ").append(secs).append(" secs"); + } + + /** */ + public long getElapsedTime() { + return elapsedTime; + } + } + + /** */ + public static class CompletedDefragmentationInfo extends DefragmentationInfo { + /** */ + private static final DecimalFormat MB_FORMAT = new DecimalFormat( + "#.##", + DecimalFormatSymbols.getInstance(Locale.US) + ); + + /** */ + long sizeBefore; + + /** */ + long sizeAfter; + + public CompletedDefragmentationInfo(long elapsedTime, long sizeBefore, long sizeAfter) { + super(elapsedTime); + this.sizeBefore = sizeBefore; + this.sizeAfter = sizeAfter; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + double mb = 1024 * 1024; + + sb.append("size before/after: ").append(MB_FORMAT.format(sizeBefore / mb)).append("MB/"); + sb.append(MB_FORMAT.format(sizeAfter / mb)).append("MB"); + + sb.append(", time took: "); + + appendDuration(sb, elapsedTime); + + return sb.toString(); + } + + /** */ + public long getSizeBefore() { + return sizeBefore; + } + + /** */ + public long getSizeAfter() { + return sizeAfter; + } + } + + /** */ + public static class InProgressDefragmentationInfo extends DefragmentationInfo { + /** */ + int processedPartitions; + + /** */ + int totalPartitions; + + public InProgressDefragmentationInfo(long elapsedTime, int processedPartitions, int totalPartitions) { + super(elapsedTime); + this.processedPartitions = processedPartitions; + this.totalPartitions = totalPartitions; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("partitions processed/all: ").append(processedPartitions).append("/").append(totalPartitions); + + sb.append(", time elapsed: "); + + appendDuration(sb, elapsedTime); + + return sb.toString(); + } + + /** */ + public int getProcessedPartitions() { + return processedPartitions; + } + + /** */ + public int getTotalPartitions() { + return totalPartitions; + } + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java new file mode 100644 index 0000000000000..5c443baac411b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/IgniteDefragmentationImpl.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.Status; +import org.apache.ignite.maintenance.MaintenanceAction; +import org.apache.ignite.maintenance.MaintenanceRegistry; +import org.apache.ignite.maintenance.MaintenanceTask; + +import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME; +import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore; + +/** + * Defragmentation operation service implementation. + */ +public class IgniteDefragmentationImpl implements IgniteDefragmentation { + /** Kernal context. */ + private final GridKernalContext ctx; + + public IgniteDefragmentationImpl(GridKernalContext ctx) { + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public ScheduleResult schedule(List cacheNames) throws IgniteCheckedException { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + MaintenanceTask oldTask; + + try { + oldTask = maintenanceRegistry.registerMaintenanceTask(toStore(cacheNames != null ? cacheNames : Collections.emptyList())); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Scheduling failed: " + e.getMessage()); + } + + return oldTask != null ? ScheduleResult.SUCCESS_SUPERSEDED_PREVIOUS : ScheduleResult.SUCCESS; + } + + /** {@inheritDoc} */ + @Override public CancelResult cancel() throws IgniteCheckedException { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + if (!maintenanceRegistry.isMaintenanceMode()) { + boolean deleted = maintenanceRegistry.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + + return deleted ? CancelResult.CANCELLED_SCHEDULED : CancelResult.SCHEDULED_NOT_FOUND; + } + else { + List> actions; + + try { + actions = maintenanceRegistry.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + } + catch (IgniteException e) { + return CancelResult.COMPLETED_OR_CANCELLED; + } + + Optional> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny(); + + assert stopAct.isPresent(); + + try { + Object res = stopAct.get().execute(); + + assert res instanceof Boolean; + + boolean cancelled = (Boolean)res; + + return cancelled ? CancelResult.CANCELLED : CancelResult.COMPLETED_OR_CANCELLED; + } + catch (Exception e) { + throw new IgniteCheckedException("Exception occurred: " + e.getMessage(), e); + } + } + } + + /** {@inheritDoc} */ + @Override public DefragmentationStatus status() throws IgniteCheckedException { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + if (!maintenanceRegistry.isMaintenanceMode()) + throw new IgniteCheckedException("Node is not in maintenance mode."); + + IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database(); + + assert dbMgr instanceof GridCacheDatabaseSharedManager; + + CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr) + .defragmentationManager(); + + if (defrgMgr == null) + throw new IgniteCheckedException("There's no active defragmentation process on the node."); + + final Status status = defrgMgr.status(); + + final long startTs = status.getStartTs(); + final long finishTs = status.getFinishTs(); + final long elapsedTime = finishTs != 0 ? finishTs - startTs : System.currentTimeMillis() - startTs; + + Map completedCaches = new HashMap<>(); + Map progressCaches = new HashMap<>(); + + status.getFinishedGroups().forEach((context, progress) -> { + final String name = context.cacheOrGroupName(); + + final long oldSize = progress.getOldSize(); + final long newSize = progress.getNewSize(); + final long cgElapsedTime = progress.getFinishTs() - progress.getStartTs(); + + final CompletedDefragmentationInfo info = new CompletedDefragmentationInfo(cgElapsedTime, oldSize, newSize); + completedCaches.put(name, info); + }); + + status.getProgressGroups().forEach((context, progress) -> { + final String name = context.cacheOrGroupName(); + + final long cgElapsedTime = System.currentTimeMillis() - progress.getStartTs(); + final int partsTotal = progress.getPartsTotal(); + final int partsCompleted = progress.getPartsCompleted(); + + final InProgressDefragmentationInfo info = new InProgressDefragmentationInfo(cgElapsedTime, partsCompleted, partsTotal); + progressCaches.put(name, info); + }); + + return new DefragmentationStatus( + completedCaches, + progressCaches, + status.getScheduledGroups(), + status.getSkippedGroups(), + status.getTotalPartitionCount(), + status.getDefragmentedPartitionCount(), + startTs, + elapsedTime + ); + } + + /** {@inheritDoc} */ + @Override public boolean inProgress() { + final Status status = getStatus(); + + return status != null && status.getFinishTs() == 0; + } + + /** {@inheritDoc} */ + @Override public int processedPartitions() { + final Status status = getStatus(); + + if (status == null) + return 0; + + return status.getDefragmentedPartitionCount(); + } + + /** {@inheritDoc} */ + @Override public int totalPartitions() { + final CachePartitionDefragmentationManager.Status status = getStatus(); + + if (status == null) + return 0; + + return status.getTotalPartitionCount(); + } + + /** {@inheritDoc} */ + @Override public long startTime() { + final CachePartitionDefragmentationManager.Status status = getStatus(); + + if (status == null) + return 0; + + return status.getStartTs(); + } + + /** + * Get defragmentation status. + * @return Defragmentation status or {@code null} if there is no ongoing defragmentation. + */ + private Status getStatus() { + final MaintenanceRegistry maintenanceRegistry = ctx.maintenanceRegistry(); + + if (!maintenanceRegistry.isMaintenanceMode()) + return null; + + IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database(); + + assert dbMgr instanceof GridCacheDatabaseSharedManager; + + CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager) dbMgr) + .defragmentationManager(); + + if (defrgMgr == null) + return null; + + return defrgMgr.status(); + } + +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index 958053b9ee124..ad0957185790f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFoldersResolver; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; @@ -491,6 +492,11 @@ protected IgniteConfiguration prepareIgniteConfiguration() { return null; } + /** {@inheritDoc} */ + @Override public IgniteDefragmentation defragmentation() { + return null; + } + /** {@inheritDoc} */ @Override public WorkersRegistry workersRegistry() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java index 14cea626e7724..88fde8b5af6f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTask.java @@ -17,27 +17,17 @@ package org.apache.ignite.internal.visor.defragmentation; -import java.util.Collections; import java.util.List; -import java.util.Optional; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.IgniteDefragmentation; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.processors.task.GridVisorManagementTask; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; -import org.apache.ignite.maintenance.MaintenanceAction; -import org.apache.ignite.maintenance.MaintenanceRegistry; -import org.apache.ignite.maintenance.MaintenanceTask; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.CachePartitionDefragmentationManager.DEFRAGMENTATION_MNTC_TASK_NAME; -import static org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters.toStore; - /** */ @GridInternal @GridVisorManagementTask @@ -120,91 +110,71 @@ protected VisorDefragmentationJob(@Nullable VisorDefragmentationTaskArg arg, boo /** */ private VisorDefragmentationTaskResult runSchedule(VisorDefragmentationTaskArg arg) { - MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); + final IgniteDefragmentation defragmentation = ignite.context().defragmentation(); - MaintenanceTask oldTask; + final IgniteDefragmentation.ScheduleResult scheduleResult; try { - List cacheNames = arg.cacheNames(); - - oldTask = mntcReg.registerMaintenanceTask(toStore(cacheNames == null ? Collections.emptyList() : cacheNames)); + scheduleResult = defragmentation.schedule(arg.cacheNames()); } catch (IgniteCheckedException e) { - return new VisorDefragmentationTaskResult(false, "Scheduling failed: " + e.getMessage()); + return new VisorDefragmentationTaskResult(false, e.getMessage()); } - return new VisorDefragmentationTaskResult( - true, - "Scheduling completed successfully." + - (oldTask == null ? "" : " Previously scheduled task has been removed.") - ); + String message; + + switch (scheduleResult) { + case SUCCESS_SUPERSEDED_PREVIOUS: + message = "Scheduling completed successfully. Previously scheduled task has been removed."; + break; + case SUCCESS: + default: + message = "Scheduling completed successfully."; + break; + } + + return new VisorDefragmentationTaskResult(true, message); } /** */ private VisorDefragmentationTaskResult runStatus(VisorDefragmentationTaskArg arg) { - MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); - - if (!mntcReg.isMaintenanceMode()) - return new VisorDefragmentationTaskResult(false, "Node is not in maintenance node."); - - IgniteCacheDatabaseSharedManager dbMgr = ignite.context().cache().context().database(); + final IgniteDefragmentation defragmentation = ignite.context().defragmentation(); - assert dbMgr instanceof GridCacheDatabaseSharedManager; - - CachePartitionDefragmentationManager defrgMgr = ((GridCacheDatabaseSharedManager)dbMgr) - .defragmentationManager(); - - if (defrgMgr == null) - return new VisorDefragmentationTaskResult(true, "There's no active defragmentation process on the node."); - - return new VisorDefragmentationTaskResult(true, defrgMgr.status()); + try { + return new VisorDefragmentationTaskResult(true, defragmentation.status().toString()); + } catch (IgniteCheckedException e) { + return new VisorDefragmentationTaskResult(false, e.getMessage()); + } } /** */ private VisorDefragmentationTaskResult runCancel(VisorDefragmentationTaskArg arg) { - assert arg.cacheNames() == null : "Cancelling specific caches is not yet implemented"; - - MaintenanceRegistry mntcReg = ignite.context().maintenanceRegistry(); - - if (!mntcReg.isMaintenanceMode()) { - boolean deleted = mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); + final IgniteDefragmentation defragmentation = ignite.context().defragmentation(); - String msg = deleted - ? "Scheduled defragmentation task cancelled successfully." - : "Scheduled defragmentation task is not found."; - - return new VisorDefragmentationTaskResult(true, msg); - } - else { - List> actions; - - try { - actions = mntcReg.actionsForMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); - } - catch (IgniteException e) { - return new VisorDefragmentationTaskResult(true, "Defragmentation is already completed or has been cancelled previously."); + try { + final IgniteDefragmentation.CancelResult cancelResult = defragmentation.cancel(); + + String message; + + switch (cancelResult) { + case SCHEDULED_NOT_FOUND: + message = "Scheduled defragmentation task is not found."; + break; + case CANCELLED: + message = "Defragmentation cancelled successfully."; + break; + case COMPLETED_OR_CANCELLED: + message = "Defragmentation is already completed or has been cancelled previously."; + break; + case CANCELLED_SCHEDULED: + default: + message = "Scheduled defragmentation task cancelled successfully."; + break; } - Optional> stopAct = actions.stream().filter(a -> "stop".equals(a.name())).findAny(); - - assert stopAct.isPresent(); - - try { - Object res = stopAct.get().execute(); - - assert res instanceof Boolean; - - boolean cancelled = (Boolean)res; - - String msg = cancelled - ? "Defragmentation cancelled successfully." - : "Defragmentation is already completed or has been cancelled previously."; - - return new VisorDefragmentationTaskResult(true, msg); - } - catch (Exception e) { - return new VisorDefragmentationTaskResult(false, "Exception occurred: " + e.getMessage()); - } + return new VisorDefragmentationTaskResult(true, message); + } catch (IgniteCheckedException e) { + return new VisorDefragmentationTaskResult(false, e.getMessage()); } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java index 1b1c8b12ba023..9e6ec53f5e48b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/defragmentation/VisorDefragmentationTaskArg.java @@ -33,9 +33,6 @@ public class VisorDefragmentationTaskArg extends IgniteDataTransferObject { /** */ private VisorDefragmentationOperation operation; - /** */ - private List nodeIds; - /** */ private List cacheNames; @@ -47,12 +44,10 @@ public VisorDefragmentationTaskArg() { /** */ public VisorDefragmentationTaskArg( VisorDefragmentationOperation operation, - List nodeIds, List cacheNames ) { this.operation = operation; - this.nodeIds = nodeIds; this.cacheNames = cacheNames; } @@ -61,11 +56,6 @@ public VisorDefragmentationOperation operation() { return operation; } - /** */ - public List nodeIds() { - return nodeIds; - } - /** */ public List cacheNames() { return cacheNames; @@ -75,8 +65,6 @@ public List cacheNames() { @Override protected void writeExternalData(ObjectOutput out) throws IOException { U.writeEnum(out, operation); - U.writeCollection(out, nodeIds); - U.writeCollection(out, cacheNames); } @@ -84,8 +72,6 @@ public List cacheNames() { @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { operation = U.readEnum(in, VisorDefragmentationOperation.class); - nodeIds = U.readList(in); - cacheNames = U.readList(in); } } diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java new file mode 100644 index 0000000000000..22a5e2de9c39a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DefragmentationMXBean.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mxbean; + +/** + * JMX bean for defragmentation manager. + */ +@MXBeanDescription("MBean that provides access for defragmentation features.") +public interface DefragmentationMXBean { + /** + * Schedule defragmentation for given caches. + * + * @param cacheNames Names of caches to run defragmentation on, comma separated. + * @return {@code true} if defragmentation is scheduled, {@code false} otherwise. + */ + @MXBeanDescription("Schedule defragmentation.") + public boolean schedule(@MXBeanParameter(name = "cacheNames", description = "Names of caches to run defragmentation on.") String cacheNames); + + /** + * Cancel defragmentation. + * + * @return {@code true} if defragmentation was canceled, {@code false} otherwise. + */ + @MXBeanDescription("Cancel current defragmentation.") + public boolean cancel(); + + /** + * Get defragmentation status. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Cancel current defragmentation.") + public boolean inProgress(); + + /** + * Get count of processed partitions. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Processed partitions.") + public int processedPartitions(); + + /** + * Get total count of partitions. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Total partitions.") + public int totalPartitions(); + + /** + * Get defragmentation's start time. + * + * @return {@code true} if defragmentation is in progress right now. + */ + @MXBeanDescription("Start time.") + public long startTime(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java deleted file mode 100644 index c73512961defe..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.configuration.NearCacheConfiguration; - -/** - * Tests for key check for near cache. - */ -public class GridCacheKeyCheckNearEnabledSelfTest extends GridCacheKeyCheckSelfTest { - /** {@inheritDoc} */ - @Override protected NearCacheConfiguration nearConfiguration() { - return new NearCacheConfiguration(); - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java deleted file mode 100644 index cbabcd541d1bb..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.junit.Test; - -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * Tests for cache key check. - */ -public class GridCacheKeyCheckSelfTest extends GridCacheAbstractSelfTest { - /** Atomicity mode. */ - private CacheAtomicityMode atomicityMode; - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override protected NearCacheConfiguration nearConfiguration() { - return null; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - cfg.setCacheConfiguration(cacheConfiguration()); - - return cfg; - } - - /** - * @return Cache configuration. - */ - protected CacheConfiguration cacheConfiguration() { - CacheConfiguration cfg = defaultCacheConfiguration(); - - cfg.setCacheMode(PARTITIONED); - cfg.setBackups(1); - cfg.setNearConfiguration(nearConfiguration()); - cfg.setWriteSynchronizationMode(FULL_SYNC); - cfg.setAtomicityMode(atomicityMode); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - @Test - public void testGetTransactional() throws Exception { - checkGet(TRANSACTIONAL); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testGetAtomic() throws Exception { - checkGet(ATOMIC); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPutTransactional() throws Exception { - checkPut(TRANSACTIONAL); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testPutAtomic() throws Exception { - checkPut(ATOMIC); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testRemoveTransactional() throws Exception { - checkRemove(TRANSACTIONAL); - } - - /** - * @throws Exception If failed. - */ - @Test - public void testRemoveAtomic() throws Exception { - checkRemove(ATOMIC); - } - - /** - * @throws Exception If failed. - */ - private void checkGet(CacheAtomicityMode atomicityMode) throws Exception { - this.atomicityMode = atomicityMode; - - try { - IgniteCache cache = cache(); - - cache.get(new IncorrectCacheKey(0)); - - fail("Key without hashCode()/equals() was successfully retrieved from cache."); - } - catch (IllegalArgumentException e) { - info("Catched expected exception: " + e.getMessage()); - - assertTrue(e.getMessage().startsWith("Cache key must override hashCode() and equals() methods")); - } - } - - /** - * @throws Exception If failed. - */ - private void checkPut(CacheAtomicityMode atomicityMode) throws Exception { - this.atomicityMode = atomicityMode; - - try { - IgniteCache cache = cache(); - - cache.put(new IncorrectCacheKey(0), "test_value"); - - fail("Key without hashCode()/equals() was successfully inserted to cache."); - } - catch (IllegalArgumentException e) { - info("Catched expected exception: " + e.getMessage()); - - assertTrue(e.getMessage().startsWith("Cache key must override hashCode() and equals() methods")); - } - } - - /** - * @throws Exception If failed. - */ - private void checkRemove(CacheAtomicityMode atomicityMode) throws Exception { - this.atomicityMode = atomicityMode; - - try { - IgniteCache cache = cache(); - - cache.remove(new IncorrectCacheKey(0)); - - fail("Key without hashCode()/equals() was successfully used for remove operation."); - } - catch (IllegalArgumentException e) { - info("Catched expected exception: " + e.getMessage()); - - assertTrue(e.getMessage().startsWith("Cache key must override hashCode() and equals() methods")); - } - } - - /** */ - private IgniteCache cache() { - grid(0).context().cache().internalCache(DEFAULT_CACHE_NAME).forceKeyCheck(); - - return grid(0).cache(DEFAULT_CACHE_NAME); - } - - /** - * Cache key that doesn't override hashCode()/equals(). - */ - private static final class IncorrectCacheKey { - /** */ - private int someVal; - - /** - * @param someVal Some test value. - */ - private IncorrectCacheKey(int someVal) { - this.someVal = someVal; - } - - /** - * @return Test value. - */ - public int getSomeVal() { - return someVal; - } - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheUtilsSelfTest.java deleted file mode 100644 index c51c73b15a69e..0000000000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheUtilsSelfTest.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.configuration.BinaryConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler; -import org.apache.ignite.internal.binary.BinaryContext; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.binary.BinaryObjectImpl; -import org.apache.ignite.internal.binary.GridBinaryMarshaller; -import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl; -import org.apache.ignite.internal.util.IgniteUtils; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.logger.NullLogger; -import org.apache.ignite.marshaller.MarshallerContextTestImpl; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.junit.Test; - -/** - * Grid cache utils test. - */ -public class GridCacheUtilsSelfTest extends GridCommonAbstractTest { - /** - * Does not override equals and hashCode. - */ - private static class NoEqualsAndHashCode { - } - - /** - * Does not override equals. - */ - private static class NoEquals { - /** {@inheritDoc} */ - @Override public int hashCode() { - return 1; - } - } - - /** - * Does not override hashCode. - */ - private static class NoHashCode { - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return super.equals(obj); - } - } - - /** - * Defines equals with different signature. - */ - private static class WrongEquals { - /** - * @param obj Object. - * @return {@code False}. - */ - @Override public boolean equals(Object obj) { - return false; - } - } - - /** - * Overrides equals and hashCode. - */ - private static class EqualsAndHashCode { - /** {@inheritDoc} */ - @Override public int hashCode() { - return super.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return super.equals(obj); - } - } - - /** - * Extends class which overrides equals and hashCode. - */ - private static class ExtendsClassWithEqualsAndHashCode extends EqualsAndHashCode { - } - - /** - * Extends class which overrides equals and hashCode, overrides equals and hashCode. - */ - private static class ExtendsClassWithEqualsAndHashCode2 extends EqualsAndHashCode { - /** {@inheritDoc} */ - @Override public int hashCode() { - return super.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - return super.equals(obj); - } - } - - /** - */ - @Test - public void testCacheKeyValidation() throws IgniteCheckedException { - CU.validateCacheKey("key"); - - CU.validateCacheKey(1); - - CU.validateCacheKey(1L); - - CU.validateCacheKey(1.0); - - CU.validateCacheKey(new ExtendsClassWithEqualsAndHashCode()); - - CU.validateCacheKey(new ExtendsClassWithEqualsAndHashCode2()); - - assertThrowsForInvalidKey(new NoEqualsAndHashCode()); - - assertThrowsForInvalidKey(new NoEquals()); - - assertThrowsForInvalidKey(new NoHashCode()); - - assertThrowsForInvalidKey(new WrongEquals()); - - BinaryObjectBuilderImpl binBuilder = new BinaryObjectBuilderImpl(binaryContext(), - EqualsAndHashCode.class.getName()); - - BinaryObject binObj = binBuilder.build(); - - CU.validateCacheKey(binObj); - - BinaryObjectBuilderImpl binBuilder2 = new BinaryObjectBuilderImpl((BinaryObjectImpl) binObj); - - CU.validateCacheKey(binBuilder2.build()); - } - - /** - * @return Binary marshaller. - * @throws IgniteCheckedException if failed. - */ - private BinaryMarshaller binaryMarshaller() throws IgniteCheckedException { - IgniteConfiguration iCfg = new IgniteConfiguration(); - - BinaryConfiguration bCfg = new BinaryConfiguration(); - - iCfg.setBinaryConfiguration(bCfg); - - BinaryContext ctx = new BinaryContext(BinaryCachingMetadataHandler.create(), iCfg, new NullLogger()); - - BinaryMarshaller marsh = new BinaryMarshaller(); - - marsh.setContext(new MarshallerContextTestImpl(null)); - - IgniteUtils.invoke(BinaryMarshaller.class, marsh, "setBinaryContext", ctx, iCfg); - - return marsh; - } - - /** - * @return Binary context. - * @throws IgniteCheckedException if failed. - */ - private BinaryContext binaryContext() throws IgniteCheckedException { - GridBinaryMarshaller impl = U.field(binaryMarshaller(), "impl"); - - return impl.context(); - } - - /** - * @param key Cache key. - */ - private void assertThrowsForInvalidKey(final Object key) { - GridTestUtils.assertThrows(log, new Callable() { - @Override public Void call() throws Exception { - CU.validateCacheKey(key); - - return null; - } - }, IllegalArgumentException.class, null); - } -} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java new file mode 100644 index 0000000000000..f1e5c77cb64f0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/DefragmentationMXBeanTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.defragmentation; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.function.UnaryOperator; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.maintenance.MaintenanceTask; +import org.apache.ignite.mxbean.DefragmentationMXBean; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.cluster.ClusterState.ACTIVE; + +/** + * Tests for defragmentation JMX bean. + */ +public class DefragmentationMXBeanTest extends GridCommonAbstractTest { + /** */ + private static CountDownLatch blockCdl; + + /** */ + private static CountDownLatch waitCdl; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + final DataStorageConfiguration dsCfg = new DataStorageConfiguration(); + + dsCfg.setWalSegmentSize(512 * 1024).setWalSegments(3); + dsCfg.setDefaultDataRegionConfiguration( + new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024).setPersistenceEnabled(true) + ); + + return cfg.setDataStorageConfiguration(dsCfg); + } + + /** + * Test that defragmentation won't be scheduled second time, if previously scheduled via maintenance registry. + * Description: + * 1. Start two nodes. + * 2. Register defragmentation maintenance task on the first node. + * 3. Restart node. + * 3. Scheduling of the defragmentation on the first node via JMX bean should fail. + * @throws Exception If failed. + */ + @Test + public void testDefragmentationSchedule() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().state(ACTIVE); + + DefragmentationMXBean mxBean = defragmentationMXBean(ignite.name()); + + assertTrue(mxBean.schedule("")); + + MaintenanceTask mntcTask = DefragmentationParameters.toStore(Collections.emptyList()); + + assertNotNull(grid(0).context().maintenanceRegistry().registerMaintenanceTask(mntcTask)); + assertNull(grid(1).context().maintenanceRegistry().registerMaintenanceTask(mntcTask)); + + stopGrid(0); + startGrid(0); + + // node is already in defragmentation mode, hence scheduling is not possible + assertFalse(mxBean.schedule("")); + } + + /** + * Test that defragmentation can be successfuly cancelled via JMX bean. + * @throws Exception If failed. + */ + @Test + public void testDefragmentationCancel() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().state(ACTIVE); + + DefragmentationMXBean mxBean = defragmentationMXBean(ignite.name()); + + mxBean.schedule(""); + + assertTrue(mxBean.cancel()); + + // subsequent cancel call should be successful + assertTrue(mxBean.cancel()); + } + + /** + * Test that ongong defragmentation can be stopped via JMX bean. + * Description: + * 1. Start one node. + * 2. Put a load of a data on it. + * 3. Schedule defragmentation. + * 4. Make IO factory slow down after 128 partitions are processed, so we have time to stop the defragmentation. + * 5. Stop the defragmentation. + * @throws Exception If failed. + */ + @Test + public void testDefragmentationCancelInProgress() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + IgniteCache cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 1024; i++) + cache.put(i, i); + + forceCheckpoint(ig); + + DefragmentationMXBean mxBean = defragmentationMXBean(ig.name()); + + mxBean.schedule(""); + + stopGrid(0); + + blockCdl = new CountDownLatch(128); + + UnaryOperator cfgOp = cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + FileIOFactory delegate = dsCfg.getFileIOFactory(); + + dsCfg.setFileIOFactory((file, modes) -> { + if (file.getName().contains("dfrg")) { + if (blockCdl.getCount() == 0) { + try { + // Slow down defragmentation process. + // This'll be enough for the test since we have, like, 900 partitions left. + Thread.sleep(100); + } + catch (InterruptedException ignore) { + // No-op. + } + } + else + blockCdl.countDown(); + } + + return delegate.create(file, modes); + }); + + return cfg; + }; + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + try { + startGrid(0, cfgOp); + } + catch (Exception e) { + // No-op. + throw new RuntimeException(e); + } + }); + + blockCdl.await(); + + mxBean = defragmentationMXBean(ig.name()); + + assertTrue(mxBean.cancel()); + + fut.get(); + + assertTrue(mxBean.cancel()); + } + + /** + * Test that JMX bean provides correct defragmentation status. + * Description: + * 1. Start one node, + * 2. Put a load of data on it. + * 3. Schedule defragmentation. + * 4. Completely stop defragmentation when 128 partitions processed. + * 5. Check defragmentation status. + * 6. Continue defragmentation and wait for it to end. + * 7. Check defragmentation finished. + * @throws Exception If failed. + */ + @Test + public void testDefragmentationStatus() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1"); + + IgniteCache cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2"); + + ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3"); + + for (int i = 0; i < 1024; i++) + cache.put(i, i); + + forceCheckpoint(ig); + + DefragmentationMXBean mxBean = defragmentationMXBean(ig.name()); + + mxBean.schedule(""); + + stopGrid(0); + + blockCdl = new CountDownLatch(128); + waitCdl = new CountDownLatch(1); + + UnaryOperator cfgOp = cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + FileIOFactory delegate = dsCfg.getFileIOFactory(); + + dsCfg.setFileIOFactory((file, modes) -> { + if (file.getName().contains("dfrg")) { + if (blockCdl.getCount() == 0) { + try { + waitCdl.await(); + } + catch (InterruptedException ignore) { + // No-op. + } + } + else + blockCdl.countDown(); + } + + return delegate.create(file, modes); + }); + + return cfg; + }; + + IgniteInternalFuture fut = GridTestUtils.runAsync(() -> { + try { + startGrid(0, cfgOp); + } + catch (Exception e) { + // No-op. + throw new RuntimeException(e); + } + }); + + blockCdl.await(); + + mxBean = defragmentationMXBean(ig.name()); + + final IgniteKernal gridx = IgnitionEx.gridx(ig.name()); + final IgniteDefragmentation defragmentation = gridx.context().defragmentation(); + final IgniteDefragmentation.DefragmentationStatus status1 = defragmentation.status(); + + assertEquals(status1.getStartTs(), mxBean.startTime()); + + assertTrue(mxBean.inProgress()); + assertEquals(126, mxBean.processedPartitions()); + final int totalPartitions = status1.getTotalPartitions(); + assertEquals(totalPartitions, mxBean.totalPartitions()); + + waitCdl.countDown(); + + fut.get(); + + ((GridCacheDatabaseSharedManager) grid(0).context().cache().context().database()) + .defragmentationManager() + .completionFuture() + .get(); + + assertFalse(mxBean.inProgress()); + assertEquals(totalPartitions, mxBean.processedPartitions()); + } + + /** + * Get defragmentation JMX bean. + * @param name Ignite instance name. + * @return Defragmentation JMX bean. + */ + private DefragmentationMXBean defragmentationMXBean(String name) { + return getMxBean( + name, + "Defragmentation", + DefragmentationMXBeanImpl.class, + DefragmentationMXBean.class + ); + } + +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index f89759a2d0eec..b66ffaa9d9577 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -74,8 +74,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConfigurationConsistencySelfTest; import org.apache.ignite.internal.processors.cache.GridCacheConfigurationValidationSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheEntryMemorySizeSelfTest; -import org.apache.ignite.internal.processors.cache.GridCacheKeyCheckNearEnabledSelfTest; -import org.apache.ignite.internal.processors.cache.GridCacheKeyCheckSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheLeakTest; import org.apache.ignite.internal.processors.cache.GridCacheLifecycleAwareSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheLocalTxStoreExceptionSelfTest; @@ -371,9 +369,6 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, GridCacheClearLocallySelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheConcurrentGetCacheOnClientTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, GridCacheKeyCheckNearEnabledSelfTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, GridCacheKeyCheckSelfTest.class, ignoredTests); - GridTestUtils.addTestIfNeeded(suite, GridCacheLeakTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheMvccFlagsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheReturnValueTransferSelfTest.class, ignoredTests); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index 1e110b1fba6ff..6e1dd3a5e6133 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -19,7 +19,6 @@ import org.apache.ignite.internal.IgniteVersionUtilsSelfTest; import org.apache.ignite.internal.pagemem.impl.PageIdUtilsSelfTest; -import org.apache.ignite.internal.processors.cache.GridCacheUtilsSelfTest; import org.apache.ignite.internal.util.BasicRateLimiterTest; import org.apache.ignite.internal.util.DistributedProcessCoordinatorLeftTest; import org.apache.ignite.internal.util.GridArraysSelfTest; @@ -93,7 +92,6 @@ GridThreadTest.class, GridIntListSelfTest.class, GridArraysSelfTest.class, - GridCacheUtilsSelfTest.class, IgniteExceptionRegistrySelfTest.class, GridMessageCollectionTest.class, WorkersControlMXBeanTest.class,