Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import java.io.IOException;
import java.util.Iterator;

/**
* A {@link RecordReader} that can additionally spill records to disk.
*
* @param <I> type of the records accepted and returned by {@link #spillToDisk}
* @param <O> type of the records produced by the inherited {@link RecordReader#read()}
*/
public interface ExternalRecordReader<I, O> extends RecordReader<O> {
/**
* Spills all records of the given iterator to disk and returns an external record iterator.
*
* <p>NOTE(review): presumably the returned iterator reads the spilled records back from disk;
* confirm against the implementations.
*
* @param iterator the records to spill
* @return an external record iterator
* @throws IOException if an I/O error occurs while spilling
*/
Iterator<I> spillToDisk(Iterator<I> iterator) throws IOException;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import java.util.Comparator;

/**
* A {@link Comparator} that additionally exposes an {@code int[]} through
* {@link #compareFields()}.
*
* @param <T> type of the objects being compared
*/
public interface FieldsComparator<T> extends Comparator<T> {

/**
* Returns the compare fields of this comparator.
*
* <p>NOTE(review): presumably these are the positional indexes of the fields this comparator
* compares on; confirm against the implementations.
*
* @return an int array describing the compare fields
*/
int[] compareFields();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import org.apache.hudi.common.model.HoodieRecord;

/**
* Merge function to merge multiple {@link HoodieRecord}s into a single result.
*
* <p>IMPORTANT, object reuse inside the record passed to {@link #add}:
*
* <ul>
* <li>Please don't save references to the records passed to {@link #add}, or to the row objects
* inside them, in a List: the first two objects and the row objects inside them are safe,
* but the reference of the third object may overwrite the reference of the first object.
* <li>You can save field references: fields don't reuse their objects.
* </ul>
*
* <p>NOTE(review): the object-reuse rules above were originally phrased in terms of KeyValue and
* InternalRow, while {@link #add} takes a {@link HoodieRecord}; confirm they hold for the
* records actually passed in.
*
* @param <T> result type
*/
public interface MergeFunction<T> {
/** Resets the merge function to its default state. */
void reset();

/**
* Adds the given {@link HoodieRecord} to the merge function.
*
* @param kv the record to add
*/
void add(HoodieRecord kv);

/**
* Gets the current merged value.
*
* @return the current merged value
*/
T getResult();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import org.apache.hudi.common.model.HoodieRecord;
import java.util.Comparator;
import java.util.List;

/**
 * Builds merge-sorting readers on top of {@link SortMergeReader#createSortMergeReader}.
 *
 * <p>The sort engine is fixed to {@link SortEngine#LOSER_TREE}.
 */
public class MergeSorter {
  /** Engine forwarded to the sort-merge reader factory on every {@link #mergeSort} call. */
  private final SortEngine sortEngine = SortEngine.LOSER_TREE;

  /** Creates a sorter that uses the loser-tree sort engine. */
  public MergeSorter() {
  }

