Skip to content

Commit

Permalink
Subscription: intro memory control when constructing file payload to …
Browse files Browse the repository at this point in the history
…avoid OOM (apache#13937)
  • Loading branch information
VGalaxies authored Nov 1, 2024
1 parent 35f5482 commit 0f86054
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 82 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.resource.memory;

import java.util.function.BiConsumer;
import java.util.function.LongUnaryOperator;

public abstract class PipeFixedMemoryBlock extends PipeMemoryBlock {

public PipeFixedMemoryBlock(long memoryUsageInBytes) {
super(memoryUsageInBytes);
}

@Override
boolean shrink() {
return false;
}

@Override
boolean expand() {
return false;
}

@Override
public PipeMemoryBlock setShrinkMethod(LongUnaryOperator shrinkMethod) {
throw new UnsupportedOperationException(
"Shrink method is not supported in PipeFixedMemoryBlock");
}

@Override
public PipeMemoryBlock setShrinkCallback(BiConsumer<Long, Long> shrinkCallback) {
throw new UnsupportedOperationException(
"Shrink callback is not supported in PipeFixedMemoryBlock");
}

@Override
public PipeMemoryBlock setExpandMethod(LongUnaryOperator extendMethod) {
throw new UnsupportedOperationException(
"Expand method is not supported in PipeFixedMemoryBlock");
}

@Override
public PipeMemoryBlock setExpandCallback(BiConsumer<Long, Long> expandCallback) {
throw new UnsupportedOperationException(
"Expand callback is not supported in PipeFixedMemoryBlock");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.resource.memory;

public enum PipeMemoryBlockType {
NORMAL,
TABLET,
TS_FILE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public class PipeMemoryManager {
PipeConfig.getInstance().getPipeDataStructureTabletMemoryBlockAllocationRejectThreshold();
private volatile long usedMemorySizeInBytesOfTablets;

private static final double TS_FILE_MEMORY_REJECT_THRESHOLD =
PipeConfig.getInstance().getPipeDataStructureTsFileMemoryBlockAllocationRejectThreshold();
private volatile long usedMemorySizeInBytesOfTsFiles;

private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();

public PipeMemoryManager() {
Expand All @@ -71,7 +75,7 @@ public PipeMemoryManager() {

public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
return forceAllocate(sizeInBytes, false);
return forceAllocate(sizeInBytes, PipeMemoryBlockType.NORMAL);
}

public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBytes)
Expand Down Expand Up @@ -100,29 +104,71 @@ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBy
throw new PipeRuntimeOutOfMemoryCriticalException(
String.format(
"forceAllocateForTablet: failed to allocate because there's too much memory for tablets, "
+ "total memory size %d bytes, used memory for tablet size %d bytes,",
+ "total memory size %d bytes, used memory for tablet size %d bytes",
TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTablets));
}

synchronized (this) {
final PipeTabletMemoryBlock block =
(PipeTabletMemoryBlock) forceAllocate(tabletSizeInBytes, true);
(PipeTabletMemoryBlock) forceAllocate(tabletSizeInBytes, PipeMemoryBlockType.TABLET);
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
return block;
}
}

private PipeMemoryBlock forceAllocate(long sizeInBytes, boolean isForTablet)
public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
return isForTablet
? new PipeTabletMemoryBlock(sizeInBytes)
: new PipeMemoryBlock(sizeInBytes);
return new PipeTsFileMemoryBlock(0);
}

for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
if ((double) usedMemorySizeInBytesOfTsFiles / TOTAL_MEMORY_SIZE_IN_BYTES
< TS_FILE_MEMORY_REJECT_THRESHOLD) {
break;
}

try {
Thread.sleep(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for available memory", ex);
}
}

if ((double) usedMemorySizeInBytesOfTsFiles / TOTAL_MEMORY_SIZE_IN_BYTES
>= TS_FILE_MEMORY_REJECT_THRESHOLD) {
throw new PipeRuntimeOutOfMemoryCriticalException(
String.format(
"forceAllocateForTsFile: failed to allocate because there's too much memory for tsfiles, "
+ "total memory size %d bytes, used memory for tsfile size %d bytes",
TOTAL_MEMORY_SIZE_IN_BYTES, usedMemorySizeInBytesOfTsFiles));
}

synchronized (this) {
final PipeTsFileMemoryBlock block =
(PipeTsFileMemoryBlock) forceAllocate(tsFileSizeInBytes, PipeMemoryBlockType.TS_FILE);
usedMemorySizeInBytesOfTsFiles += block.getMemoryUsageInBytes();
return block;
}
}

private PipeMemoryBlock forceAllocate(long sizeInBytes, PipeMemoryBlockType type)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
switch (type) {
case TABLET:
return new PipeTabletMemoryBlock(sizeInBytes);
case TS_FILE:
return new PipeTsFileMemoryBlock(sizeInBytes);
default:
return new PipeMemoryBlock(sizeInBytes);
}
}

for (int i = 1; i <= MEMORY_ALLOCATE_MAX_RETRIES; i++) {
if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes) {
return registerMemoryBlock(sizeInBytes, isForTablet);
return registerMemoryBlock(sizeInBytes, type);
}

