Skip to content

Commit

Permalink
Extract transaction manager from JdbcConnector
Browse files Browse the repository at this point in the history
Preparatory commit for supporting table functions in JDBC connectors
  • Loading branch information
kasiafi committed May 10, 2022
1 parent 364af6b commit 287e869
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,46 +35,38 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.immutableEnumSet;
import static io.trino.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;

public class JdbcConnector
implements Connector
{
private final LifeCycleManager lifeCycleManager;
private final JdbcMetadataFactory jdbcMetadataFactory;
private final ConnectorSplitManager jdbcSplitManager;
private final ConnectorRecordSetProvider jdbcRecordSetProvider;
private final ConnectorPageSinkProvider jdbcPageSinkProvider;
private final Optional<ConnectorAccessControl> accessControl;
private final Set<Procedure> procedures;
private final List<PropertyMetadata<?>> sessionProperties;
private final List<PropertyMetadata<?>> tableProperties;

private final ConcurrentMap<ConnectorTransactionHandle, JdbcMetadata> transactions = new ConcurrentHashMap<>();
private final JdbcTransactionManager transactionManager;

@Inject
public JdbcConnector(
LifeCycleManager lifeCycleManager,
JdbcMetadataFactory jdbcMetadataFactory,
ConnectorSplitManager jdbcSplitManager,
ConnectorRecordSetProvider jdbcRecordSetProvider,
ConnectorPageSinkProvider jdbcPageSinkProvider,
Optional<ConnectorAccessControl> accessControl,
Set<Procedure> procedures,
Set<SessionPropertiesProvider> sessionProperties,
Set<TablePropertiesProvider> tableProperties)
Set<TablePropertiesProvider> tableProperties,
JdbcTransactionManager transactionManager)
{
this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
this.jdbcMetadataFactory = requireNonNull(jdbcMetadataFactory, "jdbcMetadataFactory is null");
this.jdbcSplitManager = requireNonNull(jdbcSplitManager, "jdbcSplitManager is null");
this.jdbcRecordSetProvider = requireNonNull(jdbcRecordSetProvider, "jdbcRecordSetProvider is null");
this.jdbcPageSinkProvider = requireNonNull(jdbcPageSinkProvider, "jdbcPageSinkProvider is null");
Expand All @@ -86,37 +78,31 @@ public JdbcConnector(
this.tableProperties = requireNonNull(tableProperties, "tableProperties is null").stream()
.flatMap(tablePropertiesProvider -> tablePropertiesProvider.getTableProperties().stream())
.collect(toImmutableList());
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
}

@Override
public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
checkConnectorSupports(READ_COMMITTED, isolationLevel);
JdbcTransactionHandle transaction = new JdbcTransactionHandle();
transactions.put(transaction, jdbcMetadataFactory.create(transaction));
return transaction;
return transactionManager.beginTransaction(isolationLevel, readOnly, autoCommit);
}

@Override
public ConnectorMetadata getMetadata(ConnectorSession session, ConnectorTransactionHandle transaction)
{
JdbcMetadata metadata = transactions.get(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
return new ClassLoaderSafeConnectorMetadata(metadata, getClass().getClassLoader());
return new ClassLoaderSafeConnectorMetadata(transactionManager.getMetadata(transaction), getClass().getClassLoader());
}

@Override
public void commit(ConnectorTransactionHandle transaction)
{
checkArgument(transactions.remove(transaction) != null, "no such transaction: %s", transaction);
transactionManager.commit(transaction);
}

@Override
public void rollback(ConnectorTransactionHandle transaction)
{
JdbcMetadata metadata = transactions.remove(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
metadata.rollback();
transactionManager.rollback(transaction);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void setup(Binder binder)
newOptionalBinder(binder, ConnectorRecordSetProvider.class).setDefault().to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, ConnectorPageSinkProvider.class).setDefault().to(JdbcPageSinkProvider.class).in(Scopes.SINGLETON);

binder.bind(JdbcTransactionManager.class).in(Scopes.SINGLETON);
binder.bind(JdbcConnector.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(JdbcMetadataConfig.class);
configBinder(binder).bindConfig(JdbcWriteConfig.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.transaction.IsolationLevel;

import javax.inject.Inject;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;

public class JdbcTransactionManager
{
private final ConcurrentMap<ConnectorTransactionHandle, JdbcMetadata> transactions = new ConcurrentHashMap<>();
private final JdbcMetadataFactory metadataFactory;

@Inject
public JdbcTransactionManager(JdbcMetadataFactory metadataFactory)
{
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
}

public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly, boolean autoCommit)
{
checkConnectorSupports(READ_COMMITTED, isolationLevel);
JdbcTransactionHandle transaction = new JdbcTransactionHandle();
transactions.put(transaction, metadataFactory.create(transaction));
return transaction;
}

public JdbcMetadata getMetadata(ConnectorTransactionHandle transaction)
{
JdbcMetadata metadata = transactions.get(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
return metadata;
}

public void commit(ConnectorTransactionHandle transaction)
{
checkArgument(transactions.remove(transaction) != null, "no such transaction: %s", transaction);
}

public void rollback(ConnectorTransactionHandle transaction)
{
JdbcMetadata metadata = transactions.remove(transaction);
checkArgument(metadata != null, "no such transaction: %s", transaction);
metadata.rollback();
}
}

0 comments on commit 287e869

Please sign in to comment.