refactor(trace): Support gaps in trace data
This change updates the trace converter and the SimTrace implementation
to support cases where there is a gap between samples in the trace data.

This change allows users to specify what to do when samples are
missing in the trace. The available options are specified in
`SimTrace.FillMode`. Currently, we support either carrying the previous
value forward or setting the usage to zero.
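
As a rough usage sketch of the new option (the `trace` and `cpu` values below are hypothetical placeholders, not part of this change):

// Hypothetical setup: an existing SimTrace and a ProcessingUnit from the machine model.
val zeroFilled = trace.newSource(cpu, offset = 0L) // default FillMode.None: gaps read as zero usage
val carried = trace.newSource(cpu, offset = 0L, fillMode = SimTrace.FillMode.Previous) // carry the last sample forward across gaps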
fabianishere committed Nov 2, 2021
1 parent a8e2d46 commit 79a2f31
Showing 9 changed files with 117 additions and 62 deletions.
@@ -71,8 +71,8 @@ public class ComputeWorkloadLoader(private val baseDir: File) {
val cores = reader.getInt(coresCol)
val cpuUsage = reader.getDouble(usageCol)

val timeMs = time.toEpochMilli()
val deadlineMs = timeMs + duration.toMillis()
val deadlineMs = time.toEpochMilli()
val timeMs = (time - duration).toEpochMilli()
val builder = fragments.computeIfAbsent(id) { Builder() }
builder.add(timeMs, deadlineMs, cpuUsage, cores)
}
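
To make the new interpretation concrete (timestamps hypothetical): the sample timestamp now marks the end of a fragment, and its start is derived by subtracting the duration.

import java.time.Duration
import java.time.Instant

// Hypothetical sample: stamped 10:05 with a 5-minute duration.
val time = Instant.parse("2021-11-02T10:05:00Z")
val duration = Duration.ofMinutes(5)
val deadlineMs = time.toEpochMilli()          // fragment ends at 10:05
val timeMs = (time - duration).toEpochMilli() // fragment starts at 10:00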
@@ -115,11 +115,11 @@ class CapelinIntegrationTest {
{ assertEquals(0, serviceMetrics.serversActive, "All VMs should finish after a run") },
{ assertEquals(0, serviceMetrics.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, serviceMetrics.serversPending, "No VM should be in the queue") },
{ assertEquals(223325655, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
{ assertEquals(67006560, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
{ assertEquals(3159377, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(223388307, this@CapelinIntegrationTest.exporter.idleTime) { "Incorrect idle time" } },
{ assertEquals(66977508, this@CapelinIntegrationTest.exporter.activeTime) { "Incorrect active time" } },
{ assertEquals(3160381, this@CapelinIntegrationTest.exporter.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Incorrect lost time" } },
{ assertEquals(5.840862926294953E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
{ assertEquals(5.840845430827075E9, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } },
)
}

@@ -159,11 +159,11 @@ class CapelinIntegrationTest {

// Note that these values have been verified beforehand
assertAll(
{ assertEquals(10997726, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(9740289, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(10999208, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(9741207, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } },
{ assertEquals(7.010642279990053E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
{ assertEquals(7.011413569311495E8, this@CapelinIntegrationTest.exporter.energyUsage, 0.01) { "Incorrect power draw" } }
)
}

@@ -209,10 +209,10 @@ class CapelinIntegrationTest {

// Note that these values have been verified beforehand
assertAll(
{ assertEquals(6013515, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14724500, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12530742, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(465088, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
{ assertEquals(6027666, this@CapelinIntegrationTest.exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(14712749, this@CapelinIntegrationTest.exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(12532907, this@CapelinIntegrationTest.exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(468522, this@CapelinIntegrationTest.exporter.lostTime) { "Lost time incorrect" } }
)
}

@@ -252,11 +252,11 @@ class CapelinIntegrationTest {

// Note that these values have been verified beforehand
assertAll(
{ assertEquals(10865478, exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(9606177, exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(10866961, exporter.idleTime) { "Idle time incorrect" } },
{ assertEquals(9607095, exporter.activeTime) { "Active time incorrect" } },
{ assertEquals(0, exporter.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, exporter.lostTime) { "Lost time incorrect" } },
{ assertEquals(2559005056, exporter.uptime) { "Uptime incorrect" } }
{ assertEquals(2559305056, exporter.uptime) { "Uptime incorrect" } }
)
}

Binary file not shown.
Binary file not shown.
@@ -104,9 +104,28 @@ public class SimTrace(

/**
* Construct a new [FlowSource] for the specified [cpu].
*
* @param cpu The [ProcessingUnit] for which to create the source.
* @param offset The time offset to use for the trace.
* @param fillMode The [FillMode] for filling missing data.
*/
public fun newSource(cpu: ProcessingUnit, offset: Long): FlowSource {
return CpuConsumer(cpu, offset, usageCol, timestampCol, deadlineCol, coresCol, size)
public fun newSource(cpu: ProcessingUnit, offset: Long, fillMode: FillMode = FillMode.None): FlowSource {
return CpuConsumer(cpu, offset, fillMode, usageCol, timestampCol, deadlineCol, coresCol, size)
}

/**
* An enumeration describing the modes for filling missing data.
*/
public enum class FillMode {
/**
* When a gap in the trace data occurs, the CPU usage will be set to zero.
*/
None,

/**
* When a gap in the trace data occurs, the previous CPU usage will be used.
*/
Previous
}

/**
@@ -183,6 +202,7 @@ public class SimTrace(
private class CpuConsumer(
cpu: ProcessingUnit,
private val offset: Long,
private val fillMode: FillMode,
private val usageCol: DoubleArray,
private val timestampCol: LongArray,
private val deadlineCol: LongArray,
@@ -217,9 +237,12 @@ public class SimTrace(
_idx = idx
val timestamp = timestampCol[idx]

// Fragment is in the future
// There is a gap in the trace, since the next fragment starts in the future.
if (timestamp > nowOffset) {
conn.push(0.0)
when (fillMode) {
FillMode.None -> conn.push(0.0) // Reset rate to zero
FillMode.Previous -> {} // Keep previous rate
}
return timestamp - nowOffset
}

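
The gap handling in `CpuConsumer` boils down to the following choice. This is a self-contained sketch of the semantics, not OpenDC code:

// Illustrative only; mirrors SimTrace.FillMode but is not part of the OpenDC API.
enum class FillMode { None, Previous }

fun usageDuringGap(previousUsage: Double, fillMode: FillMode): Double =
    when (fillMode) {
        FillMode.None -> 0.0               // reset the rate to zero for the gap
        FillMode.Previous -> previousUsage // keep the last observed rate
    }

// usageDuringGap(1200.0, FillMode.None)     == 0.0
// usageDuringGap(1200.0, FillMode.Previous) == 1200.0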
@@ -27,7 +27,7 @@ import java.time.Duration
import java.time.Instant

/**
* Timestamp for the state.
* The timestamp at which the state was recorded.
*/
@JvmField
public val RESOURCE_STATE_TIMESTAMP: TableColumn<Instant> = column("resource_state:timestamp")
@@ -194,6 +194,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val logger = KotlinLogging.logger {}

/**
* The interval at which the samples were taken.
*/
private val SAMPLE_INTERVAL = Duration.ofMinutes(5)

/**
* The maximum difference in CPU usage for the algorithm to cascade samples.
*/
private val SAMPLE_CASCADE_DIFF = 0.1

override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
@@ -242,7 +252,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {

logger.info { "Selecting VM $id" }

val startInstant = Instant.ofEpochMilli(startTime)
val startInstant = Instant.ofEpochMilli(startTime) - SAMPLE_INTERVAL // Offset by sample interval
val stopInstant = Instant.ofEpochMilli(stopTime)

selectedVms.computeIfAbsent(id) {
@@ -264,6 +274,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {

override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val sampleInterval = SAMPLE_INTERVAL.toMillis()

val idCol = reader.resolve(RESOURCE_ID)
val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
@@ -272,8 +283,6 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {

var hasNextRow = reader.nextRow()
var count = 0
var lastId: String? = null
var lastTimestamp = 0L

while (hasNextRow) {
val id = reader.get(idCol) as String
@@ -287,41 +296,43 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val cpuUsage = reader.getDouble(cpuUsageCol)

val startTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
var timestamp = startTimestamp
var duration: Long
var timestamp: Long = startTimestamp
var duration: Long = sampleInterval

// Check whether the previous entry is from a different VM
if (id != lastId) {
lastTimestamp = timestamp - 5 * 60 * 1000L
}
// Attempt to cascade further samples into one if they share the same CPU usage
while (reader.nextRow().also { hasNextRow = it }) {
val shouldCascade = id == reader.get(idCol) &&
abs(cpuUsage - reader.getDouble(cpuUsageCol)) < SAMPLE_CASCADE_DIFF &&
cpuCount == reader.getInt(cpuCountCol)

do {
timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
// Check whether the next sample can be cascaded with the current sample:
// (1) The VM identifier of both samples matches
// (2) The CPU usage is almost identical (the difference is lower than `SAMPLE_CASCADE_DIFF`)
// (3) The CPU count of both samples is identical
if (!shouldCascade) {
break
}

duration = timestamp - lastTimestamp
hasNextRow = reader.nextRow()
val nextTimestamp = (reader.get(timestampCol) as Instant).toEpochMilli()

if (!hasNextRow) {
// Check whether the interval between both samples is not higher than `SAMPLE_INTERVAL`
if ((nextTimestamp - timestamp) > sampleInterval) {
break
}

val shouldContinue = id == reader.get(idCol) &&
abs(cpuUsage - reader.getDouble(cpuUsageCol)) < 0.01 &&
cpuCount == reader.getInt(cpuCountCol)
} while (shouldContinue)
duration += nextTimestamp - timestamp
timestamp = nextTimestamp
}

writer.startRow()
writer.set(RESOURCE_ID, id)
writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTimestamp))
writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(timestamp))
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.endRow()

count++

lastId = id
lastTimestamp = timestamp
}

return count
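
The cascading loop above can be summarised by a self-contained sketch; the `Sample` data class and the `cascade` helper are illustrative, with defaults mirroring `SAMPLE_INTERVAL` and `SAMPLE_CASCADE_DIFF`:

import kotlin.math.abs

// Hypothetical in-memory representation of a resource state sample.
data class Sample(val id: String, val timestampMs: Long, val cpuUsage: Double, val cpuCount: Int)

// Merge runs of samples from the same VM whose CPU usage stays within `cascadeDiff`
// of the first sample of the run and whose spacing does not exceed `sampleIntervalMs`.
fun cascade(
    samples: List<Sample>,
    sampleIntervalMs: Long = 5 * 60_000L,
    cascadeDiff: Double = 0.1
): List<Pair<Sample, Long>> {
    val result = mutableListOf<Pair<Sample, Long>>() // merged sample paired with its total duration
    var i = 0
    while (i < samples.size) {
        val head = samples[i]
        var timestamp = head.timestampMs
        var duration = sampleIntervalMs
        var j = i + 1
        while (j < samples.size) {
            val next = samples[j]
            val sameSeries = next.id == head.id &&
                abs(next.cpuUsage - head.cpuUsage) < cascadeDiff &&
                next.cpuCount == head.cpuCount
            if (!sameSeries || next.timestampMs - timestamp > sampleIntervalMs) break
            duration += next.timestampMs - timestamp
            timestamp = next.timestampMs
            j++
        }
        // The merged row carries the timestamp of the last cascaded sample,
        // matching how RESOURCE_STATE_TIMESTAMP is written above.
        result += head.copy(timestampMs = timestamp) to duration
        i = j
    }
    return result
}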
@@ -342,6 +353,16 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
*/
private val CPU_CAPACITY = 2500.0

/**
* The interval at which the samples were taken.
*/
private val SAMPLE_INTERVAL = Duration.ofMinutes(5)

/**
* The maximum difference in CPU usage for the algorithm to cascade samples.
*/
private val SAMPLE_CASCADE_DIFF = 0.1

override fun convertResources(trace: Trace, writer: TableWriter, samplingOptions: SamplingOptions?): Map<String, Resource> {
val random = samplingOptions?.let { Random(it.seed) }
val samplingFraction = samplingOptions?.fraction ?: 1.0
@@ -379,10 +400,10 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {

writer.startRow()
writer.set(RESOURCE_ID, id)
writer.set(RESOURCE_START_TIME, Instant.ofEpochMilli(startTime))
writer.set(RESOURCE_STOP_TIME, Instant.ofEpochMilli(stopTime))
writer.set(RESOURCE_START_TIME, startInstant)
writer.set(RESOURCE_STOP_TIME, stopInstant)
writer.setInt(RESOURCE_CPU_COUNT, cpuCount)
writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCount * CPU_CAPACITY)
writer.setDouble(RESOURCE_CPU_CAPACITY, cpuCapacity)
writer.setDouble(RESOURCE_MEM_CAPACITY, memCapacity)
writer.endRow()
}
@@ -393,6 +414,7 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
override fun convertResourceStates(trace: Trace, writer: TableWriter, selected: Map<String, Resource>): Int {
val reader = checkNotNull(trace.getTable(TABLE_RESOURCE_STATES)).newReader()
val states = HashMap<String, State>()
val sampleInterval = SAMPLE_INTERVAL.toMillis()

val idCol = reader.resolve(RESOURCE_ID)
val timestampCol = reader.resolve(RESOURCE_STATE_TIMESTAMP)
@@ -405,39 +427,49 @@ internal class TraceConverterCli : CliktCommand(name = "trace-converter") {
val resource = selected[id] ?: continue

val cpuUsage = reader.getDouble(cpuUsageCol) * CPU_CAPACITY // MHz
val state = states.computeIfAbsent(id) { State(resource, cpuUsage) }
val state = states.computeIfAbsent(id) { State(resource, cpuUsage, sampleInterval) }
val timestamp = (reader.get(timestampCol) as Instant).toEpochMilli()
val duration = (timestamp - state.startTime)

state.duration = duration

if (abs(cpuUsage - state.cpuUsage) > 0.01) {
state.write(writer)

state.startTime = timestamp
state.duration = 0
state.cpuUsage = cpuUsage
val delta = (timestamp - state.time)

// Check whether the next sample can be cascaded with the current sample:
// (1) The CPU usage is almost identical (the difference is lower than `SAMPLE_CASCADE_DIFF`)
// (2) The interval between both samples is not higher than `SAMPLE_INTERVAL`
if (abs(cpuUsage - state.cpuUsage) <= SAMPLE_CASCADE_DIFF && delta <= sampleInterval) {
state.time = timestamp
state.duration += delta
continue
}

state.write(writer)
// Reset the state fields
state.time = timestamp
state.duration = sampleInterval
// Count write
count++
}

for ((_, state) in states) {
state.duration += state.startTime - state.resource.stopTime.toEpochMilli()
state.write(writer)
count++
}

return count
}

private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double) {
@JvmField var startTime: Long = resource.startTime.toEpochMilli()
@JvmField var duration: Long = 30000L
private class State(@JvmField val resource: Resource, @JvmField var cpuUsage: Double, @JvmField var duration: Long) {
@JvmField var time: Long = resource.startTime.toEpochMilli()
private var lastWrite: Long = Long.MIN_VALUE

fun write(writer: TableWriter) {
// Check whether this timestamp was already written
if (lastWrite == time) {
return
}
lastWrite = time

writer.startRow()
writer.set(RESOURCE_ID, resource.id)
writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(startTime))
writer.set(RESOURCE_STATE_TIMESTAMP, Instant.ofEpochMilli(time))
writer.set(RESOURCE_STATE_DURATION, Duration.ofMillis(duration))
writer.setDouble(RESOURCE_STATE_CPU_USAGE, cpuUsage)
writer.setInt(RESOURCE_CPU_COUNT, resource.cpuCount)
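
As a small worked example of the capacity conversion in this converter (the input value is hypothetical and assumes the source column reports utilisation as a fraction):

val CPU_CAPACITY = 2500.0                    // MHz, as defined above
val utilisation = 0.4                        // hypothetical value read from the trace
val cpuUsageMhz = utilisation * CPU_CAPACITY // = 1000.0 MHz demanded by the VM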
Binary file modified traces/bitbrains-small/meta.parquet
Binary file not shown.
Binary file modified traces/bitbrains-small/trace.parquet
Binary file not shown.
