Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
38446: cluster: remove a series of 2.1+ cluster versions r=tbg a=nvanbenschoten

I wanted to remove `VersionLazyTxnRecord` and `VersionPushTxnToInclusive`,
but while there I removed all of the following:
- VersionCascadingZoneConfigs
- VersionLoadSplits
- VersionExportStorageWorkload
- VersionLazyTxnRecord
- VersionSequencedReads
- VersionCreateStats
- VersionDirectImport
- VersionPushTxnToInclusive

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Jun 28, 2019
2 parents dd9784d + 7400cc4 commit d668aa9
Show file tree
Hide file tree
Showing 24 changed files with 1,038 additions and 1,350 deletions.
6 changes: 0 additions & 6 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,6 @@ func importPlanHook(
}

_, ingestDirectly := opts[importOptionDirectIngest]
if ingestDirectly {
if !p.ExecCfg().Settings.Version.IsActive(cluster.VersionDirectImport) {
return errors.Errorf("Using %q requires all nodes to be upgraded to %s",
importOptionDirectIngest, cluster.VersionByKey(cluster.VersionDirectImport))
}
}

var tableDetails []jobspb.ImportDetails_Table
jobDesc, err := importJobDescription(p, importStmt, nil, files, opts)
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/storageccl/export_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,6 @@ func MakeExportStorage(
telemetry.Count("external-io.azure")
return makeAzureStorage(dest.AzureConfig, settings)
case roachpb.ExportStorageProvider_Workload:
if err := settings.Version.CheckVersion(
cluster.VersionExportStorageWorkload, "experimental-workload",
); err != nil {
return nil, err
}
telemetry.Count("external-io.workload")
return makeWorkloadStorage(dest.WorkloadConfig)
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/ccl/storageccl/export_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,10 +591,4 @@ func TestWorkloadStorage(t *testing.T) {
require.EqualError(t, err, `expected bank version "" but got "1.0.0"`)
_, err = ExportStorageFromURI(ctx, `experimental-workload:///csv/bank/bank?version=nope`, settings)
require.EqualError(t, err, `expected bank version "nope" but got "1.0.0"`)

tooOldSettings := cluster.MakeTestingClusterSettingsWithVersion(
cluster.VersionByKey(cluster.Version2_1), cluster.VersionByKey(cluster.Version2_1))
_, err = ExportStorageFromURI(ctx, bankURL().String(), tooOldSettings)
require.EqualError(t, err,
`cluster version does not support experimental-workload (>= 2.1-3 required)`)
}
1 change: 0 additions & 1 deletion pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
tcf.AmbientContext,
&tcs.mu.Mutex,
&tcs.mu.txn,
tcf.st,
tcs.clock,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
Expand Down
122 changes: 0 additions & 122 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
Expand Down Expand Up @@ -1643,127 +1642,6 @@ func TestCommitMutatingTransaction(t *testing.T) {
}
}

// TestTxnInsertBeginTransaction verifies that a begin transaction
// request is inserted just before the first mutating command.
// TODO(nvanbenschoten): Remove in 2.3.
func TestTxnInsertBeginTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
sender := &mockSender{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

var calls []roachpb.Method
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()

calls = append(calls, ba.Methods()...)
if bt, ok := ba.GetArg(roachpb.BeginTransaction); ok && !bt.Header().Key.Equal(roachpb.Key("a")) {
t.Errorf("expected begin transaction key to be \"a\"; got %s", bt.Header().Key)
}
if et, ok := ba.GetArg(roachpb.EndTransaction); ok {
if !et.(*roachpb.EndTransactionRequest).Commit {
t.Errorf("expected commit to be true")
}
br.Txn.Status = roachpb.COMMITTED
}
return br, nil
})

v := cluster.VersionByKey(cluster.Version2_1)
st := cluster.MakeTestingClusterSettingsWithVersion(v, v)
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: clock,
Stopper: stopper,
},
sender,
)

db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock)
if err := db.Txn(context.TODO(), func(ctx context.Context, txn *client.Txn) error {
if _, err := txn.Get(ctx, "foo"); err != nil {
return err
}
return txn.Put(ctx, "a", "b")
}); err != nil {
t.Fatalf("unexpected error on commit: %s", err)
}
expectedCalls := []roachpb.Method{
roachpb.Get,
roachpb.BeginTransaction,
roachpb.Put,
roachpb.QueryIntent,
roachpb.EndTransaction}
if !reflect.DeepEqual(expectedCalls, calls) {
t.Fatalf("expected %s, got %s", expectedCalls, calls)
}
}

