Skip to content

Commit

Permalink
Migrate RaptorMetadata to non-deprecated methods
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed May 13, 2022
1 parent ff171d7 commit 99f736c
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.trino.spi.connector.ConnectorViewDefinition;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
Expand Down Expand Up @@ -112,6 +113,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_FOUND;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.SortOrder.ASC_NULLS_FIRST;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.IntegerType.INTEGER;
Expand Down Expand Up @@ -449,7 +451,7 @@ private Distribution getOrCreateDistribution(String name, List<Type> columnTypes
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
Optional<ConnectorTableLayout> layout = getNewTableLayout(session, tableMetadata);
finishCreateTable(session, beginCreateTable(session, tableMetadata, layout), ImmutableList.of(), ImmutableList.of());
finishCreateTable(session, beginCreateTable(session, tableMetadata, layout, NO_RETRIES), ImmutableList.of(), ImmutableList.of());
}

@Override
Expand Down Expand Up @@ -543,8 +545,12 @@ public void dropColumn(ConnectorSession session, ConnectorTableHandle tableHandl
}

@Override
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout)
public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, Optional<ConnectorTableLayout> layout, RetryMode retryMode)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}

if (viewExists(session, tableMetadata.getTable())) {
throw new TrinoException(ALREADY_EXISTS, "View already exists: " + tableMetadata.getTable());
}
Expand Down Expand Up @@ -705,8 +711,12 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
}

@Override
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}

RaptorTableHandle handle = (RaptorTableHandle) tableHandle;
long tableId = handle.getTableId();

Expand Down Expand Up @@ -784,8 +794,12 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle, RetryMode retryMode)
{
if (retryMode != NO_RETRIES) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support query retries");
}

RaptorTableHandle handle = (RaptorTableHandle) tableHandle;

beginDeleteForTableId.accept(handle.getTableId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static io.trino.plugin.raptor.legacy.metadata.SchemaDaoUtil.createTablesWithRetry;
import static io.trino.plugin.raptor.legacy.metadata.TestDatabaseShardManager.createShardManager;
import static io.trino.plugin.raptor.legacy.storage.TestRaptorStorageManager.createRaptorStorageManager;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
Expand Down Expand Up @@ -132,23 +133,23 @@ public void testMaintenanceBlocked()
// begin delete for table1
ConnectorTransactionHandle txn1 = beginTransaction();
ConnectorTableHandle handle1 = getTableHandle(connector.getMetadata(SESSION, txn1), "test1");
connector.getMetadata(SESSION, txn1).beginDelete(SESSION, handle1);
connector.getMetadata(SESSION, txn1).beginDelete(SESSION, handle1, NO_RETRIES);

assertTrue(metadataDao.isMaintenanceBlockedLocked(tableId1));
assertFalse(metadataDao.isMaintenanceBlockedLocked(tableId2));

// begin delete for table2
ConnectorTransactionHandle txn2 = beginTransaction();
ConnectorTableHandle handle2 = getTableHandle(connector.getMetadata(SESSION, txn2), "test2");
connector.getMetadata(SESSION, txn2).beginDelete(SESSION, handle2);
connector.getMetadata(SESSION, txn2).beginDelete(SESSION, handle2, NO_RETRIES);

assertTrue(metadataDao.isMaintenanceBlockedLocked(tableId1));
assertTrue(metadataDao.isMaintenanceBlockedLocked(tableId2));

// begin another delete for table1
ConnectorTransactionHandle txn3 = beginTransaction();
ConnectorTableHandle handle3 = getTableHandle(connector.getMetadata(SESSION, txn3), "test1");
connector.getMetadata(SESSION, txn3).beginDelete(SESSION, handle3);
connector.getMetadata(SESSION, txn3).beginDelete(SESSION, handle3, NO_RETRIES);

assertTrue(metadataDao.isMaintenanceBlockedLocked(tableId1));
assertTrue(metadataDao.isMaintenanceBlockedLocked(tableId2));
Expand Down Expand Up @@ -222,7 +223,7 @@ private void assertSplitShard(Type temporalType, String min, String max, int exp

ConnectorTransactionHandle txn1 = beginTransaction();
ConnectorTableHandle handle1 = getTableHandle(connector.getMetadata(SESSION, txn1), "test");
ConnectorInsertTableHandle insertTableHandle = connector.getMetadata(SESSION, txn1).beginInsert(session, handle1);
ConnectorInsertTableHandle insertTableHandle = connector.getMetadata(SESSION, txn1).beginInsert(session, handle1, ImmutableList.of(), NO_RETRIES);
ConnectorPageSink raptorPageSink = connector.getPageSinkProvider().createPageSink(txn1, session, insertTableHandle);

Object timestamp1 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static io.trino.plugin.raptor.legacy.metadata.SchemaDaoUtil.createTablesWithRetry;
import static io.trino.plugin.raptor.legacy.metadata.TestDatabaseShardManager.createShardManager;
import static io.trino.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.DoubleType.DOUBLE;
Expand Down Expand Up @@ -386,7 +387,7 @@ public void testCreateBucketedTableAsSelect()
RaptorPartitioningHandle partitioning = (RaptorPartitioningHandle) layout.getPartitioning().get();
assertEquals(partitioning.getDistributionId(), 1);

ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, ordersTable, Optional.of(layout));
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, ordersTable, Optional.of(layout), NO_RETRIES);
metadata.finishCreateTable(SESSION, outputHandle, ImmutableList.of(), ImmutableList.of());

