Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -505,38 +505,27 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}

test("in-memory LRU storage") {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will getSingle rid of a3
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY)
}

test("in-memory LRU storage with serialization") {
testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
}

private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a1", a1, storageLevel)
store.putSingle("a2", a2, storageLevel)
store.putSingle("a3", a3, storageLevel)
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will getSingle rid of a3
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a1", a1, storageLevel)
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
Expand Down Expand Up @@ -618,62 +607,35 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}

test("disk and memory storage") {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingle)
}

test("disk and memory storage with getLocalBytes") {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytes)
}

test("disk and memory storage with serialization") {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingle)
}

test("disk and memory storage with serialization and getLocalBytes") {
testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytes)
}

def testDiskAndMemoryStorage(
storageLevel: StorageLevel,
accessMethod: BlockManager => BlockId => Option[_]): Unit = {
store = makeBlockManager(12000)
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
store.putSingle("a1", a1, storageLevel)
store.putSingle("a2", a2, storageLevel)
store.putSingle("a3", a3, storageLevel)
assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
}

Expand Down