Skip to content

Commit

Permalink
feat: Log Signal Exp Config and Monitoring (#9947)
Browse files Browse the repository at this point in the history
Co-authored-by: Ryan <rb@hpe.com>
  • Loading branch information
jgongd and rb-determined-ai authored Oct 24, 2024
1 parent 06b0b31 commit c71617c
Show file tree
Hide file tree
Showing 48 changed files with 1,271 additions and 760 deletions.
31 changes: 31 additions & 0 deletions e2e_tests/tests/cluster/test_log_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from determined.experimental import client
from tests import api_utils
from tests import experiment as exp
from tests.cluster import utils
from tests.experiment import noop


Expand Down Expand Up @@ -152,3 +153,33 @@ def test_log_policy_exclude_slurm(should_match: bool) -> None:
) # Job fails to start up the second restart since all nodes are excluded.
else:
assert times_ran == 2


@pytest.mark.e2e_cpu
@pytest.mark.parametrize("should_match", [True, False])
def test_log_policy_matched(should_match: bool) -> None:
    """Check the ``logPolicyMatched`` field reported for a run and its trial.

    An experiment is created from ``noop.Exit(7)`` with a single named log
    policy and is expected to finish in the ERROR state.  With
    ``should_match`` set, the policy uses the pattern intended to match and
    the policy name must be reported on both the run and the trial; otherwise
    a pattern that should not match is used and both fields must be ``None``.
    """
    sess = api_utils.user_session()
    policy_name = "Test"
    pattern = (
        r"executing.*action.*exit.*code.*7"
        if should_match
        else r"(.*) this should not match (.*)"
    )
    exp_config = {"log_policies": [{"name": policy_name, "pattern": pattern}]}

    exp_ref = noop.create_experiment(sess, [noop.Exit(7)], config=exp_config)
    assert exp_ref.wait(interval=0.01) == client.ExperimentState.ERROR

    # Find the run through the search API, then fetch the trial by the run's id.
    run = utils.get_run_by_exp_id(sess, exp_ref.id).runs[0]
    run_policy = run.logPolicyMatched
    trial_policy = bindings.get_GetTrial(sess, trialId=run.id).trial.logPolicyMatched

    if not should_match:
        assert run_policy is None
        assert trial_policy is None
        return

    assert run_policy == policy_name
    assert trial_policy == policy_name
28 changes: 28 additions & 0 deletions e2e_tests/tests/cluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing_extensions import Literal # noqa:I2041

from determined.common import api
from determined.common.api import bindings
from tests import command
from tests import config as conf
from tests import detproc
Expand Down Expand Up @@ -197,3 +198,30 @@ def set_master_port(config: str) -> None:
lc = conf.load_config(config_path=config)
port = get_master_port(lc)
conf.MASTER_PORT = port


def get_run_by_exp_id(sess: api.Session, exp_id: int) -> bindings.v1SearchRunsResponse:
    """Search for runs whose ``experimentId`` equals ``exp_id``.

    Args:
        sess: Session used to issue the SearchRuns request.
        exp_id: Id of the experiment whose run is wanted.

    Returns:
        The SearchRuns response.  The request is sent with ``limit=1`` and
        ``showArchived`` set to false.
    """
    # Function-scope import keeps this helper self-contained.
    import json

    # Build the filter as data and serialize it, instead of interpolating the
    # id into a JSON template, so the payload is always well-formed JSON.
    run_filter = {
        "filterGroup": {
            "children": [
                {
                    "columnName": "experimentId",
                    "kind": "field",
                    "location": "LOCATION_TYPE_RUN",
                    "operator": "=",
                    "type": "COLUMN_TYPE_NUMBER",
                    "value": exp_id,
                }
            ],
            "conjunction": "and",
            "kind": "group",
        },
        "showArchived": False,
    }
    return bindings.post_SearchRuns(
        sess,
        body=bindings.v1SearchRunsRequest(limit=1, filter=json.dumps(run_filter)),
    )