// TestBeginTransactionErrorIndex verifies that the error index is cleared
// when a BeginTransaction command causes an error.
// TODO(nvanbenschoten): Remove in 2.3.
func TestBeginTransactionErrorIndex(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
sender := &mockSender{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
pErr := roachpb.NewError(&roachpb.WriteIntentError{})
pErr.SetErrorIndex(0)
return nil, pErr
})

v := cluster.VersionByKey(cluster.Version2_1)
st := cluster.MakeTestingClusterSettingsWithVersion(v, v)
factory := NewTxnCoordSenderFactory(
TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
Settings: st,
Clock: clock,
Stopper: stopper,
},
sender,
)

db := client.NewDB(testutils.MakeAmbientCtx(), factory, clock)
_ = db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Put("a", "b")
err := getOneErr(txn.Run(ctx, b), b)
if err == nil {
t.Fatal("missing err")
}
pErr := b.MustPErr()
// Verify that the original error type is preserved, but the error index is unset.
if _, ok := pErr.GetDetail().(*roachpb.WriteIntentError); !ok {
t.Fatalf("unexpected error %s", pErr)
}
if pErr.Index != nil {
t.Errorf("error index must not be set, but got %d", pErr.Index)
}
return err
})
}

// getOneErr returns the error for a single-request Batch that was run.
// runErr is the error returned by Run, b is the Batch that was passed to Run.
func getOneErr(runErr error, b *client.Batch) error {
if runErr != nil && len(b.Results) > 0 {
return b.Results[0].Err
}
return runErr
}

