replicaset: vshard bootstrap for cluster config
@TarantoolBot document
Title: `tt replicaset vshard bootstrap` - bootstraps vshard in the cluster

This patch introduces a new subcommand for the replicaset module.
`tt replicaset vshard` - manages vshard in the cluster.

`tt replicaset vshard bootstrap [--cartridge|--config] [flags] (<APP_NAME> | <APP_NAME:INSTANCE_NAME> | <URI>)`

* Bootstraps vshard in the replicaset.

* Cartridge: effectively a clone of `cartridge-cli replicasets bootstrap-vshard`.

* Centralized config (3.0): looks up a router in the cluster and calls `vshard.router.bootstrap()` on it (see the sketch below).

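For the centralized config case, the Lua evaluated on the selected router boils down to the following (a minimal sketch mirroring the embedded `bootstrap_vshard_body.lua` added by this patch; error reporting is simplified):

```lua
-- Sketch of the bootstrap step evaluated on a router instance.
local vshard = require('vshard')
local config = require('config')

-- The instance must have the "router" sharding role.
local is_router = false
for _, role in ipairs(config:get().sharding.roles) do
    if role == "router" then
        is_router = true
        break
    end
end
assert(is_router, "instance must be a router to bootstrap vshard")

-- Create the buckets on the storages.
local ok, err = vshard.router.bootstrap()
assert(ok, tostring(err))
```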
Part of #316
askalt committed Apr 12, 2024
1 parent 1d85352 commit 4efc828
Showing 10 changed files with 371 additions and 35 deletions.
13 changes: 10 additions & 3 deletions CHANGELOG.md
@@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

- `tt replicaset expel`: command to expel an instance from the tarantool replicaset with
cluster config (3.0) or cartridge orchestrator.
- `tt replicaset vshard`: module to manage vshard in the tarantool replicaset.
* `tt replicaset vshard bootstrap`: command to bootstrap vshard.
- `tt cluster replicaset expel`: command to expel an instance from the replicaset via cluster
config storage.


## [2.2.1] - 2024-04-03

### Added
@@ -13,12 +23,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
cluster config (3.0) or cartridge orchestrator.
- `tt replicaset demote`: command to demote an instance in the tarantool replicaset with
cluster config (3.0) orchestrator.
- `tt replicaset expel`: command to expel an instance from the tarantool replicaset with
cluster config (3.0) or cartridge orchestrator.
- `tt cluster replicaset`: module to manage replicaset via 3.0 cluster config storage.
* `tt cluster replicaset promote`: command to promote an instance in the replicaset.
* `tt cluster replicaset demote`: command to demote an instance in the replicaset.
* `tt cluster replicaset expel`: command to expel an instance from the replicaset.
- `tt connect --binary`: connect to instance using binary port.
- `tt kill`: command to stop instance(s) with SIGQUIT and SIGKILL signals.

91 changes: 86 additions & 5 deletions cli/replicaset/cconfig.go
@@ -22,6 +22,11 @@ var (

//go:embed lua/cconfig/promote_election.lua
cconfigPromoteElectionBody string

//go:embed lua/cconfig/bootstrap_vshard_body.lua
cconfigBootstrapVShardBody string

cconfigGetShardingRolesBody = "return require('config'):get().sharding.roles"
)

// cconfigTopology used to export topology information from a Tarantool
@@ -106,10 +111,15 @@ func (c *CConfigInstance) Expel(ctx ExpelCtx) error {
return newErrExpelByInstanceNotSupported(OrchestratorCentralizedConfig)
}

// BootstrapVShard is not supported for a single instance by the
// centralized config orchestrator.
// BootstrapVShard bootstraps vshard for a single instance by the centralized config
// orchestrator.
func (c *CConfigInstance) BootstrapVShard() error {
return newErrBootstrapVShardByInstanceNotSupported(OrchestratorCentralizedConfig)
err := cconfigBootstrapVShard(c.evaler)
if err != nil {
return err
}
log.Info("Done.")
return nil
}

// CConfigApplication is an application with the centralized config
@@ -396,9 +406,45 @@ func (c *CConfigApplication) Demote(ctx DemoteCtx) error {
return err
}

// BootstrapVShard is not supported for an application by the centralized config orchestrator.
// BootstrapVShard bootstraps vshard for an application by the centralized config orchestrator.
func (c *CConfigApplication) BootstrapVShard() error {
return newErrBootstrapVShardByAppNotSupported(OrchestratorCentralizedConfig)
var (
lastErr error
found bool
)
eval := func(instance running.InstanceCtx, evaler connector.Evaler) (bool, error) {
roles, err := cconfigGetShardingRoles(evaler)
if err != nil {
lastErr = fmt.Errorf("failed to get sharding roles: %w", err)
// Try again with another instance.
return false, nil
}
isRouter := false
for _, role := range roles {
if role == "router" {
isRouter = true
break
}
}
if !isRouter {
// Try again with another instance.
return false, nil
}
found = true
lastErr = cconfigBootstrapVShard(evaler)
return lastErr == nil, nil
}
err := EvalForeach(c.runningCtx.Instances, InstanceEvalFunc(eval))
for _, e := range []error{err, lastErr} {
if e != nil {
return e
}
}
if !found {
return fmt.Errorf("not found any vshard router in replicaset")
}
log.Info("Done.")
return nil
}

// cconfigPromoteElection tries to promote an instance via `box.ctl.promote()`.
@@ -412,6 +458,41 @@ func cconfigPromoteElection(evaler connector.Evaler, timeout int) error {
return waitRW(evaler, timeout)
}

// cconfigBootstrapVShard bootstraps vshard on the passed instance.
func cconfigBootstrapVShard(evaler connector.Evaler) error {
var opts connector.RequestOpts
_, err := evaler.Eval(cconfigBootstrapVShardBody, []any{}, opts)
if err != nil {
return fmt.Errorf("failed to bootstrap vshard: %w", err)
}
return nil
}

// cconfigGetShardingRoles returns sharding roles of the passed instance.
func cconfigGetShardingRoles(evaler connector.Evaler) ([]string, error) {
var opts connector.RequestOpts
resp, err := evaler.Eval(cconfigGetShardingRolesBody, []any{}, opts)
if err != nil {
return nil, err
}
if len(resp) != 1 {
return nil, fmt.Errorf("unexpected response length: %d", len(resp))
}
rolesAnyArray, ok := resp[0].([]any)
if !ok {
return nil, fmt.Errorf("unexpected response type: %T", resp[0])
}
var ret []string
for _, role := range rolesAnyArray {
if roleStr, ok := role.(string); ok {
ret = append(ret, roleStr)
} else {
return nil, fmt.Errorf("unexpected role type: %T", role)
}
}
return ret, nil
}

// reloadCConfig reloads a cluster config on the several instances.
func reloadCConfig(instances []running.InstanceCtx) error {
errored := []string{}
15 changes: 0 additions & 15 deletions cli/replicaset/cconfig_test.go
@@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/tarantool/tt/cli/replicaset"
"github.com/tarantool/tt/cli/running"
)

var _ replicaset.Discoverer = &replicaset.CConfigInstance{}
@@ -24,13 +23,6 @@ var _ replicaset.Demoter = &replicaset.CConfigApplication{}
var _ replicaset.Expeller = &replicaset.CConfigApplication{}
var _ replicaset.VShardBootstrapper = &replicaset.CConfigApplication{}

func TestCConfigApplication_BootstrapVShard(t *testing.T) {
app := replicaset.NewCConfigApplication(running.RunningCtx{}, nil, nil)
err := app.BootstrapVShard()
assert.EqualError(t, err,
`bootstrap vshard is not supported for an application by "centralized config" orchestrator`)
}

func TestCConfigInstance_Discovery(t *testing.T) {
cases := []struct {
Name string
@@ -457,10 +449,3 @@ func TestCConfigInstance_Expel(t *testing.T) {
assert.EqualError(t, err,
`expel is not supported for a single instance by "centralized config" orchestrator`)
}

func TestCConfigInstance_BootstrapVShard(t *testing.T) {
instance := replicaset.NewCConfigInstance(nil)
err := instance.BootstrapVShard()
assert.EqualError(t, err, `bootstrap vshard is not supported for a single instance by `+
`"centralized config" orchestrator`)
}
20 changes: 20 additions & 0 deletions cli/replicaset/lua/cconfig/bootstrap_vshard_body.lua
@@ -0,0 +1,20 @@
local ok, vshard = pcall(require, 'vshard')
if not ok then
error("failed to require vshard module")
end

local config = require('config')
local is_router = false
for _, role in ipairs(config:get().sharding.roles) do
if role == "router" then
is_router = true
break
end
end

if not is_router then
error("instance must be a router to bootstrap vshard")
end

local ok, err = vshard.router.bootstrap()
assert(ok, tostring(err))
10 changes: 1 addition & 9 deletions test/conftest.py
@@ -1,26 +1,18 @@
import os
import platform
import subprocess
import tempfile

import psutil
import py
import pytest
from cartridge_helper import CartridgeApp
from etcd_helper import EtcdInstance

from utils import create_tt_config, kill_procs
from utils import create_tt_config, get_tmpdir, kill_procs


# ######## #
# Fixtures #
# ######## #
def get_tmpdir(request):
tmpdir = py.path.local(tempfile.mkdtemp())
request.addfinalizer(lambda: tmpdir.remove(rec=1))
return str(tmpdir)


@pytest.fixture(scope="session")
def cli_config_dir():
if platform.system() == "Darwin":
122 changes: 119 additions & 3 deletions test/integration/replicaset/test_replicaset_vshard.py
@@ -6,10 +6,10 @@
import pytest
from cartridge_helper import (cartridge_name, cartridge_password,
cartridge_username)
from replicaset_helpers import eval_on_instance, parse_status
from replicaset_helpers import eval_on_instance, parse_status, stop_application

from utils import (get_tarantool_version, run_command_and_get_output,
wait_event, wait_file)
from utils import (create_tt_config, get_tarantool_version, get_tmpdir,
run_command_and_get_output, wait_event, wait_file)

tarantool_major_version, tarantool_minor_version = get_tarantool_version()

@@ -125,3 +125,119 @@ def have_buckets_created():
return out.find("false") != -1

assert wait_event(10, have_buckets_created)


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip centralized config test for Tarantool < 3")
def test_vshard_bootstrap_cconfig_vshard_not_installed(tt_cmd, tmpdir_with_cfg):
tmpdir = tmpdir_with_cfg
app_name = "test_ccluster_app"
app_path = os.path.join(tmpdir, app_name)
shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)
try:
# Start a cluster.
start_cmd = [tt_cmd, "start", app_name]
rc, out = run_command_and_get_output(start_cmd, cwd=tmpdir)
assert rc == 0

for i in range(1, 6):
file = wait_file(os.path.join(tmpdir, app_name), f'ready-instance-00{i}', [])
assert file != ""

cmd = [tt_cmd, "rs", "vs", "bootstrap", app_name]

rc, out = run_command_and_get_output(cmd, cwd=tmpdir)

assert rc != 0
buf = io.StringIO(out)
assert "• Discovery application..." in buf.readline()
buf.readline()
# Skip init status in the output.
parse_status(buf)
assert "Bootstrapping vshard" in buf.readline()
assert "failed to get sharding roles" in buf.readline()
finally:
stop_application(tt_cmd, app_name, tmpdir, [])


@pytest.fixture(scope="session")
def vshard_tt_env_session(request, tt_cmd):
tmpdir = get_tmpdir(request)
create_tt_config(tmpdir, "")

# Install vshard.
cmd = [tt_cmd, "rocks", "install", "vshard"]
rc, out = run_command_and_get_output(cmd, cwd=tmpdir)
assert rc == 0
assert re.search(r"vshard .* is now installed", out)
return tmpdir


vshard_cconfig_app_name = "test_vshard_app"


@pytest.fixture
def vshard_cconfig_app_tt_env(request, tt_cmd, vshard_tt_env_session):
tmpdir = vshard_tt_env_session
app_path = os.path.join(tmpdir, vshard_cconfig_app_name)

# Copy application.
shutil.copytree(os.path.join(os.path.dirname(__file__), vshard_cconfig_app_name), app_path)

# Start a cluster.
start_cmd = [tt_cmd, "start", vshard_cconfig_app_name]
rc, _ = run_command_and_get_output(start_cmd, cwd=tmpdir)
assert rc == 0

instances = ["storage-001-a", "storage-001-b", "storage-002-a", "storage-002-b", "router-001-a"]

def stop_and_clean():
stop_application(tt_cmd, app_name=vshard_cconfig_app_name,
workdir=tmpdir, instances=instances)
shutil.rmtree(app_path)
request.addfinalizer(stop_and_clean)

for inst in instances:
file = wait_file(app_path, f'ready-{inst}', [])
assert file != ""
return tmpdir


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip centralized config test for Tarantool < 3")
def test_vshard_bootstrap_cconfig_via_uri_no_router(tt_cmd, vshard_cconfig_app_tt_env):
tmpdir = vshard_cconfig_app_tt_env
cmd = [tt_cmd, "rs", "vs", "bootstrap",
"--username", "client", "--password", "secret",
os.path.join(tmpdir, vshard_cconfig_app_name, "storage-001-a.iproto")]
rc, out = run_command_and_get_output(cmd, cwd=tmpdir)
assert rc != 0
assert "instance must be a router to bootstrap vshard" in out


@pytest.mark.skipif(tarantool_major_version < 3,
reason="skip centralized config test for Tarantool < 3")
@pytest.mark.parametrize("flag", [None, "--config"])
def test_vshard_bootstrap_cconfig(tt_cmd, vshard_cconfig_app_tt_env, flag):
tmpdir = vshard_cconfig_app_tt_env
cmd = [tt_cmd, "rs", "vs", "bootstrap"]
if flag:
cmd.append(flag)
cmd.append(vshard_cconfig_app_name)
rc, out = run_command_and_get_output(cmd, cwd=tmpdir)

assert rc == 0
buf = io.StringIO(out)
assert "Discovery application..." in buf.readline()
buf.readline()
# Skip init status in the output.
parse_status(buf)
assert "Bootstrapping vshard" in buf.readline()
assert "Done." in buf.readline()

def have_buckets_created():
expr = "require('vshard').storage.buckets_count() == 0"
out = eval_on_instance(tt_cmd, vshard_cconfig_app_name, "storage-001-a", tmpdir, expr)
return out.find("false") != -1

assert wait_event(10, have_buckets_created)