Skip to content

Commit

Permalink
IGNITE-23004 Code cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
xtern committed Dec 19, 2024
1 parent b7e6060 commit df4379c
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
/**
* Denotes a plan that can evaluates itself.
*/
@FunctionalInterface
public interface ExecutablePlan {
/**
* Evaluates plan and returns cursor over result.
Expand All @@ -44,4 +43,9 @@ <RowT> AsyncCursor<InternalSqlRow> execute(
ExecutableTableRegistry tableRegistry,
@Nullable QueryPrefetchCallback firstPageReadyCallback
);

/**
* Returns {@code true} if the plan is executed inside a transaction, {@code false} otherwise.
*/
boolean transactional();
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@
import org.apache.ignite.internal.sql.engine.prepare.KillPlan;
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
import org.apache.ignite.internal.sql.engine.prepare.SelectCountPlan;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
Expand Down Expand Up @@ -332,7 +331,7 @@ private CompletableFuture<AsyncDataCursorExt<InternalSqlRow>> executeQuery(

assert txContext != null;

QueryTransactionWrapper txWrapper = txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML);
QueryTransactionWrapper txWrapper = txContext.getOrStartImplicit(plan.type() != SqlQueryType.DML, false);
InternalTransaction tx = txWrapper.unwrap();

operationContext.notifyTxUsed(txWrapper);
Expand Down Expand Up @@ -445,14 +444,10 @@ private AsyncDataCursor<InternalSqlRow> executeExecutablePlan(
QueryTransactionWrapper txWrapper = txContext.explicitTx();

if (txWrapper == null) {
// TODO change doc and checks
// underlying table will initiate transaction by itself, but we need stub to reuse
// TxAwareAsyncCursor
if (plan instanceof SelectCountPlan) {
txWrapper = NoopTransactionWrapper.INSTANCE;
} else {
txWrapper = txContext.getOrStartImplicitOnePhase(((ExplainablePlan) plan).type() != SqlQueryType.DML);
}
txWrapper = plan.transactional()
// Underlying table will drive transaction by itself.
? txContext.getOrStartImplicit(((ExplainablePlan) plan).type() != SqlQueryType.DML, true)
: NoopTransactionWrapper.INSTANCE;
}

PrefetchCallback prefetchCallback = new PrefetchCallback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ public ParameterMetadata parameterMetadata() {
return parameterMetadata;
}

/** {@inheritDoc} */
@Override
public boolean transactional() {
return true;
}

/** Returns a table in question. */
private IgniteTable table() {
IgniteTable table = lookupNode.getTable().unwrap(IgniteTable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public ParameterMetadata parameterMetadata() {
return parameterMetadata;
}

/** {@inheritDoc} */
@Override
public boolean transactional() {
return true;
}

/** Returns a table in question. */
private IgniteTable table() {
IgniteTable table = modifyNode.getTable().unwrap(IgniteTable.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ public ParameterMetadata parameterMetadata() {
return parameterMetadata;
}

@Override
public boolean transactional() {
return false;
}

private <RowT> Function<Long, Iterator<InternalSqlRow>> createResultProjection(ExecutionContext<RowT> ctx) {
RelDataType getCountType = new RelDataTypeFactory.Builder(ctx.getTypeFactory())
.add("ROWCOUNT", SqlTypeName.BIGINT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
* Context that allows to get explicit transaction provided by user or start implicit one.
*/
public interface QueryTransactionContext {
/** Returns explicit transaction or start implicit one. */
QueryTransactionWrapper getOrStartImplicit(boolean readOnly);

default QueryTransactionWrapper getOrStartImplicitOnePhase(boolean readOnly) {
// TODO Remove this method.
return getOrStartImplicit(readOnly);
}
/**
* Starts an implicit transaction if one has not been started previously
* and if there is no external (user-managed) transaction.
*
* @param readOnly Indicates whether the read-only transaction or read-write transaction should be started.
* @param tableDriven Indicates whether the implicit transaction will be managed by the table storage or the SQL engine.
* @return Transaction wrapper.
*/
QueryTransactionWrapper getOrStartImplicit(boolean readOnly, boolean tableDriven);

/** Updates tracker of latest time observed by client. */
void updateObservableTime(HybridTimestamp time);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,21 @@ public QueryTransactionContextImpl(
this.txTracker = txTracker;
}

/**
* Starts an implicit transaction if there is no external transaction.
*
* @param readOnly Query type.
* @return Transaction wrapper.
*/
@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
return getOrStartImplicit0(readOnly, false);
}

/** {@inheritDoc} */
@Override
public QueryTransactionWrapper getOrStartImplicitOnePhase(boolean readOnly) {
return getOrStartImplicit0(readOnly, true);
}

private QueryTransactionWrapper getOrStartImplicit0(boolean readOnly, boolean implicitOnePhase) {
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly, boolean tableDriven) {
InternalTransaction transaction;
QueryTransactionWrapper result;

if (tx == null) {
transaction = txManager.begin(observableTimeTracker, implicitOnePhase, readOnly);
result = new QueryTransactionWrapperImpl(transaction, true, txTracker);
transaction = txManager.begin(observableTimeTracker, tableDriven, readOnly);

if (tableDriven) {
result = new TableDrivenImplicitTransactionWrapper(transaction, txTracker);
} else {
result = new QueryTransactionWrapperImpl(transaction, true, txTracker);
}
} else {
transaction = tx.unwrap();
result = tx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,16 @@ public ScriptTransactionContext(
/**
* Starts a new implicit transaction if there is no external or script-driven transaction.
*
* @param readOnly Type of the transaction to start if none is started.
* @param readOnly Indicates whether the read-only transaction or read-write transaction should be started.
* @param tableDriven Indicates whether the implicit transaction will be managed by the table manager or the SQL engine.
* @return Transaction wrapper.
*/
@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly, boolean tableDriven) {
QueryTransactionWrapper wrapper = this.wrapper;

if (wrapper == null) {
return txContext.getOrStartImplicit(readOnly);
}

return wrapper;
}

@Override
public QueryTransactionWrapper getOrStartImplicitOnePhase(boolean readOnly) {
QueryTransactionWrapper wrapper = this.wrapper;

if (wrapper == null) {
return txContext.getOrStartImplicitOnePhase(readOnly);
return txContext.getOrStartImplicit(readOnly, tableDriven);
}

return wrapper;
Expand Down Expand Up @@ -118,7 +108,7 @@ public CompletableFuture<Void> handleControlStatement(SqlNode node) {
}

boolean readOnly = ((IgniteSqlStartTransaction) node).getMode() == IgniteSqlStartTransactionMode.READ_ONLY;
InternalTransaction tx = txContext.getOrStartImplicit(readOnly).unwrap();
InternalTransaction tx = txContext.getOrStartImplicit(readOnly, false).unwrap();

this.wrapper = new ScriptTransactionWrapperImpl(tx, txTracker);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine.tx;

import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.exec.TransactionTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.CompletableFutures;

/**
* Transaction wrapper for table-driven implicit transaction.
*/
public class TableDrivenImplicitTransactionWrapper extends QueryTransactionWrapperImpl {
TableDrivenImplicitTransactionWrapper(InternalTransaction transaction, TransactionTracker txTracker) {
super(transaction, true, txTracker);
}

@Override
public CompletableFuture<Void> commitImplicit() {
return CompletableFutures.nullCompletedFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ public void testImplicitTransactionAttributes() {

QueryTransactionContext transactionHandler = new QueryTransactionContextImpl(txManager, observableTimeTracker, null,
transactionTracker);
QueryTransactionWrapper transactionWrapper = transactionHandler.getOrStartImplicit(false);
QueryTransactionWrapper transactionWrapper = transactionHandler.getOrStartImplicit(false, false);

assertThat(transactionWrapper.unwrap().isReadOnly(), equalTo(false));

transactionWrapper = transactionHandler.getOrStartImplicit(true);
transactionWrapper = transactionHandler.getOrStartImplicit(true, false);
assertThat(transactionWrapper.unwrap().isReadOnly(), equalTo(true));
}

Expand Down Expand Up @@ -157,29 +157,29 @@ public void testQueryTransactionWrapperTxInflightsInteraction() {

QueryTransactionContext implicitDmlTxCtx = new QueryTransactionContextImpl(txManager, observableTimeTracker, null,
transactionTracker);
implicitDmlTxCtx.getOrStartImplicit(false);
implicitDmlTxCtx.getOrStartImplicit(false, false);
// Check that RW txns do not create tx inflights.
log.info("inflights={}", inflights);
assertTrue(inflights.isEmpty());

QueryTransactionContext implicitQueryTxCtx = new QueryTransactionContextImpl(txManager, observableTimeTracker, null,
transactionTracker);
QueryTransactionWrapper implicitQueryTxWrapper = implicitQueryTxCtx.getOrStartImplicit(true);
QueryTransactionWrapper implicitQueryTxWrapper = implicitQueryTxCtx.getOrStartImplicit(true, false);
assertTrue(inflights.contains(implicitQueryTxWrapper.unwrap().id()));
implicitQueryTxWrapper.commitImplicit();
assertTrue(inflights.isEmpty());

NoOpTransaction rwTx = NoOpTransaction.readWrite("test-rw", false);
QueryTransactionContext explicitRwTxCtx = new QueryTransactionContextImpl(txManager, observableTimeTracker, rwTx,
transactionTracker);
explicitRwTxCtx.getOrStartImplicit(true);
explicitRwTxCtx.getOrStartImplicit(true, false);
// Check that RW txns do not create tx inflights.
assertTrue(inflights.isEmpty());

NoOpTransaction roTx = NoOpTransaction.readOnly("test-ro", false);
QueryTransactionContext explicitRoTxCtx = new QueryTransactionContextImpl(txManager, observableTimeTracker, roTx,
transactionTracker);
QueryTransactionWrapper explicitRoTxWrapper = explicitRoTxCtx.getOrStartImplicit(true);
QueryTransactionWrapper explicitRoTxWrapper = explicitRoTxCtx.getOrStartImplicit(true, false);
assertTrue(inflights.contains(explicitRoTxWrapper.unwrap().id()));
explicitRoTxWrapper.commitImplicit();
assertTrue(inflights.isEmpty());
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testScriptTransactionWrapperTxInflightsInteraction() {
scriptRoTxCtx.handleControlStatement(sqlStartRoTx);
assertEquals(1, inflights.size());

QueryTransactionWrapper wrapper = scriptRoTxCtx.getOrStartImplicit(true);
QueryTransactionWrapper wrapper = scriptRoTxCtx.getOrStartImplicit(true, false);
assertEquals(1, inflights.size());

// ScriptTransactionWrapperImpl.commitImplicit is noop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ public void transactionRollbackOnError(NoOpTransaction tx) {

QueryTransactionWrapper txWrapper = mock(QueryTransactionWrapper.class);

when(txContext.getOrStartImplicit(anyBoolean())).thenReturn(txWrapper);
when(txContext.getOrStartImplicit(anyBoolean(), false)).thenReturn(txWrapper);

when(txWrapper.unwrap()).thenReturn(tx);
when(txWrapper.implicit()).thenReturn(tx.implicit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ static class PredefinedTxContext implements QueryTransactionContext {
}

@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly, boolean tableDriven) {
return txWrapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ private ExplicitTxContext(QueryTransactionWrapper txWrapper) {
}

@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly, boolean tableDriven) {
return txWrapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ImplicitTxContext implements QueryTransactionContext {
private ImplicitTxContext() { }

@Override
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly) {
public QueryTransactionWrapper getOrStartImplicit(boolean readOnly, boolean tableDriven) {
return new QueryTransactionWrapperImpl(new NoOpTransaction("dummy", false), true, NoOpTransactionTracker.INSTANCE);
}

Expand Down

0 comments on commit df4379c

Please sign in to comment.