try {
Expand Down Expand Up @@ -163,6 +209,9 @@ public synchronized void forceResize(PipeMemoryBlock block, long targetSize) {
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets -= oldSize - targetSize;
}
if (block instanceof PipeTsFileMemoryBlock) {
usedMemorySizeInBytesOfTsFiles -= oldSize - targetSize;
}
block.setMemoryUsageInBytes(targetSize);
return;
}
Expand All @@ -174,6 +223,9 @@ public synchronized void forceResize(PipeMemoryBlock block, long targetSize) {
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets += sizeInBytes;
}
if (block instanceof PipeTsFileMemoryBlock) {
usedMemorySizeInBytesOfTsFiles += sizeInBytes;
}
block.setMemoryUsageInBytes(targetSize);
return;
}
Expand Down Expand Up @@ -252,7 +304,7 @@ public synchronized PipeMemoryBlock tryAllocate(
LOGGER.info(
"tryAllocate: allocated memory, "
+ "total memory size {} bytes, used memory size {} bytes, "
+ "original requested memory size {} bytes,"
+ "original requested memory size {} bytes, "
+ "actual requested memory size {} bytes",
TOTAL_MEMORY_SIZE_IN_BYTES,
usedMemorySizeInBytes,
Expand All @@ -271,7 +323,7 @@ public synchronized PipeMemoryBlock tryAllocate(
LOGGER.info(
"tryAllocate: allocated memory, "
+ "total memory size {} bytes, used memory size {} bytes, "
+ "original requested memory size {} bytes,"
+ "original requested memory size {} bytes, "
+ "actual requested memory size {} bytes",
TOTAL_MEMORY_SIZE_IN_BYTES,
usedMemorySizeInBytes,
Expand Down Expand Up @@ -301,6 +353,9 @@ public synchronized boolean tryAllocate(
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets += memoryInBytesNeededToBeAllocated;
}
if (block instanceof PipeTsFileMemoryBlock) {
usedMemorySizeInBytesOfTsFiles += memoryInBytesNeededToBeAllocated;
}
block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() + memoryInBytesNeededToBeAllocated);
return true;
}
Expand All @@ -309,14 +364,25 @@ public synchronized boolean tryAllocate(
}

private PipeMemoryBlock registerMemoryBlock(long sizeInBytes) {
return registerMemoryBlock(sizeInBytes, false);
return registerMemoryBlock(sizeInBytes, PipeMemoryBlockType.NORMAL);
}

private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, boolean isForTablet) {
private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockType type) {
usedMemorySizeInBytes += sizeInBytes;

final PipeMemoryBlock returnedMemoryBlock =
isForTablet ? new PipeTabletMemoryBlock(sizeInBytes) : new PipeMemoryBlock(sizeInBytes);
final PipeMemoryBlock returnedMemoryBlock;
switch (type) {
case TABLET:
returnedMemoryBlock = new PipeTabletMemoryBlock(sizeInBytes);
break;
case TS_FILE:
returnedMemoryBlock = new PipeTsFileMemoryBlock(sizeInBytes);
break;
default:
returnedMemoryBlock = new PipeMemoryBlock(sizeInBytes);
break;
}

allocatedBlocks.add(returnedMemoryBlock);
return returnedMemoryBlock;
}
Expand Down Expand Up @@ -366,6 +432,19 @@ public synchronized void tryExpandAllAndCheckConsistency() {
usedMemorySizeInBytesOfTablets,
tabletBlockSum);
}

long tsFileBlockSum =
allocatedBlocks.stream()
.filter(PipeTsFileMemoryBlock.class::isInstance)
.mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
.sum();
if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
LOGGER.warn(
"tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks,"
+ " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}",
usedMemorySizeInBytesOfTsFiles,
tsFileBlockSum);
}
}

public synchronized void release(PipeMemoryBlock block) {
Expand All @@ -378,6 +457,9 @@ public synchronized void release(PipeMemoryBlock block) {
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets -= block.getMemoryUsageInBytes();
}
if (block instanceof PipeTsFileMemoryBlock) {
usedMemorySizeInBytesOfTsFiles -= block.getMemoryUsageInBytes();
}
block.markAsReleased();

this.notifyAll();
Expand All @@ -392,6 +474,9 @@ public synchronized boolean release(PipeMemoryBlock block, long sizeInBytes) {
if (block instanceof PipeTabletMemoryBlock) {
usedMemorySizeInBytesOfTablets -= sizeInBytes;
}
if (block instanceof PipeTsFileMemoryBlock) {
usedMemorySizeInBytesOfTsFiles -= sizeInBytes;
}
block.setMemoryUsageInBytes(block.getMemoryUsageInBytes() - sizeInBytes);

this.notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,9 @@

package org.apache.iotdb.db.pipe.resource.memory;

import java.util.function.BiConsumer;
import java.util.function.LongUnaryOperator;

public class PipeTabletMemoryBlock extends PipeMemoryBlock {
public class PipeTabletMemoryBlock extends PipeFixedMemoryBlock {

public PipeTabletMemoryBlock(long memoryUsageInBytes) {
super(memoryUsageInBytes);
}

@Override
boolean shrink() {
return false;
}

@Override
boolean expand() {
return false;
}

@Override
public PipeMemoryBlock setShrinkMethod(LongUnaryOperator shrinkMethod) {
throw new UnsupportedOperationException(
"Shrink method is not supported in PipeTabletMemoryBlock");
}

@Override
public PipeMemoryBlock setShrinkCallback(BiConsumer<Long, Long> shrinkCallback) {
throw new UnsupportedOperationException(
"Shrink callback is not supported in PipeTabletMemoryBlock");
}

@Override
public PipeMemoryBlock setExpandMethod(LongUnaryOperator extendMethod) {
throw new UnsupportedOperationException(
"Expand method is not supported in PipeTabletMemoryBlock");
}

@Override
public PipeMemoryBlock setExpandCallback(BiConsumer<Long, Long> expandCallback) {
throw new UnsupportedOperationException(
"Expand callback is not supported in PipeTabletMemoryBlock");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.resource.memory;

public class PipeTsFileMemoryBlock extends PipeFixedMemoryBlock {

public PipeTsFileMemoryBlock(long memoryUsageInBytes) {
super(memoryUsageInBytes);
}
}
Loading

0 comments on commit 0f86054

Please sign in to comment.