Skip to content

Commit

Permalink
feat: support acl dump for sql
Browse files Browse the repository at this point in the history
Signed-off-by: jiefeng <jiefeng@juicedata.io>
  • Loading branch information
jiefenghuang committed Mar 12, 2024
1 parent 73d1366 commit 5626f32
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 5 deletions.
38 changes: 38 additions & 0 deletions pkg/acl/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package acl
import (
"sort"
"sync"
"sync/atomic"
)

const None = 0
Expand All @@ -31,10 +32,12 @@ const None = 0
type Cache interface {
Put(id uint32, r *Rule)
Get(id uint32) *Rule
GetAll() map[uint32]*Rule
GetId(r *Rule) uint32
Size() int
GetMissIds() []uint32
Clear()
GetOrPut(r *Rule, currId *uint32) (id uint32, got bool)
}

func NewCache() Cache {
Expand All @@ -53,6 +56,33 @@ type cache struct {
cksum2Id map[uint32][]uint32
}

func (c *cache) GetAll() map[uint32]*Rule {
c.lock.RLock()
defer c.lock.RUnlock()

cpy := make(map[uint32]*Rule, len(c.id2Rule))
for id, r := range c.id2Rule {
cpy[id] = r
}
return cpy
}

// GetOrPut returns id for the Rule r if exists.
// Otherwise, it stores r with a new id (atomically increment curId) and returns the new id.
// The got result is true if the Rule was already exists, false if stored.
func (c *cache) GetOrPut(r *Rule, curId *uint32) (id uint32, got bool) {
c.lock.Lock()
defer c.lock.Unlock()

if id = c.getId(r); id != None {
return id, true
}

id = atomic.AddUint32(curId, 1)
c.put(id, r)
return id, false
}

func (c *cache) Clear() {
c.lock.Lock()
defer c.lock.Unlock()
Expand Down Expand Up @@ -99,6 +129,10 @@ func (c *cache) Put(id uint32, r *Rule) {
c.lock.Lock()
defer c.lock.Unlock()

c.put(id, r)
}

func (c *cache) put(id uint32, r *Rule) {
if _, ok := c.id2Rule[id]; ok {
return
}
Expand Down Expand Up @@ -129,6 +163,10 @@ func (c *cache) GetId(r *Rule) uint32 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.getId(r)
}

func (c *cache) getId(r *Rule) uint32 {
if r == nil {
return None
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fuse/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ func Serve(v *vfs.VFS, options string, xattrs, ioctl bool) error {
}

if opt.EnableAcl && opt.DisableXAttrs {
logger.Infof("The format \"enable-acl\" flag wiil enable the xattrs feature.")
logger.Infof("The format \"enable-acl\" flag will enable the xattrs feature.")
opt.DisableXAttrs = false
}
opt.IgnoreSecurityLabels = !opt.EnableAcl
Expand Down
90 changes: 86 additions & 4 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3518,6 +3518,21 @@ func (m *dbMeta) dumpEntry(s *xorm.Session, inode Ino, typ uint8) (*DumpedEntry,
e.Xattrs = xattrs
}

if attr.AccessACL != aclAPI.None {
accessACl, err := m.getACL(s, attr.AccessACL)
if err != nil {
return nil, err
}
e.AccessACL = dumpACL(accessACl)
}
if attr.DefaultACL != aclAPI.None {
defaultACL, err := m.getACL(s, attr.DefaultACL)
if err != nil {
return nil, err
}
e.DefaultACL = dumpACL(defaultACL)
}

if attr.Typ == TypeFile {
for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ {
c := &chunk{Inode: inode, Indx: indx}
Expand Down Expand Up @@ -3574,6 +3589,23 @@ func (m *dbMeta) dumpEntryFast(s *xorm.Session, inode Ino, typ uint8) *DumpedEnt
e.Xattrs = xattrs
}

if attr.AccessACL != aclAPI.None {
accessACl, err := m.getACL(s, attr.AccessACL)
if err != nil {
logger.Errorf("get access acl error: %s, attr %v", err, attr)
} else {
e.AccessACL = dumpACL(accessACl)
}
}
if attr.DefaultACL != aclAPI.None {
defaultACL, err := m.getACL(s, attr.DefaultACL)
if err != nil {
logger.Errorf("get default acl error: %s, attr %v", err, attr)
} else {
e.DefaultACL = dumpACL(defaultACL)
}
}

if attr.Typ == TypeFile {
for indx := uint32(0); uint64(indx)*ChunkSize < attr.Length; indx++ {
c, ok := m.snap.chunk[fmt.Sprintf("%d-%d", inode, indx)]
Expand Down Expand Up @@ -3672,14 +3704,13 @@ func (m *dbMeta) makeSnap(ses *xorm.Session, bar *utils.Bar) error {
chunk: make(map[string]*chunk),
}

for _, s := range []interface{}{new(node), new(symlink), new(edge), new(xattr), new(chunk)} {
for _, s := range []interface{}{new(node), new(symlink), new(edge), new(xattr), new(chunk), new(acl)} {
if count, err := ses.Count(s); err == nil {
bar.IncrTotal(count)
} else {
return err
}
}

if err := ses.Table(&node{}).Iterate(new(node), func(idx int, bean interface{}) error {
n := bean.(*node)
snap.node[n.Inode] = n
Expand Down Expand Up @@ -3723,6 +3754,16 @@ func (m *dbMeta) makeSnap(ses *xorm.Session, bar *utils.Bar) error {
}); err != nil {
return err
}

if err := ses.Table(&acl{}).Iterate(new(acl), func(idx int, bean interface{}) error {
a := bean.(*acl)
m.aclCache.Put(a.Id, a.toRule())
bar.Increment()
return nil
}); err != nil {
return err
}

m.snap = snap
return nil
}
Expand Down Expand Up @@ -3871,7 +3912,7 @@ func (m *dbMeta) DumpMeta(w io.Writer, root Ino, keepSecret, fast, skipTrash boo
})
}

func (m *dbMeta) loadEntry(e *DumpedEntry, chs []chan interface{}) {
func (m *dbMeta) loadEntry(e *DumpedEntry, chs []chan interface{}, aclMaxId *uint32) {
inode := e.Attr.Inode
attr := e.Attr
n := &node{
Expand Down Expand Up @@ -3933,6 +3974,17 @@ func (m *dbMeta) loadEntry(e *DumpedEntry, chs []chan interface{}) {
for _, x := range e.Xattrs {
chs[4] <- &xattr{Inode: inode, Name: x.Name, Value: unescape(x.Value)}
}

// put dumped acls into cache, then store back to sql
if e.AccessACL != nil {
r := loadACL(e.AccessACL)
n.AccessACLId, _ = m.aclCache.GetOrPut(r, aclMaxId)
}

if e.DefaultACL != nil {
r := loadACL(e.DefaultACL)
n.DefaultACLId, _ = m.aclCache.GetOrPut(r, aclMaxId)
}
chs[0] <- n
}

Expand Down Expand Up @@ -4021,13 +4073,18 @@ func (m *dbMeta) LoadMeta(r io.Reader) error {
}(i)
}

var aclMaxId uint32 = 0
dm, counters, parents, refs, err := loadEntries(r,
func(e *DumpedEntry) { m.loadEntry(e, chs) },
func(e *DumpedEntry) { m.loadEntry(e, chs, &aclMaxId) },
func(ck *chunkKey) { chs[3] <- &sliceRef{ck.id, ck.size, 1} })
if err != nil {
return err
}
m.loadDumpedQuotas(Background, dm.Quotas)
if err = m.loadDumpedACLs(); err != nil {
return err
}

format, _ := json.MarshalIndent(dm.Setting, "", "")
chs[5] <- &setting{"format", string(format)}
chs[5] <- &counter{usedSpace, counters.UsedSpace}
Expand Down Expand Up @@ -4475,3 +4532,28 @@ func (m *dbMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI.Rul
return nil
}))
}

func (m *dbMeta) loadDumpedACLs() error {
id2Rule := m.aclCache.GetAll()
if len(id2Rule) == 0 {
return nil
}

acls := make([]*acl, 0, len(id2Rule))
for id, rule := range id2Rule {
aclV := newSQLAcl(rule)
aclV.Id = id
acls = append(acls, aclV)
}

return m.txn(func(s *xorm.Session) error {
n, err := s.Insert(acls)
if err != nil {
return err
}
if int(n) != len(acls) {
return fmt.Errorf("only %d acls inserted, expected %d", n, len(acls))
}
return nil
})
}

0 comments on commit 5626f32

Please sign in to comment.