-
Notifications
You must be signed in to change notification settings - Fork 2
/
database.go
201 lines (177 loc) · 6.97 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
// Copyright 2019 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
package diviner
import (
"context"
"errors"
"fmt"
"io"
"time"
)
// RunState describes the current state of a particular run. Each
// individual state occupies its own bit, so a RunState value can also
// hold a combination of states (such as Any), for example to select
// runs by state.
type RunState int

const (
	// Pending indicates that the run has not yet completed.
	Pending RunState = 1 << iota
	// Success indicates that the run has completed and represents
	// a successful trial.
	Success
	// Failure indicates that the run failed.
	Failure

	// Any contains all run states.
	Any = Pending | Success | Failure
)

// String returns a simple textual representation of a run state.
//
// The zero value is rendered as "unknown" and a single state as its
// lowercase name. A combination of states is rendered as the names of
// its members, in declaration order, separated by "|"; Any therefore
// yields "pending|success|failure". A value with bits set outside of
// Any is rendered as "INVALID".
func (s RunState) String() string {
	switch {
	case s == 0:
		return "unknown"
	case s&^Any != 0:
		// At least one bit does not correspond to a declared state.
		return "INVALID"
	}
	var text string
	for _, st := range [...]struct {
		state RunState
		name  string
	}{
		{Pending, "pending"},
		{Success, "success"},
		{Failure, "failure"},
	} {
		if s&st.state == 0 {
			continue
		}
		if text != "" {
			text += "|"
		}
		text += st.name
	}
	return text
}
// A Run describes a single run, which, upon successful completion,
// represents a Trial. Runs are managed by a Database. A run is named
// by its study together with its sequence number; see ID.
type Run struct {
// Values is the set of parameter values represented by this run.
// It is an embedded field.
Values
// Study is the name of the study serviced by this run.
Study string
// Seq is a sequence number assigned to each run in a study.
// Together, the study and sequence number uniquely name
// a run.
Seq uint64
// Replicate is the replicate of this run.
Replicate int
// State is the current state of the run. See RunState for
// descriptions of these. Trial treats every state other than
// Success as pending.
State RunState
// Status is a human-consumable status indicating the status
// of the run.
Status string
// Config is the RunConfig for this run.
Config RunConfig
// Created is the time at which the run was created.
Created time.Time
// Updated is the last time the run's state was updated. Updated is
// used as a keepalive mechanism.
Updated time.Time
// Runtime is the runtime duration of the run.
Runtime time.Duration
// Retries is the number of times the run was retried.
Retries int
// Metrics is the history of metrics, in the order reported by the
// run. Trial uses the last entry, if any, as the metrics of the
// trial.
//
// TODO(marius): include timestamps for these, or some other
// reference (e.g., runtime).
Metrics []Metrics
}
// ID returns the identifier of the run: the name of its study and its
// sequence number in decimal, joined by a colon (e.g., "study:42").
func (r Run) ID() string {
	study, seq := r.Study, r.Seq
	id := fmt.Sprintf("%s:%d", study, seq)
	return id
}
// Trial returns the Trial represented by this run. The trial carries
// the run's values, the run itself as its only run, and the run's
// replicate in its replicate set. It is marked pending unless the run
// is in state Success. The trial's metrics are the most recently
// reported metrics of the run; they are left unset if the run has not
// reported any.
//
// TODO(marius): allow other metric selection policies
// (e.g., minimize train and test loss difference)
func (r Run) Trial() Trial {
	var t Trial
	t.Values = r.Values
	t.Pending = !(r.State == Success)
	t.Runs = []Run{r}
	t.Replicates.Set(r.Replicate)
	// Only the last reported set of metrics is used.
	if n := len(r.Metrics); n > 0 {
		t.Metrics = r.Metrics[n-1]
	}
	return t
}
// ErrNotExist is returned from a database when a study or run does
// not exist. Trials tolerates this error from Database.ListRuns and
// returns an empty result instead of failing.
var ErrNotExist = errors.New("study or run does not exist")
// A Database is used to track and manage studies and runs. Runs are
// addressed by the name of their study together with a sequence
// number.
type Database interface {
// CreateTable creates the underlying database table.
CreateTable(context.Context) error
// CreateStudyIfNotExist creates a new study from the provided Study value.
// If the study already exists, this is a no-op.
// NOTE(review): created presumably reports whether this call
// created the study; confirm against the implementations.
CreateStudyIfNotExist(ctx context.Context, study Study) (created bool, err error)
// LookupStudy returns the study with the provided name.
LookupStudy(ctx context.Context, name string) (Study, error)
// ListStudies returns the set of studies matching the provided prefix and whose
// last update time is not before the provided time.
ListStudies(ctx context.Context, prefix string, since time.Time) ([]Study, error)
// NextSeq reserves and returns the next run sequence number for the
// provided study.
NextSeq(ctx context.Context, study string) (uint64, error)
// InsertRun inserts the provided run into a study. The run's study,
// values, and config must be populated; other fields are ignored.
// If the sequence number is provided (>0), then it is assumed to
// have been reserved by NextSeq. The run's study must already
// exist, and the returned Run is assigned a sequence number, state,
// and creation time.
InsertRun(ctx context.Context, run Run) (Run, error)
// UpdateRun updates the run named by the provided study and
// sequence number with the given run state, message, runtime, and
// current retry sequence.
// UpdateRun is also used as a keepalive mechanism: runners must
// call UpdateRun frequently in order to have the run considered
// live by Diviner's tooling.
UpdateRun(ctx context.Context, study string, seq uint64, state RunState, message string, runtime time.Duration, retry int) error
// AppendRunMetrics reports a new set of metrics to the run named by the provided
// study and sequence number.
AppendRunMetrics(ctx context.Context, study string, seq uint64, metrics Metrics) error
// ListRuns returns the set of runs in the provided study matching the queried
// run states. ListRuns only returns runs that have been updated since the provided
// time. Trials calls ListRuns with the zero time and treats an
// ErrNotExist error as an empty set of runs.
ListRuns(ctx context.Context, study string, states RunState, since time.Time) ([]Run, error)
// LookupRun returns the run named by the provided study and sequence number.
LookupRun(ctx context.Context, study string, seq uint64) (Run, error)
// Log obtains a reader for the logs emitted by the run named by the study and
// sequence number. If !since.IsZero(), show messages added at or after the
// given time. If follow is true, the returned reader is a perpetual stream,
// updated as new log entries are appended.
// Unlike the other methods, Log takes no context and returns no error.
Log(study string, seq uint64, since time.Time, follow bool) io.Reader
// Logger returns an io.WriteCloser, to which log messages can be written,
// for the run named by a study and sequence number.
// Like Log, Logger takes no context and returns no error.
Logger(study string, seq uint64) io.WriteCloser
}
// Trials queries the database db for all runs in the provided study,
// and returns a set of composite trials for each replicate of a
// value set. The returned map maps value sets to these composite
// trials.
//
// Trial metrics are averaged across runs in the states as indicated
// by the provided run states; flags are set on the returned trials
// to indicate which replicates they comprise and whether any pending
// results were used.
//
// If the database reports ErrNotExist, directly or wrapped in another
// error, Trials does not fail: the error is dropped and the trials are
// built from whatever runs were returned (normally none). Any other
// error is returned unchanged, together with a nil map.
//
// TODO(marius): this is a reasonable approach for some metrics, but
// not for others. We should provide a way for users (e.g., as
// part of a study definition) to define their own means of defining
// composite metrics, e.g., by interpreting metrics from each run, or
// their outputs directly (e.g., predictions from an evaluation run).
func Trials(ctx context.Context, db Database, study Study, states RunState) (*Map, error) {
	runs, err := db.ListRuns(ctx, study.Name, states, time.Time{})
	// errors.Is (rather than ==) also recognizes an ErrNotExist that a
	// database implementation has wrapped with additional context.
	if err != nil && !errors.Is(err, ErrNotExist) {
		return nil, err
	}

	// Group the trial of every run by the run's value set.
	replicates := NewMap()
	for i := range runs {
		var trials []Trial
		if v, ok := replicates.Get(runs[i].Values); ok {
			trials = v.([]Trial)
		}
		trials = append(trials, runs[i].Trial())
		replicates.Put(runs[i].Values, trials)
	}

	// Combine each group of replicates into a single composite trial.
	trials := NewMap()
	replicates.Range(func(key Value, v interface{}) {
		values := key.(Values)
		trials.Put(&values, ReplicatedTrial(v.([]Trial)))
	})
	return trials, nil
}