Skip to content

Commit

Permalink
fix: failure to load chunks correctly for map-call over maps (#866)
Browse files Browse the repository at this point in the history
Ensures that any time we update a fork's ID, we also attempt to re-load the chunks data from stage defs.

Fixes a bug where map-called stages that map over a map type fail to load their chunks data. This results in those stages failing to reset their chunks when retry happens, which means the retried chunks happen in the original chunks directory instead of a fresh chunk directory. With this patch and added integration test, the chunks are correctly reset.

This patch also removes the args field from the Fork type which was set but never read.
  • Loading branch information
macklin-10x authored Jan 22, 2024
1 parent c068b26 commit cde0239
Show file tree
Hide file tree
Showing 138 changed files with 3,150 additions and 20 deletions.
6 changes: 3 additions & 3 deletions martian/core/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,17 +429,17 @@ func (self *Node) buildForks() {
// Build out argument permutations.
self.forkIds.MakeForkIds(self.call.ForkRoots(), self.top.types)
if len(self.forkIds.List) == 0 {
self.forks = []*Fork{NewFork(self, 0, nil, self.call.ResolvedInputs())}
self.forks = []*Fork{NewFork(self, 0, nil)}
} else {
self.forks = make([]*Fork, len(self.forkIds.List), cap(self.forkIds.List))
for i, id := range self.forkIds.List {
self.forks[i] = NewFork(self, i, id, self.call.ResolvedInputs())
self.forks[i] = NewFork(self, i, id)
}
}
}

func cloneFork(fork *Fork, id ForkId) *Fork {
nf := NewFork(fork.node, len(fork.node.forks), id, fork.args)
nf := NewFork(fork.node, len(fork.node.forks), id)
// Copy fileArgs
if len(fork.fileArgs) > 0 {
nf.fileArgs = make(
Expand Down
32 changes: 15 additions & 17 deletions martian/core/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ type Fork struct {
split_metadata *Metadata
join_metadata *Metadata
chunks []*Chunk
args map[string]*syntax.ResolvedBinding
stageDefs *StageDefs
perfCache *ForkPerfCache
lastPrint time.Time
Expand Down Expand Up @@ -392,29 +391,16 @@ type ForkPerfCache struct {
vdrKillReport *VDRKillReport
}

func NewFork(nodable Nodable, index int, id ForkId, args map[string]*syntax.ResolvedBinding) *Fork {
func NewFork(nodable Nodable, index int, id ForkId) *Fork {
self := &Fork{
node: nodable.getNode(),
index: index,
// By default, initialize stage defs with one empty chunk.
stageDefs: &StageDefs{ChunkDefs: []*ChunkDef{new(ChunkDef)}},
}
self.updateId(id)
self.args = args
self.split_has_run = false
self.join_has_run = false
self.lastPrint = time.Now()

// By default, initialize stage defs with one empty chunk.
self.stageDefs = &StageDefs{ChunkDefs: []*ChunkDef{new(ChunkDef)}}

if err := self.split_metadata.ReadInto(StageDefsFile, &self.stageDefs); err == nil {
width := util.WidthForInt(len(self.stageDefs.ChunkDefs))
self.chunks = make([]*Chunk, 0, len(self.stageDefs.ChunkDefs))
for i, chunkDef := range self.stageDefs.ChunkDefs {
chunk := NewChunk(self, i, chunkDef, width)
self.chunks = append(self.chunks, chunk)
}
}

return self
}

Expand All @@ -431,6 +417,7 @@ func (self *Fork) updateId(id ForkId) {
} else {
self.id = idx
}
oldPath := self.path
self.path = path.Join(self.node.path, self.id)
self.fqname = self.node.call.GetFqid() + "." + encodeJournalName.Replace(self.id)
self.metadata = NewMetadata(self.fqname, self.path)
Expand All @@ -447,6 +434,17 @@ func (self *Fork) updateId(id ForkId) {
self.join_metadata.finalFilePath = self.metadata.finalFilePath
self.join_metadata.discoverUniquify()
}
// If we updated the path, we should load stage defs and create chunks.
if self.path != oldPath {
if err := self.split_metadata.ReadInto(StageDefsFile, &self.stageDefs); err == nil {
width := util.WidthForInt(len(self.stageDefs.ChunkDefs))
self.chunks = make([]*Chunk, 0, len(self.stageDefs.ChunkDefs))
for i, chunkDef := range self.stageDefs.ChunkDefs {
chunk := NewChunk(self, i, chunkDef, width)
self.chunks = append(self.chunks, chunk)
}
}
}
}

func (self *Fork) Split() bool {
Expand Down
1 change: 1 addition & 0 deletions test/retry_map_call_map_test/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pipeline_test
40 changes: 40 additions & 0 deletions test/retry_map_call_map_test/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
load(
"//tools:mro_rules.bzl",
"mrf_test",
"mro_library",
"mro_test",
)
load("//test:integration_test.bzl", "integration_test")

mro_library(
name = "pipeline",
testonly = True,
srcs = ["pipeline.mro"],
deps = [
"//test/retry_map_call_map_test/stage",
],
)

mro_test(
name = "pipeline_test",
size = "small",
srcs = [":pipeline"],
)

mrf_test(
name = "pipeline_format",
srcs = [":pipeline"],
)

filegroup(
name = "pass_expect",
srcs = glob(["expected/**"]),
)

integration_test(
name = "retry_map_call_map_test",
config = "autoretry_pass.json",
expectation = [":pass_expect"],
pipeline = ":pipeline",
runner = "autoretry_pass.sh",
)
7 changes: 7 additions & 0 deletions test/retry_map_call_map_test/autoretry_pass.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"command": ["autoretry_pass.sh"],
"expected_return": 0,
"output_dir": "pipeline_test",
"contains_only_files": ["*"],
"contents_match": ["*"]
}
8 changes: 8 additions & 0 deletions test/retry_map_call_map_test/autoretry_pass.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
MROPATH=$PWD
if [ -z "$MROFLAGS" ]; then
export MROFLAGS="--disable-ui"
fi
PATH=../../bin:$PATH
mkdir -p ar_pass
mrp --autoretry=3 --vdrmode=strict pipeline.mro pipeline_test
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2024-01-08 18:43:53
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
@include "pipeline.mro"

call BEGIN(
count = 2,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"sentinels": {
"test0": "/user/test/retry_map_call_map_test/retry_map_call_map_test.runfiles/martian/test/retry_map_call_map_test/pipeline_test/SHOULD_RESTART/BEGIN/fork0/chnk0-ue2ef9c4266/files/sentinel",
"test1": ""
},
"should_fail_next": {
"test0": true,
"test1": false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"timestamp": "0001-01-01T00:00:00Z",
"paths": null,
"errors": null,
"events": [
{
"Timestamp": "2024-01-08T18:43:49.060592735Z",
"DeltaBytes": 3633
},
{
"Timestamp": "2024-01-08T18:43:53.344173786Z",
"DeltaBytes": 11866
}
],
"count": 0,
"size": 0
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"__mem_gb": 1,
"__threads": 1,
"__vmem_gb": 4,
"count": 2,
"sentinel": "/user/test/retry_map_call_map_test/retry_map_call_map_test.runfiles/martian/test/retry_map_call_map_test/pipeline_test/SHOULD_RESTART/BEGIN/fork0/split-ue2ef9c4265/files/sentinel",
"should_fail": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2024-01-08 18:43:53
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
{
"name": "ID.pipeline_test.SHOULD_RESTART.BEGIN.fork0.chnk0",
"type": "local",
"cwd": "/user/test/retry_map_call_map_test/retry_map_call_map_test.runfiles/martian/test/retry_map_call_map_test/pipeline_test/SHOULD_RESTART/BEGIN/fork0/chnk0-ue2ef9c4266/files",
"python": {
"binpath": "/bin/python3",
"version": "3.7.16 (default, Aug 30 2023, 20:37:53) \n[GCC 7.3.1 20180712 (Red Hat 7.3.1-15)]"
},
"rusage": {
"self": {
"ru_maxrss": 13848,
"ru_ixrss": 0,
"ru_idrss": 0,
"ru_minflt": 365,
"ru_majflt": 0,
"ru_nswap": 0,
"ru_utime": 0.002404,
"ru_stime": 0.002404,
"ru_inblock": 0,
"ru_oublock": 40,
"ru_msgsnd": 0,
"ru_msgrcv": 0,
"ru_nsignals": 0,
"ru_nivcsw": 0
},
"children": {
"ru_maxrss": 15928,
"ru_ixrss": 0,
"ru_idrss": 0,
"ru_minflt": 3759,
"ru_majflt": 0,
"ru_nswap": 0,
"ru_utime": 0.047496,
"ru_stime": 0.015423,
"ru_inblock": 0,
"ru_oublock": 40,
"ru_msgsnd": 0,
"ru_msgrcv": 0,
"ru_nsignals": 0,
"ru_nivcsw": 2
}
},
"used_bytes": {
"rss": 14180352,
"shared": 3182592,
"vmem": 1641078784,
"text": 1187840,
"stack": 84197376,
"proc_count": 1
},
"io": {
"total": {
"read": {
"sysc": 6,
"bytes": 0
},
"write": {
"sysc": 0,
"bytes": 0
}
},
"max": {
"read": {
"sysc": 1588.093533415473,
"bytes": 0
},
"write": {
"sysc": 0,
"bytes": 0
}
},
"dev": {
"read": {
"sysc": 0,
"bytes": 0
},
"write": {
"sysc": 0,
"bytes": 0
}
}
},
"wallclock": {
"start": "2024-01-08T18:43:53.344173786Z",
"end": "2024-01-08T18:43:53.413277311Z",
"duration_seconds": 0.069103558
},
"profile_mode": "disable",
"stackvars_flag": "disable",
"monitor_flag": "disable",
"invocation": {
"call": "SHOULD_RESTART",
"args": {
"count": 2
},
"mro_file": "pipeline.mro"
},
"version": {
"martian": "\u003cversion not embedded\u003e",
"pipelines": "v4.0.11-29-g954ba393-dirty"
},
"host": "bespin1.fuzzplex.com",
"pid": 779123,
"threads": 1,
"memGB": 1,
"vmemGB": 4
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
2024-01-08 18:43:53 [time] __start__
2024-01-08 18:43:53 [monitor] cgroup memory limit of 9223372036854771712 bytes detected
2024-01-08 18:43:53 [time] __end__
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"sentinel": "/user/test/retry_map_call_map_test/retry_map_call_map_test.runfiles/martian/test/retry_map_call_map_test/pipeline_test/SHOULD_RESTART/BEGIN/fork0/chnk0-ue2ef9c4266/files/sentinel",
"sentinels": {},
"should_fail": true,
"should_fail_next": {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[stderr]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[stdout]
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"__mem_gb": 1,
"__threads": 1,
"__vmem_gb": 4,
"count": 2,
"sentinel": "",
"should_fail": false
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2024-01-08 18:43:49
Loading

0 comments on commit cde0239

Please sign in to comment.