32 changes: 16 additions & 16 deletions harness/determined/common/api/bindings.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion master/internal/api_experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2741,7 +2741,7 @@ func (a *apiServer) SearchExperiments(
Column("searcher_metric_value").
Column("trials.external_trial_id").
ColumnExpr("nullif(trials.metadata, 'null') as metadata").
ColumnExpr("NULL as log_signal").
ColumnExpr("NULL as log_policy_matched").
Join("LEFT JOIN validations bv ON trials.best_validation_id = bv.id").
Join("LEFT JOIN validations lv ON trials.latest_validation_id = lv.id").
Join("LEFT JOIN checkpoints_v2 new_ckpt ON new_ckpt.id = trials.warm_start_checkpoint_id").
Expand Down
2 changes: 1 addition & 1 deletion master/internal/api_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func getRunsColumns(q *bun.SelectQuery) *bun.SelectQuery {
'pachyderm_integration', NULLIF(e.config#>'{integrations,pachyderm}', 'null'),
'id', e.id) AS experiment`).
ColumnExpr("rm.metadata AS metadata").
ColumnExpr("r.log_signal AS log_signal").
ColumnExpr("r.log_policy_matched AS log_policy_matched").
Join("LEFT JOIN experiments AS e ON r.experiment_id=e.id").
Join("LEFT JOIN runs_metadata AS rm ON r.id=rm.run_id").
Join("LEFT JOIN users u ON e.owner_id = u.id").
Expand Down
85 changes: 79 additions & 6 deletions master/internal/api_tasks_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,14 @@ func TestPostTaskLogsLogPattern(t *testing.T) {
activeConfig, err := api.m.db.ActiveExperimentConfig(trial.ExperimentID)
require.NoError(t, err)
activeConfig.RawLogPolicies = expconf.LogPoliciesConfig{
expconf.LogPolicy{RawPattern: "sub", RawAction: expconf.LogAction{
RawCancelRetries: &expconf.LogActionCancelRetries{},
}},
expconf.LogPolicy{RawPattern: `\d{5}$`, RawAction: expconf.LogAction{
RawExcludeNode: &expconf.LogActionExcludeNode{},
}},
expconf.LogPolicy{
RawPattern: ptrs.Ptr("sub"),
RawAction: &expconf.LogActionV0{Type: expconf.LogActionTypeCancelRetries},
},
expconf.LogPolicy{
RawPattern: ptrs.Ptr(`\d{5}$`),
RawAction: &expconf.LogActionV0{Type: expconf.LogActionTypeExcludeNode},
},
}

v, err := json.Marshal(activeConfig)
Expand Down Expand Up @@ -443,3 +445,74 @@ func TestGetAllocationAcceleratorData(t *testing.T) {
require.Equal(t, resp.AcceleratorData[0].ResourcePool,
a1.ResourcePool, "failed to get the correct allocation's resource pool data")
}

// TestPostTaskLogsLogSignalDataSaving posts task logs for a trial whose experiment
// config has a single named log policy, then checks that the policy's name is
// stored in the log_policy_matched column of both the runs row and the tasks row.
func TestPostTaskLogsLogSignalDataSaving(t *testing.T) {
	api, curUser, ctx := setupAPITest(t, nil)
	trial, task := createTestTrial(t, api, curUser)

	activeConfig, err := api.m.db.ActiveExperimentConfig(trial.ExperimentID)
	require.NoError(t, err)

	// One policy with only a name and a pattern; the first log line posted
	// below ("stringsubstring") contains the pattern "sub".
	activeConfig.RawLogPolicies = expconf.LogPoliciesConfig{
		expconf.LogPolicy{
			RawName:    ptrs.Ptr("test"),
			RawPattern: ptrs.Ptr("sub"),
		},
	}

	// Round-trip the config through JSON into a generic map and store that map
	// as the experiment's config.
	v, err := json.Marshal(activeConfig)
	require.NoError(t, err)

	var m map[string]any
	require.NoError(t, json.Unmarshal(v, &m))

	_, err = db.Bun().NewUpdate().Table("experiments").
		Where("id = ?", trial.ExperimentID).
		Set("config = ?", m).
		Exec(ctx)
	require.NoError(t, err)

	_, err = api.PostTaskLogs(ctx, &apiv1.PostTaskLogsRequest{
		Logs: []*taskv1.TaskLog{
			{
				TaskId:  string(task.TaskID),
				AgentId: ptrs.Ptr("a1"),
				Log:     "stringsubstring",
			},
			{
				TaskId:  string(task.TaskID),
				AgentId: ptrs.Ptr("a1"),
				Log:     "12345",
			},
		},
	})
	require.NoError(t, err)

	// bun reads `bun:"..."` struct tags, so the column name is given with that
	// tag key. The struct itself is a value and can never be nil, so only the
	// pointer field is checked.
	runsOut := struct {
		bun.BaseModel    `bun:"table:runs"`
		LogPolicyMatched *string `bun:"log_policy_matched"`
	}{}

	err = db.Bun().NewSelect().Model(&runsOut).
		Where("id = ?", trial.ID).
		Scan(ctx)
	require.NoError(t, err)
	require.NotNil(t, runsOut.LogPolicyMatched)

	require.Equal(t, "test", *runsOut.LogPolicyMatched)

	tasksOut := struct {
		bun.BaseModel    `bun:"table:tasks"`
		LogPolicyMatched *string `bun:"log_policy_matched"`
	}{}
	// Tasks are linked to the run through the run_id_task_id join table.
	err = db.Bun().NewSelect().Model(&tasksOut).
		Join("LEFT JOIN run_id_task_id AS rt on tasks.task_id = rt.task_id").
		Where("run_id = ?", trial.ID).
		Scan(ctx)
	require.NoError(t, err)
	require.NotNil(t, tasksOut.LogPolicyMatched)

	require.Equal(t, "test", *tasksOut.LogPolicyMatched)
}
7 changes: 1 addition & 6 deletions master/internal/configpolicy/postgres_task_config_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,7 @@ const (
// DefaultInvariantConfigStr is the default invariant config value used for tests.
DefaultInvariantConfigStr = `{
"description": "random description",
"resources": {"slots": 4, "max_slots": 8},
"log_policies": [
{
"pattern": "nonrepeat"
}
]
"resources": {"slots": 4, "max_slots": 8}
}`
// DefaultConstraintsStr is the default constraints value used for tests.
DefaultConstraintsStr = `{"priority_limit": 10, "resources": {"max_slots": 8}}`
Expand Down
13 changes: 8 additions & 5 deletions master/internal/configpolicy/task_config_policy_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,6 @@ func TestMergeWithInvariantExperimentConfigs(t *testing.T) {
"read_only": true,
"propagation": "cluster-wide"
}
],
"log_policies": [
{
"pattern": "nonrepeat"
}
]
}`

Expand Down Expand Up @@ -663,6 +658,7 @@ func TestMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "nonrepeat policy",
"pattern": "nonrepeat"
}
]
Expand Down Expand Up @@ -702,6 +698,7 @@ func TestMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "repeat policy",
"pattern": "repeat"
}
]
Expand Down Expand Up @@ -769,9 +766,11 @@ func TestMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "nonrepeat policy",
"pattern": "nonrepeat"
},
{
"name": "repeat policy",
"pattern": "repeat"
}
]
Expand Down Expand Up @@ -811,6 +810,7 @@ func TestMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "global repeat policy",
"pattern": "gloablrepeat"
}
]
Expand Down Expand Up @@ -894,12 +894,15 @@ func TestMergeSlicesAndMaps(t *testing.T) {
},
"log_policies": [
{
"name": "nonrepeat policy",
"pattern": "nonrepeat"
},
{
"name": "repeat policy",
"pattern": "repeat"
},
{
"name": "global repeat policy",
"pattern": "gloablrepeat"
}
]
Expand Down
Loading

0 comments on commit c71617c

Please sign in to comment.