This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Remapping #24

Merged
merged 1 commit on Feb 13, 2014
90 changes: 79 additions & 11 deletions db.go
@@ -15,6 +15,9 @@ const (

const minPageSize = 0x1000

const minMmapSize = 1 << 22 // 4MB
const maxMmapStep = 1 << 30 // 1GB

type DB struct {
os _os
syscall _syscall
@@ -98,7 +101,7 @@ func (db *DB) Open(path string, mode os.FileMode) error {
}

// Memory map the data file.
if err := db.mmap(); err != nil {
if err := db.mmap(0); err != nil {
db.close()
return err
}
@@ -113,16 +116,32 @@ func (db *DB) Open(path string, mode os.FileMode) error {
}

// mmap opens the underlying memory-mapped file and initializes the meta references.
func (db *DB) mmap() error {
// minsz is the minimum size that the new mmap can be.
func (db *DB) mmap(minsz int) error {
db.mmaplock.Lock()
defer db.mmaplock.Unlock()

// Dereference all mmap references before unmapping.
if db.rwtransaction != nil {
db.rwtransaction.dereference()
}

// Unmap existing data before continuing.
db.munmap()

info, err := db.file.Stat()
if err != nil {
return &Error{"mmap stat error", err}
} else if int(info.Size()) < db.pageSize*2 {
return &Error{"file size too small", err}
}

// TODO(benbjohnson): Determine appropriate mmap size by db size.
size := 2 << 30
// Ensure the size is at least the minimum size.
var size = int(info.Size())
if size < minsz {
size = minsz
}
size = db.mmapSize(minsz)

// Memory-map the data file as a byte slice.
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
@@ -144,6 +163,35 @@ func (db *DB) mmap() error {
return nil
}

// munmap unmaps the data file from memory.
func (db *DB) munmap() {
if db.data != nil {
if err := db.syscall.Munmap(db.data); err != nil {
panic("unmap error: " + err.Error())
}
db.data = nil
}
}

// mmapSize determines the appropriate size for the mmap given the current size
// of the database. The minimum size is 4MB and doubles until it reaches 1GB.
func (db *DB) mmapSize(size int) int {
if size < minMmapSize {
return minMmapSize
} else if size < maxMmapStep {
size *= 2
} else {
size += maxMmapStep
}

// Ensure that the mmap size is a multiple of the page size.
if (size % db.pageSize) != 0 {
size = ((size / db.pageSize) + 1) * db.pageSize
}

return size
}

// init creates a new database file and initializes its meta pages.
func (db *DB) init() error {
// Set the page size to the OS page size.
@@ -196,8 +244,14 @@ func (db *DB) Close() {
}

func (db *DB) close() {
// TODO: Undo everything in Open().
// Wait for pending transactions before closing and unmapping the data.
// db.mmaplock.Lock()
// defer db.mmaplock.Unlock()

// TODO(benbjohnson): Undo everything in Open().
db.freelist = nil

db.munmap()
}

// Transaction creates a read-only transaction.
@@ -206,6 +260,11 @@ func (db *DB) Transaction() (*Transaction, error) {
db.metalock.Lock()
defer db.metalock.Unlock()

// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
// remapped.
db.mmaplock.RLock()

// Exit if the database is not open yet.
if !db.opened {
return nil, DatabaseNotOpenError
@@ -260,6 +319,9 @@ func (db *DB) removeTransaction(t *Transaction) {
db.metalock.Lock()
defer db.metalock.Unlock()

// Release the read lock on the mmap.
db.mmaplock.RUnlock()

// Remove the transaction.
for i, txn := range db.transactions {
if txn == t {
@@ -412,24 +474,30 @@ func (db *DB) meta() *meta {
}

// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) *page {
func (db *DB) allocate(count int) (*page, error) {
// Allocate a temporary buffer for the page.
buf := make([]byte, count*db.pageSize)
p := (*page)(unsafe.Pointer(&buf[0]))
p.overflow = uint32(count - 1)

// Use pages from the freelist if they are available.
if p.id = db.freelist.allocate(count); p.id != 0 {
return p
return p, nil
}

// TODO(benbjohnson): Resize mmap().

// If there are no free pages then allocate from the end of the file.
// Resize mmap() if we're at the end.
p.id = db.rwtransaction.meta.pgid
var minsz = int((p.id+pgid(count))+1) * db.pageSize
if minsz >= len(db.data) {
if err := db.mmap(minsz); err != nil {
return nil, &Error{"mmap allocate error", err}
}
}

// Move the page id high water mark.
db.rwtransaction.meta.pgid += pgid(count)

return p
return p, nil
}

// sync flushes the file descriptor to disk.
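The db.go changes above hinge on db.mmaplock acting as a reader/writer lock: every read transaction holds a read lock for its lifetime, and mmap(minsz) takes the write lock before unmapping, so a remap waits until all open transactions finish. A minimal standalone sketch of that sync.RWMutex pattern (not the bolt code itself, just the behavior the diff relies on):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mmaplock sync.RWMutex

	// A "read transaction" holds the read lock for its whole lifetime.
	go func() {
		mmaplock.RLock()
		defer mmaplock.RUnlock()
		time.Sleep(100 * time.Millisecond) // simulate reads against mapped pages
	}()

	time.Sleep(10 * time.Millisecond) // let the reader acquire its lock first

	// A "remap" takes the write lock, so it blocks until all readers finish.
	mmaplock.Lock()
	fmt.Println("exclusive access: safe to munmap and mmap a larger region")
	mmaplock.Unlock()
}
```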
12 changes: 12 additions & 0 deletions db_test.go
@@ -187,6 +187,18 @@ func TestDBWriteFail(t *testing.T) {
t.Skip("pending") // TODO(benbjohnson)
}

// Ensure that the mmap grows appropriately.
func TestDBMmapSize(t *testing.T) {
db := &DB{pageSize: 4096}
assert.Equal(t, db.mmapSize(0), minMmapSize)
assert.Equal(t, db.mmapSize(16384), minMmapSize)
assert.Equal(t, db.mmapSize(minMmapSize-1), minMmapSize)
assert.Equal(t, db.mmapSize(minMmapSize), minMmapSize*2)
assert.Equal(t, db.mmapSize(10000000), 20000768)
assert.Equal(t, db.mmapSize((1<<30)-1), 2147483648)
assert.Equal(t, db.mmapSize(1<<30), 1<<31)
}

// withDB executes a function with a database reference.
func withDB(fn func(*DB, string)) {
f, _ := ioutil.TempFile("", "bolt-")
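The expected values in TestDBMmapSize follow directly from the double-then-page-align rule in mmapSize. A standalone sketch of the same arithmetic (the constants, the 4096-byte page size, and the function body are copied from the diff above so this runs on its own):

```go
package main

import "fmt"

const (
	minMmapSize = 1 << 22 // 4MB
	maxMmapStep = 1 << 30 // 1GB
	pageSize    = 4096
)

func mmapSize(size int) int {
	if size < minMmapSize {
		return minMmapSize
	} else if size < maxMmapStep {
		size *= 2 // double below 1GB
	} else {
		size += maxMmapStep // grow in 1GB steps above that
	}
	// Round up to a multiple of the page size.
	if size%pageSize != 0 {
		size = ((size / pageSize) + 1) * pageSize
	}
	return size
}

func main() {
	// 10,000,000 doubles to 20,000,000, then rounds up to 4883*4096 = 20,000,768.
	fmt.Println(mmapSize(0))        // 4194304
	fmt.Println(mmapSize(10000000)) // 20000768
	fmt.Println(mmapSize(1 << 30))  // 2147483648
}
```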
24 changes: 20 additions & 4 deletions node.go
@@ -161,10 +161,8 @@ func (n *node) write(p *page) {
// Initialize page.
if n.isLeaf {
p.flags |= p_leaf
// warn("∑", p.id, "leaf")
} else {
p.flags |= p_branch
// warn("∑", p.id, "branch")
}
p.count = uint16(len(n.inodes))

@@ -177,13 +175,11 @@
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
elem.ksize = uint32(len(item.key))
elem.vsize = uint32(len(item.value))
// warn(" »", string(item.key), "->", string(item.value))
} else {
elem := p.branchPageElement(uint16(i))
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
elem.ksize = uint32(len(item.key))
elem.pgid = item.pgid
// warn(" »", string(item.key))
}

// Write data for the element to the end of the page.
@@ -341,6 +337,26 @@ func (n *node) rebalance() {
n.parent.rebalance()
}

// dereference causes the node to copy all its inode key/value references to heap memory.
// This is required when the mmap is reallocated so inodes are not pointing to stale data.
func (n *node) dereference() {
key := make([]byte, len(n.key))
copy(key, n.key)
n.key = key

for i, _ := range n.inodes {
inode := &n.inodes[i]

key := make([]byte, len(inode.key))
copy(key, inode.key)
inode.key = key

value := make([]byte, len(inode.value))
copy(value, inode.value)
inode.value = value
}
}

// nodesByDepth sorts a list of branches by deepest first.
type nodesByDepth []*node

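node.dereference exists because inode keys and values are normally sub-slices of the memory-mapped file; once the file is remapped those slices would reference memory that is no longer valid. A minimal sketch of the aliasing problem it guards against (a plain byte slice stands in for the mmap; this is not bolt code):

```go
package main

import "fmt"

func main() {
	mmap := []byte("hello world") // stands in for the memory-mapped region

	aliased := mmap[:5] // how an inode key normally references the mmap

	copied := make([]byte, 5) // what dereference does: copy onto the heap
	copy(copied, mmap[:5])

	copy(mmap, "###########") // "remap": the old bytes go away

	fmt.Println(string(aliased)) // "#####" — stale view into the old mapping
	fmt.Println(string(copied))  // "hello" — safe heap copy
}
```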
52 changes: 40 additions & 12 deletions rwtransaction.go
@@ -9,7 +9,8 @@ import (
// Only one read/write transaction can be active for a DB at a time.
type RWTransaction struct {
Transaction
nodes map[pgid]*node
nodes map[pgid]*node
pending []*node
}

// init initializes the transaction.
@@ -35,7 +36,10 @@ func (t *RWTransaction) CreateBucket(name string) error {
}

// Create a blank root leaf page.
p := t.allocate(1)
p, err := t.allocate(1)
if err != nil {
return err
}
p.flags = p_leaf

// Add bucket to buckets page.
@@ -100,14 +104,15 @@ func (t *RWTransaction) Commit() error {

// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

// TODO(benbjohnson): Move rebalancing to occur immediately after deletion (?).

// Rebalance and spill data onto dirty pages.
t.rebalance()
t.spill()

// Spill buckets page.
p := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
if err != nil {
return err
}
t.buckets.write(p)

// Write dirty pages to disk.
@@ -135,13 +140,16 @@ func (t *RWTransaction) close() {
}

// allocate returns a contiguous block of memory starting at a given page.
func (t *RWTransaction) allocate(count int) *page {
p := t.db.allocate(count)
func (t *RWTransaction) allocate(count int) (*page, error) {
p, err := t.db.allocate(count)
if err != nil {
return nil, err
}

// Save to our page cache.
t.pages[p.id] = p

return p
return p, nil
}

// rebalance attempts to balance all nodes.
@@ -152,7 +160,7 @@ func (t *RWTransaction) rebalance() {
}

// spill writes all the nodes to dirty pages.
func (t *RWTransaction) spill() {
func (t *RWTransaction) spill() error {
// Keep track of the current root nodes.
// We will update this at the end once all nodes are created.
type root struct {
@@ -180,6 +188,7 @@
// Split nodes into appropriate sized nodes.
// The first node in this list will be a reference to n to preserve ancestry.
newNodes := n.split(t.db.pageSize)
t.pending = newNodes

// If this is a root node that split then create a parent node.
if n.parent == nil && len(newNodes) > 1 {
@@ -195,7 +204,10 @@
// Write nodes to dirty pages.
for i, newNode := range newNodes {
// Allocate contiguous space for the node.
p := t.allocate((newNode.size() / t.db.pageSize) + 1)
p, err := t.allocate((newNode.size() / t.db.pageSize) + 1)
if err != nil {
return err
}

// Write the node to the page.
newNode.write(p)
@@ -215,6 +227,8 @@
newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
}
}

t.pending = nil
}

// Update roots with new roots.
@@ -224,12 +238,12 @@

// Clear out nodes now that they are all spilled.
t.nodes = make(map[pgid]*node)

return nil
}

// write writes any dirty pages to disk.
func (t *RWTransaction) write() error {
// TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.

// Sort pages by id.
pages := make(pages, 0, len(t.pages))
for _, p := range t.pages {
@@ -247,6 +261,9 @@
}
}

// Clear out page cache.
t.pages = make(map[pgid]*page)

return nil
}

@@ -280,3 +297,14 @@ func (t *RWTransaction) node(pgid pgid, parent *node) *node {

return n
}

// dereference removes all references to the old mmap.
func (t *RWTransaction) dereference() {
for _, n := range t.nodes {
n.dereference()
}

for _, n := range t.pending {
n.dereference()
}
}
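The new pending field matters because spill can trigger a remap from inside allocate: nodes just produced by split are not yet in t.nodes, so RWTransaction.dereference has to walk both collections. A small sketch of that bookkeeping pattern using hypothetical types (txn and node below are illustrative, not bolt's):

```go
package main

import "fmt"

type node struct{ key []byte }

// dereference copies the key off the (simulated) mmap onto the heap.
func (n *node) dereference() {
	k := make([]byte, len(n.key))
	copy(k, n.key)
	n.key = k
}

type txn struct {
	nodes   map[int]*node
	pending []*node
}

// dereference must reach in-flight split nodes too, not just the node cache.
func (t *txn) dereference() {
	for _, n := range t.nodes {
		n.dereference()
	}
	for _, n := range t.pending {
		n.dereference()
	}
}

func main() {
	mmap := []byte("old-bytes")
	t := &txn{nodes: map[int]*node{}}

	// A split mid-spill creates nodes that only exist in pending so far.
	t.pending = append(t.pending, &node{key: mmap[:3]})

	t.dereference()           // what a remap inside allocate triggers
	copy(mmap, "NEW-BYTES")   // old mapping replaced underneath

	fmt.Println(string(t.pending[0].key)) // "old" — already copied to the heap
}
```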
6 changes: 5 additions & 1 deletion syscall_darwin.go
@@ -6,11 +6,15 @@ import (

type _syscall interface {
Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error)
Munmap([]byte) error
}

type syssyscall struct{}

func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error) {
// err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW)
return syscall.Mmap(fd, offset, length, prot, flags)
}

func (o *syssyscall) Munmap(b []byte) error {
return syscall.Munmap(b)
}
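The _syscall interface gains Munmap so the mocked syscall layer can cover unmapping as well as mapping. A minimal standalone demonstration of the underlying Mmap/Munmap pair on a temp file (Unix-only sketch, not the bolt wiring; error handling kept terse):

```go
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"syscall"
)

func main() {
	f, err := ioutil.TempFile("", "mmap-demo-")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	if err := f.Truncate(4096); err != nil {
		panic(err)
	}

	// Same call the _syscall interface wraps.
	data, err := syscall.Mmap(int(f.Fd()), 0, 4096, syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	fmt.Println("mapped", len(data), "bytes")

	// Munmap must receive the exact slice returned by Mmap.
	if err := syscall.Munmap(data); err != nil {
		panic(err)
	}
}
```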