Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema manager supports TTL #333

Merged
merged 5 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions schema_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,28 @@ func (mgr *SchemaManager) ApplyTag(tag LabelSchema) (*ResultSet, error) {
}

// 4. Check if the TTL is set as expected.
// @TODO
ttlCol, ttlDuration, err := mgr.pool.GetTagTTL(tag.Name)
if err != nil {
return nil, err
}

if ttlCol != tag.TTLCol || ttlDuration != tag.TTLDuration {
if mgr.verbose {
log.Printf(
"ApplyTag: Alter the tag TTL. name=%s, col from %s to %s, duration from %d to %d\n",
tag.Name,
ttlCol,
tag.TTLCol,
ttlDuration,
tag.TTLDuration,
)
}

_, err = mgr.pool.AddTagTTL(tag.Name, tag.TTLCol, tag.TTLDuration)
if err != nil {
return nil, err
}
}

return nil, nil
}
Expand Down Expand Up @@ -215,7 +236,28 @@ func (mgr *SchemaManager) ApplyEdge(edge LabelSchema) (*ResultSet, error) {
}

// 4. Check if the TTL is set as expected.
// @TODO
ttlCol, ttlDuration, err := mgr.pool.GetEdgeTTL(edge.Name)
if err != nil {
return nil, err
}

if ttlCol != edge.TTLCol || ttlDuration != edge.TTLDuration {
if mgr.verbose {
log.Printf(
"ApplyEdge: Alter the edge TTL. name=%s, col from %s to %s, duration from %d to %d\n",
edge.Name,
ttlCol,
edge.TTLCol,
ttlDuration,
edge.TTLDuration,
)
}

_, err = mgr.pool.AddEdgeTTL(edge.Name, edge.TTLCol, edge.TTLDuration)
if err != nil {
return nil, err
}
}

return nil, nil
}
236 changes: 236 additions & 0 deletions schema_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build integration
// +build integration

/*
*
* Copyright (c) 2024 vesoft inc. All rights reserved.
Expand Down Expand Up @@ -292,3 +295,236 @@ func TestSchemaManagerApplyEdge(t *testing.T) {
assert.Equal(t, "email", labels[0].Field)
assert.Equal(t, "string", labels[0].Type)
}

func TestSchemaManagerApplyTagWithTTL(t *testing.T) {
spaceName := "test_space_apply_tag_with_ttl"
err := prepareSpace(spaceName)
if err != nil {
t.Fatal(err)
}
defer dropSpace(spaceName)

hostAddress := HostAddress{Host: address, Port: port}
config, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
spaceName)
if err != nil {
t.Errorf("failed to create session pool config, %s", err.Error())
}

// allow only one session in the pool so it is easier to test
config.maxSize = 1

// create session pool
sessionPool, err := NewSessionPool(*config, DefaultLogger{})
if err != nil {
t.Fatal(err)
}
defer sessionPool.Close()

schemaManager := NewSchemaManager(sessionPool).WithVerbose(true)

spaces, err := sessionPool.ShowSpaces()
if err != nil {
t.Fatal(err)
}
assert.LessOrEqual(t, 1, len(spaces))
var spaceNames []string
for _, space := range spaces {
spaceNames = append(spaceNames, space.Name)
}
assert.Contains(t, spaceNames, spaceName)

tagSchema := LabelSchema{
Name: "user",
Fields: []LabelFieldSchema{
{
Field: "name",
Type: "string",
Nullable: false,
},
{
Field: "created_at",
Type: "int64",
Nullable: false,
},
},
TTLDuration: 100,
TTLCol: "created_at",
}
_, err = schemaManager.ApplyTag(tagSchema)
if err != nil {
t.Fatal(err)
}
tags, err := sessionPool.ShowTags()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(tags))
labels, err := sessionPool.DescTag("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(labels))
ttlCol, ttlDuration, err := sessionPool.GetTagTTL("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "created_at", ttlCol)
assert.Equal(t, uint(100), ttlDuration)

// update ttl
tagSchema.TTLDuration = 300

_, err = schemaManager.ApplyTag(tagSchema)
if err != nil {
t.Fatal(err)
}
labels, err = sessionPool.DescTag("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(labels))
ttlCol, ttlDuration, err = sessionPool.GetTagTTL("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "created_at", ttlCol)
assert.Equal(t, uint(300), ttlDuration)

// remove ttl
tagSchema.TTLDuration = 0
tagSchema.TTLCol = ""

_, err = schemaManager.ApplyTag(tagSchema)
if err != nil {
t.Fatal(err)
}
labels, err = sessionPool.DescTag("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(labels))
ttlCol, ttlDuration, err = sessionPool.GetTagTTL("user")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "", ttlCol)
assert.Equal(t, uint(0), ttlDuration)
}

func TestSchemaManagerApplyEdgeWithTTL(t *testing.T) {
spaceName := "test_space_apply_edge_with_ttl"
err := prepareSpace(spaceName)
if err != nil {
t.Fatal(err)
}
defer dropSpace(spaceName)

hostAddress := HostAddress{Host: address, Port: port}
config, err := NewSessionPoolConf(
"root",
"nebula",
[]HostAddress{hostAddress},
spaceName)
if err != nil {
t.Errorf("failed to create session pool config, %s", err.Error())
}

// allow only one session in the pool so it is easier to test
config.maxSize = 1

// create session pool
sessionPool, err := NewSessionPool(*config, DefaultLogger{})
if err != nil {
t.Fatal(err)
}
defer sessionPool.Close()

schemaManager := NewSchemaManager(sessionPool).WithVerbose(true)

spaces, err := sessionPool.ShowSpaces()
if err != nil {
t.Fatal(err)
}
assert.LessOrEqual(t, 1, len(spaces))
var spaceNames []string
for _, space := range spaces {
spaceNames = append(spaceNames, space.Name)
}
assert.Contains(t, spaceNames, spaceName)

edgeSchema := LabelSchema{
Name: "friend",
Fields: []LabelFieldSchema{
{
Field: "created_at",
Type: "int64",
Nullable: false,
},
},
TTLDuration: 100,
TTLCol: "created_at",
}
_, err = schemaManager.ApplyEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}
edges, err := sessionPool.ShowEdges()
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(edges))
labels, err := sessionPool.DescEdge("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
ttlCol, ttlDuration, err := sessionPool.GetEdgeTTL("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "created_at", ttlCol)
assert.Equal(t, uint(100), ttlDuration)

// update ttl
edgeSchema.TTLDuration = 300

_, err = schemaManager.ApplyEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}
labels, err = sessionPool.DescEdge("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
ttlCol, ttlDuration, err = sessionPool.GetEdgeTTL("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "created_at", ttlCol)
assert.Equal(t, uint(300), ttlDuration)

// remove ttl
edgeSchema.TTLDuration = 0
edgeSchema.TTLCol = ""

_, err = schemaManager.ApplyEdge(edgeSchema)
if err != nil {
t.Fatal(err)
}
labels, err = sessionPool.DescEdge("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(labels))
ttlCol, ttlDuration, err = sessionPool.GetEdgeTTL("friend")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, "", ttlCol)
assert.Equal(t, uint(0), ttlDuration)
}
Loading