ConnectorTableHandle tableHandle = metadata.getTableHandle(SESSION, DEFAULT_TEST_ORDERS);
Expand Down Expand Up @@ -664,7 +665,7 @@ public void testTransactionTableWrite()
{
// start table creation
long transactionId = 1;
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty());
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty(), NO_RETRIES);

// transaction is in progress
assertTrue(transactionExists(transactionId));
Expand All @@ -687,7 +688,7 @@ public void testTransactionInsert()
// start insert
transactionId++;
ConnectorTableHandle tableHandle = metadata.getTableHandle(SESSION, DEFAULT_TEST_ORDERS);
ConnectorInsertTableHandle insertHandle = metadata.beginInsert(SESSION, tableHandle);
ConnectorInsertTableHandle insertHandle = metadata.beginInsert(SESSION, tableHandle, ImmutableList.of(), NO_RETRIES);

// transaction is in progress
assertTrue(transactionExists(transactionId));
Expand All @@ -710,7 +711,7 @@ public void testTransactionDelete()
// start delete
transactionId++;
ConnectorTableHandle tableHandle = metadata.getTableHandle(SESSION, DEFAULT_TEST_ORDERS);
tableHandle = metadata.beginDelete(SESSION, tableHandle);
tableHandle = metadata.beginDelete(SESSION, tableHandle, NO_RETRIES);

// verify transaction is assigned for deletion handle
assertInstanceOf(tableHandle, RaptorTableHandle.class);
Expand All @@ -729,7 +730,7 @@ public void testTransactionDelete()

// start another delete
transactionId++;
tableHandle = metadata.beginDelete(SESSION, tableHandle);
tableHandle = metadata.beginDelete(SESSION, tableHandle, NO_RETRIES);

// transaction is in progress
assertTrue(transactionExists(transactionId));
Expand All @@ -746,7 +747,7 @@ public void testTransactionAbort()
{
// start table creation
long transactionId = 1;
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty());
ConnectorOutputTableHandle outputHandle = metadata.beginCreateTable(SESSION, getOrdersTable(), Optional.empty(), NO_RETRIES);

// transaction is in progress
assertTrue(transactionExists(transactionId));
Expand Down

0 comments on commit 99f736c

Please sign in to comment.