From 7bb878ff695b89b62483dd6155f5815ad0d06258 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 11 Feb 2014 12:16:12 -0700 Subject: [PATCH] Mmap remap. --- db.go | 90 ++++++++++++++++++++++++++++++++++++------ db_test.go | 12 ++++++ node.go | 24 +++++++++-- rwtransaction.go | 52 ++++++++++++++++++------ syscall_darwin.go | 6 ++- syscall_darwin_test.go | 5 +++ syscall_linux.go | 5 +++ syscall_linux_test.go | 5 +++ 8 files changed, 171 insertions(+), 28 deletions(-) diff --git a/db.go b/db.go index 65879ec3..8592d836 100644 --- a/db.go +++ b/db.go @@ -15,6 +15,9 @@ const ( const minPageSize = 0x1000 +const minMmapSize = 1 << 22 // 4MB +const maxMmapStep = 1 << 30 // 1GB + type DB struct { os _os syscall _syscall @@ -98,7 +101,7 @@ func (db *DB) Open(path string, mode os.FileMode) error { } // Memory map the data file. - if err := db.mmap(); err != nil { + if err := db.mmap(0); err != nil { db.close() return err } @@ -113,7 +116,19 @@ func (db *DB) Open(path string, mode os.FileMode) error { } // mmap opens the underlying memory-mapped file and initializes the meta references. -func (db *DB) mmap() error { +// minsz is the minimum size that the new mmap can be. +func (db *DB) mmap(minsz int) error { + db.mmaplock.Lock() + defer db.mmaplock.Unlock() + + // Dereference all mmap references before unmapping. + if db.rwtransaction != nil { + db.rwtransaction.dereference() + } + + // Unmap existing data before continuing. + db.munmap() + info, err := db.file.Stat() if err != nil { return &Error{"mmap stat error", err} @@ -121,8 +136,12 @@ func (db *DB) mmap() error { return &Error{"file size too small", err} } - // TODO(benbjohnson): Determine appropriate mmap size by db size. - size := 2 << 30 + // Ensure the size is at least the minimum size. + var size = int(info.Size()) + if size < minsz { + size = minsz + } + size = db.mmapSize(minsz) // Memory-map the data file as a byte slice. if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil { @@ -144,6 +163,35 @@ func (db *DB) mmap() error { return nil } +// munmap unmaps the data file from memory. +func (db *DB) munmap() { + if db.data != nil { + if err := db.syscall.Munmap(db.data); err != nil { + panic("unmap error: " + err.Error()) + } + db.data = nil + } +} + +// mmapSize determines the appropriate size for the mmap given the current size +// of the database. The minimum size is 4MB and doubles until it reaches 1GB. +func (db *DB) mmapSize(size int) int { + if size < minMmapSize { + return minMmapSize + } else if size < maxMmapStep { + size *= 2 + } else { + size += maxMmapStep + } + + // Ensure that the mmap size is a multiple of the page size. + if (size % db.pageSize) != 0 { + size = ((size / db.pageSize) + 1) * db.pageSize + } + + return size +} + // init creates a new database file and initializes its meta pages. func (db *DB) init() error { // Set the page size to the OS page size. @@ -196,8 +244,14 @@ func (db *DB) Close() { } func (db *DB) close() { - // TODO: Undo everything in Open(). + // Wait for pending transactions before closing and unmapping the data. + // db.mmaplock.Lock() + // defer db.mmaplock.Unlock() + + // TODO(benbjohnson): Undo everything in Open(). db.freelist = nil + + db.munmap() } // Transaction creates a read-only transaction. @@ -206,6 +260,11 @@ func (db *DB) Transaction() (*Transaction, error) { db.metalock.Lock() defer db.metalock.Unlock() + // Obtain a read-only lock on the mmap. When the mmap is remapped it will + // obtain a write lock so all transactions must finish before it can be + // remapped. + db.mmaplock.RLock() + // Exit if the database is not open yet. if !db.opened { return nil, DatabaseNotOpenError @@ -260,6 +319,9 @@ func (db *DB) removeTransaction(t *Transaction) { db.metalock.Lock() defer db.metalock.Unlock() + // Release the read lock on the mmap. + db.mmaplock.RUnlock() + // Remove the transaction. for i, txn := range db.transactions { if txn == t { @@ -412,7 +474,7 @@ func (db *DB) meta() *meta { } // allocate returns a contiguous block of memory starting at a given page. -func (db *DB) allocate(count int) *page { +func (db *DB) allocate(count int) (*page, error) { // Allocate a temporary buffer for the page. buf := make([]byte, count*db.pageSize) p := (*page)(unsafe.Pointer(&buf[0])) @@ -420,16 +482,22 @@ func (db *DB) allocate(count int) *page { // Use pages from the freelist if they are available. if p.id = db.freelist.allocate(count); p.id != 0 { - return p + return p, nil } - // TODO(benbjohnson): Resize mmap(). - - // If there are no free pages then allocate from the end of the file. + // Resize mmap() if we're at the end. p.id = db.rwtransaction.meta.pgid + var minsz = int((p.id+pgid(count))+1) * db.pageSize + if minsz >= len(db.data) { + if err := db.mmap(minsz); err != nil { + return nil, &Error{"mmap allocate error", err} + } + } + + // Move the page id high water mark. db.rwtransaction.meta.pgid += pgid(count) - return p + return p, nil } // sync flushes the file descriptor to disk. diff --git a/db_test.go b/db_test.go index 8211eac7..db578256 100644 --- a/db_test.go +++ b/db_test.go @@ -187,6 +187,18 @@ func TestDBWriteFail(t *testing.T) { t.Skip("pending") // TODO(benbjohnson) } +// Ensure that the mmap grows appropriately. +func TestDBMmapSize(t *testing.T) { + db := &DB{pageSize: 4096} + assert.Equal(t, db.mmapSize(0), minMmapSize) + assert.Equal(t, db.mmapSize(16384), minMmapSize) + assert.Equal(t, db.mmapSize(minMmapSize-1), minMmapSize) + assert.Equal(t, db.mmapSize(minMmapSize), minMmapSize*2) + assert.Equal(t, db.mmapSize(10000000), 20000768) + assert.Equal(t, db.mmapSize((1<<30)-1), 2147483648) + assert.Equal(t, db.mmapSize(1<<30), 1<<31) +} + // withDB executes a function with a database reference. func withDB(fn func(*DB, string)) { f, _ := ioutil.TempFile("", "bolt-") diff --git a/node.go b/node.go index 11b5e2f0..1460f529 100644 --- a/node.go +++ b/node.go @@ -161,10 +161,8 @@ func (n *node) write(p *page) { // Initialize page. if n.isLeaf { p.flags |= p_leaf - // warn("∑", p.id, "leaf") } else { p.flags |= p_branch - // warn("∑", p.id, "branch") } p.count = uint16(len(n.inodes)) @@ -177,13 +175,11 @@ func (n *node) write(p *page) { elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem))) elem.ksize = uint32(len(item.key)) elem.vsize = uint32(len(item.value)) - // warn(" »", string(item.key), "->", string(item.value)) } else { elem := p.branchPageElement(uint16(i)) elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem))) elem.ksize = uint32(len(item.key)) elem.pgid = item.pgid - // warn(" »", string(item.key)) } // Write data for the element to the end of the page. @@ -341,6 +337,26 @@ func (n *node) rebalance() { n.parent.rebalance() } +// dereference causes the node to copy all its inode key/value references to heap memory. +// This is required when the mmap is reallocated so inodes are not pointing to stale data. +func (n *node) dereference() { + key := make([]byte, len(n.key)) + copy(key, n.key) + n.key = key + + for i, _ := range n.inodes { + inode := &n.inodes[i] + + key := make([]byte, len(inode.key)) + copy(key, inode.key) + inode.key = key + + value := make([]byte, len(inode.value)) + copy(value, inode.value) + inode.value = value + } +} + // nodesByDepth sorts a list of branches by deepest first. type nodesByDepth []*node diff --git a/rwtransaction.go b/rwtransaction.go index 5058dd3b..b96df51c 100644 --- a/rwtransaction.go +++ b/rwtransaction.go @@ -9,7 +9,8 @@ import ( // Only one read/write transaction can be active for a DB at a time. type RWTransaction struct { Transaction - nodes map[pgid]*node + nodes map[pgid]*node + pending []*node } // init initializes the transaction. @@ -35,7 +36,10 @@ func (t *RWTransaction) CreateBucket(name string) error { } // Create a blank root leaf page. - p := t.allocate(1) + p, err := t.allocate(1) + if err != nil { + return err + } p.flags = p_leaf // Add bucket to buckets page. @@ -100,14 +104,15 @@ func (t *RWTransaction) Commit() error { // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. - // TODO(benbjohnson): Move rebalancing to occur immediately after deletion (?). - // Rebalance and spill data onto dirty pages. t.rebalance() t.spill() // Spill buckets page. - p := t.allocate((t.buckets.size() / t.db.pageSize) + 1) + p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) + if err != nil { + return err + } t.buckets.write(p) // Write dirty pages to disk. @@ -135,13 +140,16 @@ func (t *RWTransaction) close() { } // allocate returns a contiguous block of memory starting at a given page. -func (t *RWTransaction) allocate(count int) *page { - p := t.db.allocate(count) +func (t *RWTransaction) allocate(count int) (*page, error) { + p, err := t.db.allocate(count) + if err != nil { + return nil, err + } // Save to our page cache. t.pages[p.id] = p - return p + return p, nil } // rebalance attempts to balance all nodes. @@ -152,7 +160,7 @@ func (t *RWTransaction) rebalance() { } // spill writes all the nodes to dirty pages. -func (t *RWTransaction) spill() { +func (t *RWTransaction) spill() error { // Keep track of the current root nodes. // We will update this at the end once all nodes are created. type root struct { @@ -180,6 +188,7 @@ func (t *RWTransaction) spill() { // Split nodes into appropriate sized nodes. // The first node in this list will be a reference to n to preserve ancestry. newNodes := n.split(t.db.pageSize) + t.pending = newNodes // If this is a root node that split then create a parent node. if n.parent == nil && len(newNodes) > 1 { @@ -195,7 +204,10 @@ func (t *RWTransaction) spill() { // Write nodes to dirty pages. for i, newNode := range newNodes { // Allocate contiguous space for the node. - p := t.allocate((newNode.size() / t.db.pageSize) + 1) + p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) + if err != nil { + return err + } // Write the node to the page. newNode.write(p) @@ -215,6 +227,8 @@ func (t *RWTransaction) spill() { newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) } } + + t.pending = nil } // Update roots with new roots. @@ -224,12 +238,12 @@ func (t *RWTransaction) spill() { // Clear out nodes now that they are all spilled. t.nodes = make(map[pgid]*node) + + return nil } // write writes any dirty pages to disk. func (t *RWTransaction) write() error { - // TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize. - // Sort pages by id. pages := make(pages, 0, len(t.pages)) for _, p := range t.pages { @@ -247,6 +261,9 @@ func (t *RWTransaction) write() error { } } + // Clear out page cache. + t.pages = make(map[pgid]*page) + return nil } @@ -280,3 +297,14 @@ func (t *RWTransaction) node(pgid pgid, parent *node) *node { return n } + +// dereference removes all references to the old mmap. +func (t *RWTransaction) dereference() { + for _, n := range t.nodes { + n.dereference() + } + + for _, n := range t.pending { + n.dereference() + } +} diff --git a/syscall_darwin.go b/syscall_darwin.go index de341937..cb9a20c9 100644 --- a/syscall_darwin.go +++ b/syscall_darwin.go @@ -6,11 +6,15 @@ import ( type _syscall interface { Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error) + Munmap([]byte) error } type syssyscall struct{} func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error) { - // err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW) return syscall.Mmap(fd, offset, length, prot, flags) } + +func (o *syssyscall) Munmap(b []byte) error { + return syscall.Munmap(b) +} diff --git a/syscall_darwin_test.go b/syscall_darwin_test.go index 9e64cf73..6b468f63 100644 --- a/syscall_darwin_test.go +++ b/syscall_darwin_test.go @@ -12,3 +12,8 @@ func (m *mocksyscall) Mmap(fd int, offset int64, length int, prot int, flags int args := m.Called(fd, offset, length, prot, flags) return args.Get(0).([]byte), args.Error(1) } + +func (m *mocksyscall) Munmap(b []byte) error { + args := m.Called(b) + return args.Error(0) +} diff --git a/syscall_linux.go b/syscall_linux.go index de341937..65daad43 100644 --- a/syscall_linux.go +++ b/syscall_linux.go @@ -6,6 +6,7 @@ import ( type _syscall interface { Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error) + Munmap([]byte) error } type syssyscall struct{} @@ -14,3 +15,7 @@ func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int) // err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW) return syscall.Mmap(fd, offset, length, prot, flags) } + +func (o *syssyscall) Munmap(b []byte) error { + return syscall.Munmap(b) +} diff --git a/syscall_linux_test.go b/syscall_linux_test.go index 9e64cf73..6b468f63 100644 --- a/syscall_linux_test.go +++ b/syscall_linux_test.go @@ -12,3 +12,8 @@ func (m *mocksyscall) Mmap(fd int, offset int64, length int, prot int, flags int args := m.Called(fd, offset, length, prot, flags) return args.Get(0).([]byte), args.Error(1) } + +func (m *mocksyscall) Munmap(b []byte) error { + args := m.Called(b) + return args.Error(0) +}