  /**
   * Creates a reader that merge-sorts the given readers.
   *
   * @param readers the input readers to merge
   * @param mergeFunctionWrapper the wrapper handed to the sort-merge reader for merging records
   * @param comparator the comparator handed to the sort-merge reader for ordering records
   * @param <T> type of the records produced by the returned reader
   * @return the reader created by {@link SortMergeReader#createSortMergeReader}
   */
  public <T> RecordReader<T> mergeSort(List<RecordReader<HoodieRecord>> readers,
                                       RecordMergeWrapper<T> mergeFunctionWrapper,
                                       Comparator<HoodieRecord> comparator) {
    final RecordReader<T> mergedReader =
        SortMergeReader.createSortMergeReader(readers, comparator, mergeFunctionWrapper, sortEngine);
    return mergedReader;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;

import java.util.Iterator;
import java.util.List;

/**
* Merges {@link HoodieRecord}s that share a record key into an optional result of type {@code T}.
*
* <p>Two usage styles are declared: merging a whole group at once through
* {@link #merge(List)} or {@link #merge(Iterator)}, and merging sequentially through
* {@link #merge(HoodieRecord)} followed by {@link #getMergedResult()}.
*
* @param <T> type of the merged result
*/
public interface RecordMergeWrapper<T> {

/**
* Merges a group of HoodieRecords with the same record key.
*
* @param recordGroup the records sharing one record key
* @return the merged result wrapped in an {@link Option}
*/
Option<T> merge(List<HoodieRecord> recordGroup);

/**
* Merges an iterator of HoodieRecords with the same record key.
*
* @param sameKeyIterator iterator over the records sharing one record key
* @return the merged result wrapped in an {@link Option}
*/
Option<T> merge(Iterator<HoodieRecord> sameKeyIterator);

/**
* Sequentially merges HoodieRecords with the same record key.
*
* @param record the next record to merge
*/
void merge(HoodieRecord record);

/**
* Obtains the sequentially merged result of the HoodieRecords.
*
* @return the merged result as {@code Option<T>}
*/
Option<T> getMergedResult();

/**
* Resets this wrapper.
*
* <p>NOTE(review): presumably clears the state accumulated by {@link #merge(HoodieRecord)} so
* that a new group of records can be merged; confirm against the implementations.
*/
void reset();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

/**
* A closeable reader that hands out records in batches, one {@link Iterator} per batch.
*
* @param <T> type of the records being read
*/
public interface RecordReader<T> extends Closeable {

/**
* Reads one batch. The method should return null when reaching the end of the input.
*
* <p>The returned iterator object and any contained objects may be held onto by the source for
* some time, so it should not be immediately reused by the reader.
*
* @return an iterator over the next batch, or {@code null} at the end of the input
* @throws IOException if reading fails
*/
@Nullable
Iterator<T> read() throws IOException;

/**
* Closes the reader and should release all resources.
*
* @throws IOException if closing fails
*/
@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

/**
 * Sort engines that can be selected for multiway sorting.
 *
 * <p>Each constant carries a short string value, returned by {@link #toString()}, and a
 * human-readable description, returned by {@link #getDescription()}.
 */
public enum SortEngine {
  MIN_HEAP("min-heap", "Use min-heap for multiway sorting."),
  LOSER_TREE(
      "loser-tree",
      "Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.");

  private final String value;
  private final String description;

  SortEngine(String value, String description) {
    this.value = value;
    this.description = description;
  }

  /**
   * Returns the human-readable description of this engine.
   *
   * <p>Without this accessor the description passed to the constructor could never be read.
   *
   * @return the description text of this constant
   */
  public String getDescription() {
    return description;
  }

  /**
   * Returns the short string value of this engine, for example {@code "loser-tree"}.
   *
   * @return the string value of this constant
   */
  @Override
  public String toString() {
    return value;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.io.lsm;

import org.apache.hudi.common.model.HoodieRecord;

import java.util.Comparator;
import java.util.List;

/**
 * A {@link RecordReader} produced by sort-merging several input readers.
 *
 * @param <T> type of the records produced by the reader
 */
public interface SortMergeReader<T> extends RecordReader<T> {

  /**
   * Creates the sort-merge reader matching the requested sort engine.
   *
   * <p>TODO(zhangyue143): implement LOSER_TREE.
   * NOTE(review): LOSER_TREE is the only engine handled below; every other engine falls through
   * to the default branch and is rejected. Confirm whether this TODO was meant for MIN_HEAP.
   *
   * @param readers the input readers to merge
   * @param userKeyComparator comparator passed to the created reader
   * @param mergeFunctionWrapper merge wrapper passed to the created reader
   * @param sortEngine the engine selecting the reader implementation
   * @param <T> type of the records produced by the created reader
   * @return a new {@code SortMergeReaderLoserTreeStateMachine} for {@link SortEngine#LOSER_TREE}
   * @throws UnsupportedOperationException if {@code sortEngine} is any other engine
   */
  static <T> SortMergeReader<T> createSortMergeReader(List<RecordReader<HoodieRecord>> readers,
                                                      Comparator<HoodieRecord> userKeyComparator,
                                                      RecordMergeWrapper<T> mergeFunctionWrapper,
                                                      SortEngine sortEngine) {
    final SortMergeReader<T> created;
    switch (sortEngine) {
      case LOSER_TREE:
        created = new SortMergeReaderLoserTreeStateMachine<>(readers, userKeyComparator, mergeFunctionWrapper);
        break;

      default:
        throw new UnsupportedOperationException("Unsupported sort engine: " + sortEngine);
    }
    return created;
  }
}
Loading
Loading