// TestAbortReadOnlyTransaction verifies that aborting a read-only
// transaction does not prompt an EndTransaction call.
func TestAbortReadOnlyTransaction(t *testing.T) {
Expand Down
103 changes: 21 additions & 82 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -68,7 +67,6 @@ type txnHeartbeater struct {
// sends got through `wrapped`, not directly through `gatekeeper`.
gatekeeper lockedSender

st *cluster.Settings
clock *hlc.Clock
heartbeatInterval time.Duration
metrics *TxnMetrics
Expand All @@ -86,6 +84,10 @@ type txnHeartbeater struct {
mu struct {
sync.Locker

// loopStarted indicates whether the heartbeat loop has been launched
// for the transaction or not. It remains true once the loop terminates.
loopStarted bool

// txnEnd is closed when the transaction is aborted or committed, terminating
// the heartbeat loop. Nil if the heartbeat loop is not running.
txnEnd chan struct{}
Expand All @@ -96,19 +98,6 @@ type txnHeartbeater struct {
// finalErr, if set, will be returned by all subsequent SendLocked() calls,
// except rollbacks.
finalErr *roachpb.Error

// needBeginTxn dictates whether a BeginTxn request is to be prepended to a
// write batch. It starts as set and then gets unset when the BeginTxn is
// sent. It gets reset on epoch increment, as it's possible that the
// retriable error was generated by the BeginTxn batch and the transaction
// record has not been written.
// We could be smarter about not resetting this if there's ever been a
// successful BeginTxn (in which case we know that there is a txn record)
// but as of May 2018 we don't do that. Note that the server accepts a
// BeginTxn with a higher epoch if a transaction record already exists.
// TODO(nvanbenschoten): Once we stop sending BeginTxn entirely (v2.3)
// we can get rid of this. For now, we keep it to ensure compatibility.
needBeginTxn bool
}
}

Expand All @@ -118,7 +107,6 @@ func (h *txnHeartbeater) init(
ac log.AmbientContext,
mu sync.Locker,
txn *roachpb.Transaction,
st *cluster.Settings,
clock *hlc.Clock,
heartbeatInterval time.Duration,
gatekeeper lockedSender,
Expand All @@ -129,13 +117,11 @@ func (h *txnHeartbeater) init(
h.AmbientContext = ac
h.AmbientContext.AddLogTag("txn-hb", txn.Short())
h.stopper = stopper
h.st = st
h.clock = clock
h.heartbeatInterval = heartbeatInterval
h.metrics = metrics
h.mu.Locker = mu
h.mu.txn = txn
h.mu.needBeginTxn = true
h.gatekeeper = gatekeeper
h.asyncAbortCallbackLocked = asyncAbortCallbackLocked
}
Expand All @@ -158,81 +144,35 @@ func (h *txnHeartbeater) SendLocked(
return nil, pErr
}
haveTxnWrite := firstWriteIdx != -1
_, haveEndTxn := ba.GetArg(roachpb.EndTransaction)

addedBeginTxn := false
needBeginTxn := haveTxnWrite && h.mu.needBeginTxn
if needBeginTxn {
h.mu.needBeginTxn = false

if haveTxnWrite {
// Set txn key based on the key of the first transactional write if not
// already set. If we're in a restart, make sure we keep the anchor key the
// same.
// already set. If it is already set, make sure we keep the anchor key
// the same.
if len(h.mu.txn.Key) == 0 {
anchor := ba.Requests[firstWriteIdx].GetInner().Header().Key
h.mu.txn.Key = anchor
// Put the anchor also in the ba's copy of the txn, since this batch was
// prepared before we had an anchor.
// Put the anchor also in the ba's copy of the txn, since this batch
// was prepared before we had an anchor.
ba.Txn.Key = anchor
}

if !h.st.Version.IsActive(cluster.VersionLazyTxnRecord) {
addedBeginTxn = true

// Set the key in the begin transaction request to the txn's anchor key.
bt := &roachpb.BeginTransactionRequest{
RequestHeader: roachpb.RequestHeader{
Key: h.mu.txn.Key,
},
}

// Inject the new request before the first write position, taking care to
// avoid unnecessary allocations.
oldRequests := ba.Requests
ba.Requests = make([]roachpb.RequestUnion, len(ba.Requests)+1)
copy(ba.Requests, oldRequests[:firstWriteIdx])
ba.Requests[firstWriteIdx].MustSetInner(bt)
copy(ba.Requests[firstWriteIdx+1:], oldRequests[firstWriteIdx:])
}

// Start the heartbeat loop.
// Start the heartbeat loop if it has not already started.
//
// Note that we don't do it for 1PC txns: they only leave intents around on
// retriable errors if the batch has been split between ranges. We consider
// that unlikely enough so we prefer to not pay for a goroutine.
//
// Note that we don't start the heartbeat loop if the loop is already
// running. That can happen because we send BeginTransaction again after
// retriable errors.
if h.mu.txnEnd == nil && !haveEndTxn {
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
h.mu.finalErr = roachpb.NewError(err)
return nil, h.mu.finalErr
if !h.mu.loopStarted {
if _, haveEndTxn := ba.GetArg(roachpb.EndTransaction); !haveEndTxn {
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
h.mu.finalErr = roachpb.NewError(err)
return nil, h.mu.finalErr
}
}
}
}

// Forward the batch through the wrapped lockedSender.
br, pErr := h.wrapped.SendLocked(ctx, ba)

// If we inserted a begin transaction request, remove it here.
if addedBeginTxn {
if br != nil && br.Responses != nil {
br.Responses = append(br.Responses[:firstWriteIdx], br.Responses[firstWriteIdx+1:]...)
}
// Handle case where inserted begin txn confused an indexed error.
if pErr != nil && pErr.Index != nil {
idx := pErr.Index.Index
if idx == int32(firstWriteIdx) {
// An error was encountered on begin txn; disallow the indexing.
pErr.Index = nil
} else if idx > int32(firstWriteIdx) {
// An error was encountered after begin txn; decrement index.
pErr.SetErrorIndex(idx - 1)
}
}
}

return br, pErr
return h.wrapped.SendLocked(ctx, ba)
}

// setWrapped is part of the txnInteceptor interface.
Expand All @@ -247,9 +187,7 @@ func (h *txnHeartbeater) populateMetaLocked(*roachpb.TxnCoordMeta) {}
func (h *txnHeartbeater) augmentMetaLocked(roachpb.TxnCoordMeta) {}

// epochBumpedLocked is part of the txnInteceptor interface.
func (h *txnHeartbeater) epochBumpedLocked() {
h.mu.needBeginTxn = true
}
func (h *txnHeartbeater) epochBumpedLocked() {}

// closeLocked is part of the txnInteceptor interface.
func (h *txnHeartbeater) closeLocked() {
Expand All @@ -263,11 +201,12 @@ func (h *txnHeartbeater) closeLocked() {

// startHeartbeatLoopLocked starts a heartbeat loop in a different goroutine.
func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {
if h.mu.txnEnd != nil {
if h.mu.loopStarted || h.mu.txnEnd != nil {
log.Fatal(ctx, "attempting to start a second heartbeat loop ")
}

log.VEventf(ctx, 2, "coordinator spawns heartbeat loop")
h.mu.loopStarted = true
h.mu.txnEnd = make(chan struct{})

// Create a new context so that the heartbeat loop doesn't inherit the
Expand Down
Loading

0 comments on commit d668aa9

Please sign in to comment.