From 08351246303ab2c56a1a0f5581b0b794512ed9ec Mon Sep 17 00:00:00 2001 From: Jeremy Lewi Date: Sat, 21 Sep 2024 12:52:44 -0700 Subject: [PATCH] An initial version of ResultManager. Write the SQL for use with generatec. --- app/api/experiment.go | 3 + app/pkg/analyze/fsql/eval_query.sql | 9 + app/pkg/analyze/fsql/eval_query.sql.go | 41 ++ app/pkg/analyze/fsql/models.go | 6 + app/pkg/analyze/fsql/query.sql | 2 +- app/pkg/analyze/fsql/schema.sql | 11 + app/pkg/analyze/fsql/sqlc.yaml | 4 +- app/pkg/analyze/session_manager.go | 5 +- app/pkg/eval/assertor.go | 250 ++++----- app/pkg/eval/distance.go | 3 + app/pkg/eval/evaluator.go | 670 ++++++++++--------------- app/pkg/eval/evaluator_test.go | 187 ++++--- app/pkg/eval/reconcilers.go | 83 --- app/pkg/eval/results_manager.go | 173 +++---- app/pkg/eval/results_manager_test.go | 58 +++ app/pkg/eval/service.go | 9 +- app/pkg/eval/service_test.go | 25 +- 17 files changed, 729 insertions(+), 810 deletions(-) create mode 100644 app/pkg/analyze/fsql/eval_query.sql create mode 100644 app/pkg/analyze/fsql/eval_query.sql.go delete mode 100644 app/pkg/eval/reconcilers.go create mode 100644 app/pkg/eval/results_manager_test.go diff --git a/app/api/experiment.go b/app/api/experiment.go index 3a8595cb..179f0c6c 100644 --- a/app/api/experiment.go +++ b/app/api/experiment.go @@ -13,6 +13,9 @@ type Experiment struct { } type ExperimentSpec struct { + // AgentAddress is the address of the agent to use to generate completions + AgentAddress string `json:"agentAddress" yaml:"agentAddress"` + // EvalDir is the directory containing the evaluation the evaluation input EvalDir string `json:"evalDir" yaml:"evalDir"` diff --git a/app/pkg/analyze/fsql/eval_query.sql b/app/pkg/analyze/fsql/eval_query.sql new file mode 100644 index 00000000..a5473eb9 --- /dev/null +++ b/app/pkg/analyze/fsql/eval_query.sql @@ -0,0 +1,9 @@ +-- name: UpdateResult :exec +INSERT OR REPLACE INTO results +(id, time, proto_json) +VALUES +(?, ?, ?); + +-- name: GetResult :one +SELECT * FROM results +WHERE id = ?; \ No newline at end of file diff --git a/app/pkg/analyze/fsql/eval_query.sql.go b/app/pkg/analyze/fsql/eval_query.sql.go new file mode 100644 index 00000000..3f4f83db --- /dev/null +++ b/app/pkg/analyze/fsql/eval_query.sql.go @@ -0,0 +1,41 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: eval_query.sql + +package fsql + +import ( + "context" + "time" +) + +const getResult = `-- name: GetResult :one +SELECT id, time, proto_json FROM results +WHERE id = ? +` + +func (q *Queries) GetResult(ctx context.Context, id string) (Result, error) { + row := q.db.QueryRowContext(ctx, getResult, id) + var i Result + err := row.Scan(&i.ID, &i.Time, &i.ProtoJson) + return i, err +} + +const updateResult = `-- name: UpdateResult :exec +INSERT OR REPLACE INTO results +(id, time, proto_json) +VALUES +(?, ?, ?) +` + +type UpdateResultParams struct { + ID string + Time time.Time + ProtoJson string +} + +func (q *Queries) UpdateResult(ctx context.Context, arg UpdateResultParams) error { + _, err := q.db.ExecContext(ctx, updateResult, arg.ID, arg.Time, arg.ProtoJson) + return err +} diff --git a/app/pkg/analyze/fsql/models.go b/app/pkg/analyze/fsql/models.go index e9290994..acf66881 100644 --- a/app/pkg/analyze/fsql/models.go +++ b/app/pkg/analyze/fsql/models.go @@ -8,6 +8,12 @@ import ( "time" ) +type Result struct { + ID string + Time time.Time + ProtoJson string +} + type Session struct { Contextid string Starttime time.Time diff --git a/app/pkg/analyze/fsql/query.sql b/app/pkg/analyze/fsql/query.sql index 36804361..b0782bbc 100644 --- a/app/pkg/analyze/fsql/query.sql +++ b/app/pkg/analyze/fsql/query.sql @@ -20,4 +20,4 @@ WHERE contextID = ?; INSERT OR REPLACE INTO sessions (contextID, startTime, endTime, selectedId, selectedKind, total_input_tokens, total_output_tokens, num_generate_traces, proto) VALUES -(?, ?, ?, ?, ?, ?, ?, ?, ?); \ No newline at end of file +(?, ?, ?, ?, ?, ?, ?, ?, ?); diff --git a/app/pkg/analyze/fsql/schema.sql b/app/pkg/analyze/fsql/schema.sql index f23de11a..38d3106c 100644 --- a/app/pkg/analyze/fsql/schema.sql +++ b/app/pkg/analyze/fsql/schema.sql @@ -23,3 +23,14 @@ CREATE TABLE IF NOT EXISTS sessions ( -- TODO(jeremy): Should we store the proto in JSON format so that we can run SQL queries on values in it? proto BLOB ); + +-- Results contains evaluation results +CREATE TABLE IF NOT EXISTS results ( + id VARCHAR(255) PRIMARY KEY, + -- time is the time of the evaluation example + -- protobufs can't have null timestamps so no point allowing nulls + time TIMESTAMP NOT NULL, + + -- The JSON serialization of the proto. + proto_json TEXT NOT NULL +); \ No newline at end of file diff --git a/app/pkg/analyze/fsql/sqlc.yaml b/app/pkg/analyze/fsql/sqlc.yaml index 67caeb53..311cce97 100644 --- a/app/pkg/analyze/fsql/sqlc.yaml +++ b/app/pkg/analyze/fsql/sqlc.yaml @@ -1,7 +1,9 @@ version: "2" sql: - engine: "sqlite" - queries: "query.sql" + queries: + - "eval_query.sql" + - "query.sql" schema: "schema.sql" gen: go: diff --git a/app/pkg/analyze/session_manager.go b/app/pkg/analyze/session_manager.go index 41d9e577..a6a6a594 100644 --- a/app/pkg/analyze/session_manager.go +++ b/app/pkg/analyze/session_manager.go @@ -283,6 +283,10 @@ func (m *SessionsManager) DumpExamples(ctx context.Context, request *connect.Req } // protoToRow converts from the proto representation of a session to the database row representation. +// +// TODO(jeremy): I think it would be better to make the return type fsql.UpdateSessionParams. Right now the only +// place this function gets called is in the Update method and the returned value is immediately converted to +// fsql.UpdateSessionParams. func protoToRow(session *logspb.Session) (*fsql.Session, error) { log := logs.NewLogger() protoBytes, err := proto.Marshal(session) @@ -303,7 +307,6 @@ func protoToRow(session *logspb.Session) (*fsql.Session, error) { } } - // TODO: How do we deal with the end/starttime? In sqlc should we specify the type as timestamp? return &fsql.Session{ Contextid: session.ContextId, Starttime: session.StartTime.AsTime(), diff --git a/app/pkg/eval/assertor.go b/app/pkg/eval/assertor.go index 548b15f6..30c8dbc4 100644 --- a/app/pkg/eval/assertor.go +++ b/app/pkg/eval/assertor.go @@ -3,23 +3,16 @@ package eval import ( "context" "crypto/tls" - "net" - "net/http" - "os" - - "github.com/jlewi/foyle/app/pkg/dbutil" - "github.com/jlewi/foyle/app/pkg/docs" - "github.com/cockroachdb/pebble" "github.com/jlewi/foyle/app/api" "github.com/jlewi/foyle/app/pkg/config" "github.com/jlewi/foyle/app/pkg/logs" - "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" "github.com/jlewi/foyle/protos/go/foyle/v1alpha1/v1alpha1connect" "github.com/jlewi/monogo/helpers" "github.com/pkg/errors" "golang.org/x/net/http2" - "google.golang.org/protobuf/proto" + "net" + "net/http" "sigs.k8s.io/kustomize/kyaml/yaml" ) @@ -43,7 +36,8 @@ func NewAssertRunner(config config.Config) (*AssertRunner, error) { func newHTTPClient() *http.Client { // N.B. We need to use HTTP2 if we want to support bidirectional streaming - //http.DefaultClient, + // TODO(jeremy): Add the OTEL transport so we report OTEL metrics? See + // https://github.com/connectrpc/otelconnect-go?tab=readme-ov-file#configuration-for-internal-services return &http.Client{ Transport: &http2.Transport{ AllowHTTP: true, @@ -54,6 +48,7 @@ func newHTTPClient() *http.Client { }, } } + func newGenerateClient(baseURL string) v1alpha1connect.GenerateServiceClient { // Create a new client client := v1alpha1connect.NewGenerateServiceClient( @@ -89,7 +84,7 @@ func (r *AssertRunner) Reconcile(ctx context.Context, job api.AssertJob) error { return errors.New("Sources must be specified") } - client := newGenerateClient(job.Spec.AgentAddress) + // client := newGenerateClient(job.Spec.AgentAddress) // Process all the sources for _, source := range job.Spec.Sources { @@ -117,10 +112,13 @@ func (r *AssertRunner) Reconcile(ctx context.Context, job api.AssertJob) error { } } + // TODO(jeremy): Should we merge this with the evaluator? How should we update this code to work now that we are + // doing simulations? + return errors.New("Not implemented; code needs to be updated to work with the new protos and the new DB schema") // Now generate predictions for any results that are missing them. - if err := reconcilePredictions(ctx, db, client); err != nil { - return err - } + //if err := reconcilePredictions(ctx, db, client); err != nil { + // return err + //} if err := reconcileAssertions(ctx, r.assertions, db); err != nil { return err @@ -130,120 +128,122 @@ func (r *AssertRunner) Reconcile(ctx context.Context, job api.AssertJob) error { // reconcileAssertions reconciles the assertions with the results func reconcileAssertions(ctx context.Context, assertions []Assertion, db *pebble.DB) error { - olog := logs.FromContext(ctx) - iter, err := db.NewIterWithContext(ctx, nil) - if err != nil { - return err - } - defer iter.Close() - - for iter.First(); iter.Valid(); iter.Next() { - key := iter.Key() - if key == nil { - break - } - - log := olog.WithValues("id", string(key)) - value, err := iter.ValueAndErr() - if err != nil { - return errors.Wrapf(err, "Failed to read value for key %s", string(key)) - } - - result := &v1alpha1.EvalResult{} - if err := proto.Unmarshal(value, result); err != nil { - return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) - } - - actual := make(map[string]bool) - for _, a := range result.GetAssertions() { - actual[a.GetName()] = true - } - - if result.Assertions == nil { - result.Assertions = make([]*v1alpha1.Assertion, 0, len(assertions)) - } - - for _, a := range assertions { - if _, ok := actual[a.Name()]; ok { - continue - } - - // Run the assertion - newA, err := a.Assert(ctx, result.Example.Query, nil, result.Actual) - - if err != nil { - log.Error(err, "Failed to run assertion", "name", a.Name()) - } - - result.Assertions = append(result.Assertions, newA) - } - - if err := updateResult(ctx, string(key), result, db); err != nil { - return err - } - } - return nil + return errors.New("This code needs to be updated to work with the new protos and the new DB schema") + //olog := logs.FromContext(ctx) + //iter, err := db.NewIterWithContext(ctx, nil) + //if err != nil { + // return err + //} + //defer iter.Close() + // + //for iter.First(); iter.Valid(); iter.Next() { + // key := iter.Key() + // if key == nil { + // break + // } + // + // log := olog.WithValues("id", string(key)) + // value, err := iter.ValueAndErr() + // if err != nil { + // return errors.Wrapf(err, "Failed to read value for key %s", string(key)) + // } + // + // result := &v1alpha1.EvalResult{} + // if err := proto.Unmarshal(value, result); err != nil { + // return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) + // } + // + // actual := make(map[string]bool) + // for _, a := range result.GetAssertions() { + // actual[a.GetName()] = true + // } + // + // if result.Assertions == nil { + // result.Assertions = make([]*v1alpha1.Assertion, 0, len(assertions)) + // } + // + // for _, a := range assertions { + // if _, ok := actual[a.Name()]; ok { + // continue + // } + // + // // Run the assertion + // newA, err := a.Assert(ctx, result.Example.Query, nil, result.Actual) + // + // if err != nil { + // log.Error(err, "Failed to run assertion", "name", a.Name()) + // } + // + // result.Assertions = append(result.Assertions, newA) + // } + // + // if err := updateResult(ctx, string(key), result, db); err != nil { + // return err + // } + //} + //return nil } // loadMarkdownFiles loads a bunch of markdown files into example protos. // Unlike loadMarkdownAnswerFiles this function doesn't load any answers. func loadMarkdownFiles(ctx context.Context, db *pebble.DB, files []string) error { - oLog := logs.FromContext(ctx) - - allErrors := &helpers.ListOfErrors{} - for _, path := range files { - log := oLog.WithValues("path", path) - log.Info("Processing file") - - contents, err := os.ReadFile(path) - if err != nil { - log.Error(err, "Failed to read file") - allErrors.AddCause(err) - // Keep going - continue - } - - doc := &v1alpha1.Doc{} - - blocks, err := docs.MarkdownToBlocks(string(contents)) - if err != nil { - log.Error(err, "Failed to convert markdown to blocks") - allErrors.AddCause(err) - // Keep going - continue - } - - doc.Blocks = blocks - - if len(doc.GetBlocks()) < 2 { - log.Info("Skipping doc; too few blocks; at least two are required") - continue - } - - // We generate a stable ID for the example by hashing the contents of the document. - example := &v1alpha1.Example{ - Query: doc, - } - example.Id = HashExample(example) - - result := &v1alpha1.EvalResult{ - Example: example, - ExampleFile: path, - // initialize distance to a negative value so we can tell when it hasn't been computed - Distance: uninitializedDistance, - } - - if err := dbutil.SetProto(db, example.GetId(), result); err != nil { - log.Error(err, "Failed to write result to DB") - allErrors.AddCause(err) - // Keep going - continue - } - } - - if len(allErrors.Causes) > 0 { - return allErrors - } - - return nil + return errors.New("This function should no longer be needed with the new protos and use of sqllit") + //oLog := logs.FromContext(ctx) + // + //allErrors := &helpers.ListOfErrors{} + //for _, path := range files { + // log := oLog.WithValues("path", path) + // log.Info("Processing file") + // + // contents, err := os.ReadFile(path) + // if err != nil { + // log.Error(err, "Failed to read file") + // allErrors.AddCause(err) + // // Keep going + // continue + // } + // + // doc := &v1alpha1.Doc{} + // + // blocks, err := docs.MarkdownToBlocks(string(contents)) + // if err != nil { + // log.Error(err, "Failed to convert markdown to blocks") + // allErrors.AddCause(err) + // // Keep going + // continue + // } + // + // doc.Blocks = blocks + // + // if len(doc.GetBlocks()) < 2 { + // log.Info("Skipping doc; too few blocks; at least two are required") + // continue + // } + // + // // We generate a stable ID for the example by hashing the contents of the document. + // example := &v1alpha1.Example{ + // Query: doc, + // } + // example.Id = HashExample(example) + // + // result := &v1alpha1.EvalResult{ + // Example: example, + // ExampleFile: path, + // // initialize distance to a negative value so we can tell when it hasn't been computed + // Distance: uninitializedDistance, + // } + // + // if err := dbutil.SetProto(db, example.GetId(), result); err != nil { + // log.Error(err, "Failed to write result to DB") + // allErrors.AddCause(err) + // // Keep going + // continue + // } + //} + // + //if len(allErrors.Causes) > 0 { + // return allErrors + //} + // + //return nil } diff --git a/app/pkg/eval/distance.go b/app/pkg/eval/distance.go index a863e020..fb3bf023 100644 --- a/app/pkg/eval/distance.go +++ b/app/pkg/eval/distance.go @@ -24,6 +24,9 @@ type DistanceResult struct { Normalized float32 } +// TODO(jeremy): Should we delete the distance code? I don't think it is useful as a metric given that code cells +// are often mini programs that violate the assumptions of the distance metric. + // Distance computes the distance between two instructions // // For details refer to tn003_learning_eval.md. diff --git a/app/pkg/eval/evaluator.go b/app/pkg/eval/evaluator.go index ab7d3944..7402cef5 100644 --- a/app/pkg/eval/evaluator.go +++ b/app/pkg/eval/evaluator.go @@ -9,29 +9,16 @@ import ( "sort" "time" - logspb "github.com/jlewi/foyle/protos/go/foyle/logs" - - "github.com/go-cmd/cmd" - - "github.com/jlewi/foyle/app/pkg/dbutil" - "github.com/jlewi/foyle/app/api" - "github.com/jlewi/foyle/app/pkg/agent" - "github.com/jlewi/foyle/app/pkg/oai" "sigs.k8s.io/kustomize/kyaml/yaml" "github.com/cockroachdb/pebble" "github.com/jlewi/foyle/app/pkg/config" - "github.com/jlewi/foyle/app/pkg/docs" "github.com/jlewi/foyle/app/pkg/executor" "github.com/jlewi/foyle/app/pkg/logs" "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" "github.com/jlewi/monogo/helpers" "github.com/pkg/errors" - "google.golang.org/api/googleapi" - "google.golang.org/api/impersonate" - "google.golang.org/api/option" - "google.golang.org/api/sheets/v4" "google.golang.org/protobuf/proto" ) @@ -85,10 +72,7 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er if experiment.Spec.Agent == nil { return errors.New("Agent is required") } - agent, err := e.setupAgent(ctx, *experiment.Spec.Agent) - if err != nil { - return err - } + aiClient := newAIServiceClient(experiment.Spec.AgentAddress) // Find all the binary protobuf files in the eval directory. // This should contain EvalExample protos. @@ -131,7 +115,7 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er sortEvalExamplesInTime(examples) // Now generate predictions for any results that are missing them. - if err := e.processExamples(ctx, db, agent); err != nil { + if err := e.processExamples(ctx, examples, lastProcessedTime, aiClient); err != nil { return err } @@ -147,59 +131,12 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er return err } - // Compute the distance - if err := e.reconcileDistance(ctx, db); err != nil { - return err - } - - // Update the Google Sheet - if err := e.updateGoogleSheet(ctx, experiment, db); err != nil { - return err - } return nil } -func (e *Evaluator) setupAgent(ctx context.Context, agentConfig api.AgentConfig) (*agent.Agent, error) { - cfg := e.config.DeepCopy() - - // Swap out the AgentConfig - cfg.Agent = &agentConfig - - // Ensure we are in evaluation mode. - cfg.Agent.EvalMode = true - - client, err := oai.NewClient(cfg) - if err != nil { - return nil, err - } - - // TODO(jeremy): This will need to be updated when we support other configurations. - completer, err := oai.NewCompleter(cfg, client) - if err != nil { - return nil, err - } - - log := logs.FromContext(ctx) - log.Info("Creating agen without inMemoryExampleDB", "config", cfg.Agent) - if cfg.Agent.RAG != nil && cfg.Agent.RAG.Enabled { - return nil, errors.New("RAG is enabled but eval code needs to be updated to ddeal with streaming logs") - } - - // TODO(jeremy): How should we construct inMemoryExampleDB? In the eval case? - agent, err := agent.NewAgent(cfg, completer, nil) - - if err != nil { - return nil, err - } - return agent, nil -} - -func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.EvalExample, lastProcessedTime time.Time) error { +func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.EvalExample, lastProcessedTime time.Time, client v1alpha1connect.AIServiceClient) error { oLog := logs.FromContext(ctx) - // TODO(jeremy): where should this actually be created and set - var client v1alpha1connect.AIServiceClient - // Where do we set this var manager *ResultsManager @@ -216,12 +153,9 @@ func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.Ev Notebook: example.GetFullContext().GetNotebook(), } - result := &v1alpha1.EvalResult{} - resp, err := client.GenerateCells(ctx, connect.NewRequest(request)) if err != nil { log.Error(err, "Failed to generate cells") - result.Error = err.Error() uErr := manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error { result.Error = err.Error() return nil @@ -232,39 +166,22 @@ func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.Ev continue } - // Left off editing here - - if len(result.GetActual()) > 0 { - log.Info("Skipping; already have answer", "path", result.ExampleFile) - // We have the answer so we don't need to generate it. - continue + uErr := manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error { + result.ActualCells = resp.Msg.GetCells() + return nil + }) + if uErr != nil { + log.Error(uErr, "Failed to update result") } - if len(result.Actual) == 0 { - // Initialize a trace - resp, err := func() (*v1alpha1.GenerateResponse, error) { - newCtx, span := tracer().Start(ctx, "(*Evaluator).reconcilePredictions") - defer span.End() - - // We need to generate the answer. - return agent.Generate(newCtx, &v1alpha1.GenerateRequest{ - Doc: result.Example.Query, - }) - }() - if err != nil { - result.Error = err.Error() - result.Status = v1alpha1.EvalResultStatus_ERROR - continue - } - - result.Actual = resp.GetBlocks() - result.GenTraceId = resp.GetTraceId() + // TODO(jeremy): We should set the traceId based on OTEL. + // There's a couple of ways we could do this. + // 1. We could have the client set the traceId but then we'd have to configure the server to trust the client + // trace per https://github.com/connectrpc/otelconnect-go?tab=readme-ov-file#configuration-for-internal-services + // 2. The server could set the trace id and I believe it should be in the response? and then the client can + // get it? + // result.GenTraceId = resp.Msg.GetTraceId() - log.Info("Writing result to DB") - if err := updateResult(ctx, string(key), result, db); err != nil { - return errors.Wrapf(err, "Failed to write result to DB") - } - } } return nil } @@ -280,310 +197,191 @@ func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.Ev // return nil //} -func (e *Evaluator) reconcileDistance(ctx context.Context, db *pebble.DB) error { - olog := logs.FromContext(ctx) - iter, err := db.NewIterWithContext(ctx, nil) - if err != nil { - return err - } - defer iter.Close() - - for iter.First(); iter.Valid(); iter.Next() { - key := iter.Key() - if key == nil { - break - } - - log := olog.WithValues("id", string(key)) - value, err := iter.ValueAndErr() - if err != nil { - return errors.Wrapf(err, "Failed to read value for key %s", string(key)) - } - - result := &v1alpha1.EvalResult{} - if err := proto.Unmarshal(value, result); err != nil { - return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) - } - - if result.Distance >= 0 && result.Status != v1alpha1.EvalResultStatus_UNKNOWN_EVAL_RESULT_STATUS { - log.Info("Skipping; distance already computed") - continue - } - - updateEvalResultDistance(ctx, e.parser, result) - log.Info("Updating distance", "distance", result.Distance) - if err := updateResult(ctx, string(key), result, db); err != nil { - log.Error(err, "Failed to update result") - } - } - return nil -} +//func (e *Evaluator) reconcileDistance(ctx context.Context, db *pebble.DB) error { +// olog := logs.FromContext(ctx) +// iter, err := db.NewIterWithContext(ctx, nil) +// if err != nil { +// return err +// } +// defer iter.Close() +// +// for iter.First(); iter.Valid(); iter.Next() { +// key := iter.Key() +// if key == nil { +// break +// } +// +// log := olog.WithValues("id", string(key)) +// value, err := iter.ValueAndErr() +// if err != nil { +// return errors.Wrapf(err, "Failed to read value for key %s", string(key)) +// } +// +// result := &v1alpha1.EvalResult{} +// if err := proto.Unmarshal(value, result); err != nil { +// return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) +// } +// +// if result.Distance >= 0 && result.Status != v1alpha1.EvalResultStatus_UNKNOWN_EVAL_RESULT_STATUS { +// log.Info("Skipping; distance already computed") +// continue +// } +// +// updateEvalResultDistance(ctx, e.parser, result) +// log.Info("Updating distance", "distance", result.Distance) +// if err := updateResult(ctx, string(key), result, db); err != nil { +// log.Error(err, "Failed to update result") +// } +// } +// return nil +//} func (e *Evaluator) reconcileBestRAGResult(ctx context.Context, db *pebble.DB, traces *pebble.DB) error { - olog := logs.FromContext(ctx) - iter, err := db.NewIterWithContext(ctx, nil) - if err != nil { - return err - } - defer iter.Close() - - for iter.First(); iter.Valid(); iter.Next() { - key := iter.Key() - if key == nil { - break - } - - log := olog.WithValues("id", string(key)) - value, err := iter.ValueAndErr() - if err != nil { - return errors.Wrapf(err, "Failed to read value for key %s", string(key)) - } - - result := &v1alpha1.EvalResult{} - if err := proto.Unmarshal(value, result); err != nil { - return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) - } - - // TODO(jeremy): How do we skip this step in the case where the experiment didn't involve RAG - if result.BestRagResult != nil { - log.Info("Skipping; best RAG result already computed") - continue - } - - genTrace := &logspb.Trace{} - if err := dbutil.GetProto(traces, result.GenTraceId, genTrace); err != nil { - log.Error(err, "Failed to read gen trace", "id", result.GenTraceId) - continue - } - - for _, span := range genTrace.Spans { - if span.GetRag() == nil { - continue - } - rag := span.GetRag() - if rag.Results == nil { - continue - } - - for _, ragResult := range rag.Results { - if ragResult.Example == nil { - continue - } - if result.BestRagResult == nil { - result.BestRagResult = ragResult - continue - } - - if result.BestRagResult.Score < ragResult.Score { - result.BestRagResult = ragResult - } - } - } - - if result.BestRagResult == nil { - continue - } - if err := updateResult(ctx, string(key), result, db); err != nil { - log.Error(err, "Failed to update result") - } - } - return nil -} - -func updateEvalResultDistance(ctx context.Context, parser *executor.BashishParser, result *v1alpha1.EvalResult) { - log := logs.FromContext(ctx).WithValues("id", result.GetExample().GetId()) - var actualBlock *v1alpha1.Block - - for _, b := range result.Actual { - if b.Kind == v1alpha1.BlockKind_CODE { - actualBlock = b - break - } - } - - if len(result.Example.GetAnswer()) > 1 { - log.Info("Warning; expected answer more than one answer block. Only the first is used") - } - - expected, err := parser.Parse(result.Example.Answer[0].GetContents()) - if err != nil { - log.Error(err, "Failed to parse expected answer to command") - result.Error = err.Error() - result.Status = v1alpha1.EvalResultStatus_ERROR - return - } - - var actual []executor.Instruction - if actualBlock != nil { - parsed, err := parser.Parse(actualBlock.GetContents()) - if err != nil { - log.Error(err, "Failed to parse actual answer to command") - result.Error = err.Error() - result.Status = v1alpha1.EvalResultStatus_ERROR - return - } - actual = parsed - } else { - // Since there is no code block. Initialize actual to an empty command. - // This will cause the distance computed to be the maximum possible distance which is what we want - actual = []executor.Instruction{ - { - Command: cmd.NewCmd(""), - }, - } - } - - distance, err := Distance(expected[0], actual[0]) - - if err != nil { - log.Error(err, "Failed to compute distance") - result.Error = err.Error() - result.Status = v1alpha1.EvalResultStatus_ERROR - return - } - - if distance.Max < distance.Distance { - log.Error(errors.New("Distance is greater than max distance"), "Distance is greater than max distance", "distance", distance.Distance, "max", distance.Max) - } - - result.Distance = int32(distance.Distance) - result.NormalizedDistance = distance.Normalized - result.Status = v1alpha1.EvalResultStatus_DONE -} - -func (e *Evaluator) updateGoogleSheet(ctx context.Context, experiment api.Experiment, db *pebble.DB) error { - log := logs.FromContext(ctx) - if e.config.Eval == nil || e.config.Eval.GCPServiceAccount == "" { - return errors.New("GCPServiceAccount is required to update Google Sheet") - } - - sheetName := experiment.Spec.SheetName - sheetID := experiment.Spec.SheetID - - if sheetID == "" { - return errors.New("SheetID is required to update Google Sheet") - } - - if sheetName == "" { - return errors.New("SheetName is required to update Google Sheet") - } - - log = log.WithValues("spreadsheetID", sheetID, "sheetName", sheetName) - log.Info("Updating Google Sheet") - credentialsConfig := &impersonate.CredentialsConfig{ - TargetPrincipal: e.config.Eval.GCPServiceAccount, - Scopes: []string{"https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"}, - } - - credentials, err := impersonate.CredentialsTokenSource(ctx, *credentialsConfig) - if err != nil { - log.Error(err, "Unable to create impersonated credentials") - return err - } - - srv, err := sheets.NewService(ctx, option.WithTokenSource(credentials)) - if err != nil { - log.Error(err, "Unable to retrieve Sheets client") - return err - } - - // Create the sheet if it doesn't exist - batchUpdateRequest := &sheets.BatchUpdateSpreadsheetRequest{ - Requests: []*sheets.Request{ - { - AddSheet: &sheets.AddSheetRequest{ - Properties: &sheets.SheetProperties{ - Title: experiment.Spec.SheetName, - }, - }, - }, - }, - } - - _, err = srv.Spreadsheets.BatchUpdate(experiment.Spec.SheetID, batchUpdateRequest).Context(ctx).Do() - if err != nil { - apiErr, ok := err.(*googleapi.Error) - if ok { - if apiErr.Code == 400 { - log.V(1).Info("Sheet already exists") - } else { - log.Error(err, "Unable to create new sheet ") - return errors.Wrapf(err, "Unable to create new sheet named: %s", sheetName) - } - } else { - return errors.Wrapf(err, "Unable to create new sheet named: %s", sheetName) - } - } - - // Prepare the value range to write - writeRange := sheetName - values := [][]interface{}{{"id", "file", "prompt", "actual", "expected", "distance", "normalized_distance", "best_rag"}} - - iter, err := db.NewIterWithContext(ctx, nil) - if err != nil { - return err - } - defer iter.Close() - - for iter.First(); iter.Valid(); iter.Next() { - key := iter.Key() - if key == nil { - break - } - - value, err := iter.ValueAndErr() - if err != nil { - return errors.Wrapf(err, "Failed to read value for key %s", string(key)) - } - - result := &v1alpha1.EvalResult{} - if err := proto.Unmarshal(value, result); err != nil { - return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) - } - - prompt := docs.DocToMarkdown(result.Example.Query) - row := []interface{}{result.Example.Id, result.ExampleFile, prompt, docs.BlocksToMarkdown(result.Actual), docs.BlocksToMarkdown(result.Example.Answer), result.Distance, result.NormalizedDistance} - - bestRAG := "" - if result.BestRagResult != nil { - if result.BestRagResult.Example.Query != nil { - bestRAG = docs.DocToMarkdown(result.BestRagResult.Example.Query) - } - } - row = append(row, bestRAG) - values = append(values, row) - } - valueRange := &sheets.ValueRange{ - Values: values, - } - - // Write the value range to the sheet - _, err = srv.Spreadsheets.Values.Update(sheetID, writeRange, valueRange). - ValueInputOption("USER_ENTERED"). - Context(ctx). - Do() - if err != nil { - log.Error(err, "Unable to write data to sheet") - return errors.Wrapf(err, "Unable to write data to sheet") - } - - return nil + return errors.New("This code needs to be updated to work with the new protos and the new DB schema") + //olog := logs.FromContext(ctx) + //iter, err := db.NewIterWithContext(ctx, nil) + //if err != nil { + // return err + //} + //defer iter.Close() + // + //for iter.First(); iter.Valid(); iter.Next() { + // key := iter.Key() + // if key == nil { + // break + // } + // + // log := olog.WithValues("id", string(key)) + // value, err := iter.ValueAndErr() + // if err != nil { + // return errors.Wrapf(err, "Failed to read value for key %s", string(key)) + // } + // + // result := &v1alpha1.EvalResult{} + // if err := proto.Unmarshal(value, result); err != nil { + // return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) + // } + // + // // TODO(jeremy): How do we skip this step in the case where the experiment didn't involve RAG + // if result.BestRagResult != nil { + // log.Info("Skipping; best RAG result already computed") + // continue + // } + // + // genTrace := &logspb.Trace{} + // if err := dbutil.GetProto(traces, result.GenTraceId, genTrace); err != nil { + // log.Error(err, "Failed to read gen trace", "id", result.GenTraceId) + // continue + // } + // + // for _, span := range genTrace.Spans { + // if span.GetRag() == nil { + // continue + // } + // rag := span.GetRag() + // if rag.Results == nil { + // continue + // } + // + // for _, ragResult := range rag.Results { + // if ragResult.Example == nil { + // continue + // } + // if result.BestRagResult == nil { + // result.BestRagResult = ragResult + // continue + // } + // + // if result.BestRagResult.Score < ragResult.Score { + // result.BestRagResult = ragResult + // } + // } + // } + // + // if result.BestRagResult == nil { + // continue + // } + // if err := updateResult(ctx, string(key), result, db); err != nil { + // log.Error(err, "Failed to update result") + // } + //} + //return nil } -//func findUnloadedFiles(ctx context.Context, db *pebble.DB, files []string) ([]string, error) { -// unprocessed := map[string]bool{} +//func (e *Evaluator) updateGoogleSheet(ctx context.Context, experiment api.Experiment, db *pebble.DB) error { +// log := logs.FromContext(ctx) +// if e.config.Eval == nil || e.config.Eval.GCPServiceAccount == "" { +// return errors.New("GCPServiceAccount is required to update Google Sheet") +// } // -// iter, err := db.NewIterWithContext(ctx, nil) +// sheetName := experiment.Spec.SheetName +// sheetID := experiment.Spec.SheetID +// +// if sheetID == "" { +// return errors.New("SheetID is required to update Google Sheet") +// } +// +// if sheetName == "" { +// return errors.New("SheetName is required to update Google Sheet") +// } +// +// log = log.WithValues("spreadsheetID", sheetID, "sheetName", sheetName) +// log.Info("Updating Google Sheet") +// credentialsConfig := &impersonate.CredentialsConfig{ +// TargetPrincipal: e.config.Eval.GCPServiceAccount, +// Scopes: []string{"https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"}, +// } +// +// credentials, err := impersonate.CredentialsTokenSource(ctx, *credentialsConfig) // if err != nil { -// return nil, err +// log.Error(err, "Unable to create impersonated credentials") +// return err // } -// defer iter.Close() // -// for _, file := range files { -// unprocessed[file] = true +// srv, err := sheets.NewService(ctx, option.WithTokenSource(credentials)) +// if err != nil { +// log.Error(err, "Unable to retrieve Sheets client") +// return err +// } +// +// // Create the sheet if it doesn't exist +// batchUpdateRequest := &sheets.BatchUpdateSpreadsheetRequest{ +// Requests: []*sheets.Request{ +// { +// AddSheet: &sheets.AddSheetRequest{ +// Properties: &sheets.SheetProperties{ +// Title: experiment.Spec.SheetName, +// }, +// }, +// }, +// }, +// } +// +// _, err = srv.Spreadsheets.BatchUpdate(experiment.Spec.SheetID, batchUpdateRequest).Context(ctx).Do() +// if err != nil { +// apiErr, ok := err.(*googleapi.Error) +// if ok { +// if apiErr.Code == 400 { +// log.V(1).Info("Sheet already exists") +// } else { +// log.Error(err, "Unable to create new sheet ") +// return errors.Wrapf(err, "Unable to create new sheet named: %s", sheetName) +// } +// } else { +// return errors.Wrapf(err, "Unable to create new sheet named: %s", sheetName) +// } +// } +// +// // Prepare the value range to write +// writeRange := sheetName +// values := [][]interface{}{{"id", "file", "prompt", "actual", "expected", "distance", "normalized_distance", "best_rag"}} +// +// iter, err := db.NewIterWithContext(ctx, nil) +// if err != nil { +// return err // } +// defer iter.Close() // -// // Iterate over the files in the DB and remove them from the list of files to load. // for iter.First(); iter.Valid(); iter.Next() { // key := iter.Key() // if key == nil { @@ -592,27 +390,88 @@ func (e *Evaluator) updateGoogleSheet(ctx context.Context, experiment api.Experi // // value, err := iter.ValueAndErr() // if err != nil { -// // Should we ignore the error? -// return nil, errors.Wrapf(err, "Failed to read value for key %s", string(key)) +// return errors.Wrapf(err, "Failed to read value for key %s", string(key)) // } // // result := &v1alpha1.EvalResult{} // if err := proto.Unmarshal(value, result); err != nil { -// return nil, errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) +// return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) // } // -// delete(unprocessed, result.ExampleFile) +// prompt := docs.DocToMarkdown(result.Example.Query) +// row := []interface{}{result.Example.Id, result.ExampleFile, prompt, docs.BlocksToMarkdown(result.Actual), docs.BlocksToMarkdown(result.Example.Answer), result.Distance, result.NormalizedDistance} // +// bestRAG := "" +// if result.BestRagResult != nil { +// if result.BestRagResult.Example.Query != nil { +// bestRAG = docs.DocToMarkdown(result.BestRagResult.Example.Query) +// } +// } +// row = append(row, bestRAG) +// values = append(values, row) +// } +// valueRange := &sheets.ValueRange{ +// Values: values, // } // -// toProcess := make([]string, 0, len(unprocessed)) -// for file := range unprocessed { -// toProcess = append(toProcess, file) +// // Write the value range to the sheet +// _, err = srv.Spreadsheets.Values.Update(sheetID, writeRange, valueRange). +// ValueInputOption("USER_ENTERED"). +// Context(ctx). +// Do() +// if err != nil { +// log.Error(err, "Unable to write data to sheet") +// return errors.Wrapf(err, "Unable to write data to sheet") // } // -// return toProcess, nil +// return nil //} +// TODO(jeremy): We should get rid of this function and one that calls it +func findUnloadedFiles(ctx context.Context, db *pebble.DB, files []string) ([]string, error) { + return nil, errors.New("findUnloadedFiles needs to be updated to work with new DB and protos") + //unprocessed := map[string]bool{} + // + //iter, err := db.NewIterWithContext(ctx, nil) + //if err != nil { + // return nil, err + //} + //defer iter.Close() + // + //for _, file := range files { + // unprocessed[file] = true + //} + // + //// Iterate over the files in the DB and remove them from the list of files to load. + //for iter.First(); iter.Valid(); iter.Next() { + // key := iter.Key() + // if key == nil { + // break + // } + // + // value, err := iter.ValueAndErr() + // if err != nil { + // // Should we ignore the error? + // return nil, errors.Wrapf(err, "Failed to read value for key %s", string(key)) + // } + // + // result := &v1alpha1.EvalResult{} + // if err := proto.Unmarshal(value, result); err != nil { + // return nil, errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) + // } + // + // delete(unprocessed, result.ExampleFile) + // + //} + // + //toProcess := make([]string, 0, len(unprocessed)) + //for file := range unprocessed { + // toProcess = append(toProcess, file) + //} + // + //return toProcess, nil +} + // listEvalFiles returns a list of the all the binary protobuf files in the directory evalDir. func listEvalFiles(ctx context.Context, evalDir string) ([]string, error) { examples := make([]string, 0, 100) @@ -713,3 +572,12 @@ func sortEvalExamplesInTime(examples []*v1alpha1.EvalExample) { return timeI.Before(timeJ) }) } + +func newAIServiceClient(baseURL string) v1alpha1connect.AIServiceClient { + // Create a new client + client := v1alpha1connect.NewAIServiceClient( + newHTTPClient(), + baseURL, + ) + return client +} diff --git a/app/pkg/eval/evaluator_test.go b/app/pkg/eval/evaluator_test.go index c77135d4..b000409e 100644 --- a/app/pkg/eval/evaluator_test.go +++ b/app/pkg/eval/evaluator_test.go @@ -6,15 +6,10 @@ import ( "path/filepath" "testing" - "github.com/jlewi/foyle/app/pkg/executor" - "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" - "github.com/jlewi/foyle/app/api" "github.com/pkg/errors" - "github.com/cockroachdb/pebble" "github.com/jlewi/foyle/app/pkg/config" - "github.com/jlewi/monogo/helpers" "go.uber.org/zap" ) @@ -51,43 +46,43 @@ func Test_Evaluator(t *testing.T) { } } -func Test_Evaluator_Google_Sheets(t *testing.T) { - if os.Getenv("GITHUB_ACTIONS") != "" { - t.Skipf("Test is skipped in GitHub actions") - } - - t.Fatalf("Evaluator test needs to be updated per https://github.com/jlewi/foyle/issues/140") - - log, err := zap.NewDevelopmentConfig().Build() - if err != nil { - t.Fatalf("Error creating logger; %v", err) - } - zap.ReplaceGlobals(log) - - if err := config.InitViper(nil); err != nil { - t.Fatalf("Error initializing Viper; %v", err) - } - cfg := config.GetConfig() - - e, err := NewEvaluator(*cfg) - if err != nil { - t.Fatalf("Error creating evaluator; %v", err) - } - - experiment, err := experimentForTesting() - if err != nil { - t.Fatalf("Error creating experiment; %v", err) - } - - db, err := pebble.Open(experiment.Spec.DBDir, &pebble.Options{}) - if err != nil { - t.Fatalf("Error opening DB; %v", err) - } - defer helpers.DeferIgnoreError(db.Close) - if err := e.updateGoogleSheet(context.Background(), *experiment, db); err != nil { - t.Fatalf("Error updating Google Sheet; %v", err) - } -} +//func Test_Evaluator_Google_Sheets(t *testing.T) { +// if os.Getenv("GITHUB_ACTIONS") != "" { +// t.Skipf("Test is skipped in GitHub actions") +// } +// +// t.Fatalf("Evaluator test needs to be updated per https://github.com/jlewi/foyle/issues/140") +// +// log, err := zap.NewDevelopmentConfig().Build() +// if err != nil { +// t.Fatalf("Error creating logger; %v", err) +// } +// zap.ReplaceGlobals(log) +// +// if err := config.InitViper(nil); err != nil { +// t.Fatalf("Error initializing Viper; %v", err) +// } +// cfg := config.GetConfig() +// +// e, err := NewEvaluator(*cfg) +// if err != nil { +// t.Fatalf("Error creating evaluator; %v", err) +// } +// +// experiment, err := experimentForTesting() +// if err != nil { +// t.Fatalf("Error creating experiment; %v", err) +// } +// +// db, err := pebble.Open(experiment.Spec.DBDir, &pebble.Options{}) +// if err != nil { +// t.Fatalf("Error opening DB; %v", err) +// } +// defer helpers.DeferIgnoreError(db.Close) +// //if err := e.updateGoogleSheet(context.Background(), *experiment, db); err != nil { +// // t.Fatalf("Error updating Google Sheet; %v", err) +// //} +//} func experimentForTesting() (*api.Experiment, error) { cwd, err := os.Getwd() @@ -116,57 +111,57 @@ func experimentForTesting() (*api.Experiment, error) { }, nil } -func Test_updateEvalResultDistance(t *testing.T) { - type testCase struct { - name string - result *v1alpha1.EvalResult - expectedDistance int32 - expectedNormalized float32 - } - - cases := []testCase{ - { - // Test the case where the actual answer contains no codeblocks - name: "nocodeblocks", - result: &v1alpha1.EvalResult{ - Example: &v1alpha1.Example{ - Id: "1234", - Answer: []*v1alpha1.Block{ - { - Kind: v1alpha1.BlockKind_CODE, - Contents: "gcloud builds list", - }, - }, - }, - ExampleFile: "", - Actual: []*v1alpha1.Block{ - { - Kind: v1alpha1.BlockKind_MARKUP, - Contents: "Not a code cell", - }, - }, - }, - expectedDistance: 3, - expectedNormalized: 1.0, - }, - } - parser, err := executor.NewBashishParser() - if err != nil { - t.Fatalf("Error creating parser; %v", err) - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - updateEvalResultDistance(context.Background(), parser, c.result) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if c.result.Distance != c.expectedDistance { - t.Errorf("Expected distance %d but got %d", c.expectedDistance, c.result.Distance) - } - if c.result.NormalizedDistance != c.expectedNormalized { - t.Errorf("Expected normalized distance %f but got %f", c.expectedNormalized, c.result.NormalizedDistance) - } - }) - } -} +//func Test_updateEvalResultDistance(t *testing.T) { +// type testCase struct { +// name string +// result *v1alpha1.EvalResult +// expectedDistance int32 +// expectedNormalized float32 +// } +// +// cases := []testCase{ +// { +// // Test the case where the actual answer contains no codeblocks +// name: "nocodeblocks", +// result: &v1alpha1.EvalResult{ +// Example: &v1alpha1.EvalExample{ +// Id: "1234", +// Answer: []*v1alpha1.Block{ +// { +// Kind: v1alpha1.BlockKind_CODE, +// Contents: "gcloud builds list", +// }, +// }, +// }, +// ExampleFile: "", +// Actual: []*v1alpha1.Block{ +// { +// Kind: v1alpha1.BlockKind_MARKUP, +// Contents: "Not a code cell", +// }, +// }, +// }, +// expectedDistance: 3, +// expectedNormalized: 1.0, +// }, +// } +// parser, err := executor.NewBashishParser() +// if err != nil { +// t.Fatalf("Error creating parser; %v", err) +// } +// +// for _, c := range cases { +// t.Run(c.name, func(t *testing.T) { +// updateEvalResultDistance(context.Background(), parser, c.result) +// if err != nil { +// t.Fatalf("Unexpected error: %v", err) +// } +// if c.result.Distance != c.expectedDistance { +// t.Errorf("Expected distance %d but got %d", c.expectedDistance, c.result.Distance) +// } +// if c.result.NormalizedDistance != c.expectedNormalized { +// t.Errorf("Expected normalized distance %f but got %f", c.expectedNormalized, c.result.NormalizedDistance) +// } +// }) +// } +//} diff --git a/app/pkg/eval/reconcilers.go b/app/pkg/eval/reconcilers.go deleted file mode 100644 index ba84cb5b..00000000 --- a/app/pkg/eval/reconcilers.go +++ /dev/null @@ -1,83 +0,0 @@ -package eval - -import ( - "context" - - "connectrpc.com/connect" - "github.com/cockroachdb/pebble" - "github.com/jlewi/foyle/app/pkg/logs" - "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" - "github.com/jlewi/foyle/protos/go/foyle/v1alpha1/v1alpha1connect" - "github.com/pkg/errors" - "google.golang.org/protobuf/proto" -) - -// reconcilePredictions reconciles predictions for examples in the database. -func reconcilePredictions(ctx context.Context, db *pebble.DB, client v1alpha1connect.GenerateServiceClient) error { - olog := logs.FromContext(ctx) - iter, err := db.NewIterWithContext(ctx, nil) - if err != nil { - return err - } - defer iter.Close() - - for iter.First(); iter.Valid(); iter.Next() { - key := iter.Key() - if key == nil { - break - } - - log := olog.WithValues("id", string(key)) - value, err := iter.ValueAndErr() - if err != nil { - return errors.Wrapf(err, "Failed to read value for key %s", string(key)) - } - - result := &v1alpha1.EvalResult{} - if err := proto.Unmarshal(value, result); err != nil { - return errors.Wrapf(err, "Failed to unmarshal value for key %s", string(key)) - } - - if len(result.GetActual()) > 0 { - log.V(logs.Debug).Info("not generating a completion; already have answer", "path", result.ExampleFile) - // We have the answer so we don't need to generate it. - continue - } - - if len(result.Actual) == 0 { - // Initialize a trace - resp, err := func() (*connect.Response[v1alpha1.GenerateResponse], error) { - newCtx, span := tracer().Start(ctx, "(*Evaluator).reconcilePredictions") - defer span.End() - - req := connect.NewRequest(&v1alpha1.GenerateRequest{ - Doc: result.Example.Query, - }) - // We need to generate the answer. - return client.Generate(newCtx, req) - }() - - if err != nil { - connectErr, ok := err.(*connect.Error) - if ok { - // If this is a permanent error we want to abort with an error - if connectErr.Code() == connect.CodeUnavailable || connectErr.Code() == connect.CodeUnimplemented { - return errors.Wrap(err, "Unable to connect to the agent.") - } - } - result.Error = err.Error() - result.Status = v1alpha1.EvalResultStatus_ERROR - continue - } - - result.Actual = resp.Msg.GetBlocks() - result.GenTraceId = resp.Msg.GetTraceId() - - log.Info("Writing result to DB") - if err := updateResult(ctx, string(key), result, db); err != nil { - return errors.Wrapf(err, "Failed to write result to DB") - } - } - } - return nil -} diff --git a/app/pkg/eval/results_manager.go b/app/pkg/eval/results_manager.go index dd768c44..42a86eca 100644 --- a/app/pkg/eval/results_manager.go +++ b/app/pkg/eval/results_manager.go @@ -3,13 +3,17 @@ package eval import ( "context" "database/sql" + "github.com/jlewi/foyle/app/pkg/analyze/fsql" + "github.com/jlewi/foyle/app/pkg/logs" "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" + "github.com/pkg/errors" + "google.golang.org/protobuf/encoding/protojson" ) // ResultsManager manages the database containing the evaluation results type ResultsManager struct { - //queries *fsql.Queries - db *sql.DB + queries *fsql.Queries + db *sql.DB } // EvalResultUpdater is a function that updates an evaluation result. @@ -21,87 +25,86 @@ type EvalResultUpdater func(result *v1alpha1.EvalResult) error // If the given id doesn't exist then an empty Session is passed to updateFunc and the result will be // inserted if the updateFunc returns nil. If the session result exists then the result is passed to updateFunc // and the updated value is then written to the database -func (db *ResultsManager) Update(ctx context.Context, resultID string, updateFunc EvalResultUpdater) error { - //log := logs.FromContext(ctx) - //if contextID == "" { - // return errors.WithStack(errors.New("contextID must be non-empty")) - //} - //log = log.WithValues("contextId", contextID) - // - //tx, err := db.db.BeginTx(ctx, &sql.TxOptions{}) - //if err != nil { - // return errors.Wrapf(err, "Failed to start transaction") - //} - // - //queries := db.queries.WithTx(tx) - //// Read the record - //sessRow, err := queries.GetSession(ctx, contextID) - // - //// If the session doesn't exist then we do nothing because session is initializeed to empty session - //session := &logspb.Session{ - // ContextId: contextID, - //} - //if err != nil { - // if err != sql.ErrNoRows { - // if txErr := tx.Rollback(); txErr != nil { - // log.Error(txErr, "Failed to rollback transaction") - // } - // return errors.Wrapf(err, "Failed to get session with id %v", contextID) - // } - //} else { - // // Deserialize the proto - // if err := proto.Unmarshal(sessRow.Proto, session); err != nil { - // if txErr := tx.Rollback(); txErr != nil { - // log.Error(txErr, "Failed to rollback transaction") - // } - // return errors.Wrapf(err, "Failed to deserialize session") - // } - //} - // - //if err := updateFunc(session); err != nil { - // if txErr := tx.Rollback(); txErr != nil { - // log.Error(txErr, "Failed to rollback transaction") - // } - // return errors.Wrapf(err, "Failed to update session") - //} - // - //newRow, err := protoToRow(session) - //if err != nil { - // if txErr := tx.Rollback(); txErr != nil { - // log.Error(txErr, "Failed to rollback transaction") - // } - // return errors.Wrapf(err, "Failed to convert session proto to table row") - //} - // - //if newRow.Contextid != contextID { - // if txErr := tx.Rollback(); txErr != nil { - // log.Error(txErr, "Failed to rollback transaction") - // } - // return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid)) - //} - // - //update := fsql.UpdateSessionParams{ - // Contextid: contextID, - // Proto: newRow.Proto, - // Starttime: newRow.Starttime, - // Endtime: newRow.Endtime, - // Selectedid: newRow.Selectedid, - // Selectedkind: newRow.Selectedkind, - // TotalInputTokens: newRow.TotalInputTokens, - // TotalOutputTokens: newRow.TotalOutputTokens, - // NumGenerateTraces: newRow.NumGenerateTraces, - //} - // - //if err := queries.UpdateSession(ctx, update); err != nil { - // if txErr := tx.Rollback(); txErr != nil { - // log.Error(txErr, "Failed to rollback transaction") - // } - // return errors.Wrapf(err, "Failed to update session") - //} - // - //if err := tx.Commit(); err != nil { - // return errors.Wrapf(err, "Failed to commit transaction") - //} - // - //return nil +func (db *ResultsManager) Update(ctx context.Context, id string, updateFunc EvalResultUpdater) error { + log := logs.FromContext(ctx) + if id == "" { + return errors.WithStack(errors.New("id must be non-empty")) + } + log = log.WithValues("exampleId", id) + + tx, err := db.db.BeginTx(ctx, &sql.TxOptions{}) + if err != nil { + return errors.Wrapf(err, "Failed to start transaction") + } + + queries := db.queries.WithTx(tx) + // Read the record + row, err := queries.GetResult(ctx, id) + + // If the session doesn't exist then we do nothing because session is initializeed to empty session + rowPb := &v1alpha1.EvalResult{} + if err != nil { + if err != sql.ErrNoRows { + if txErr := tx.Rollback(); txErr != nil { + log.Error(txErr, "Failed to rollback transaction") + } + return errors.Wrapf(err, "Failed to get result with id %v", id) + } + } else { + // Deserialize the proto + if err := protojson.Unmarshal([]byte(row.ProtoJson), rowPb); err != nil { + if txErr := tx.Rollback(); txErr != nil { + log.Error(txErr, "Failed to rollback transaction") + } + return errors.Wrapf(err, "Failed to deserialize result") + } + } + + if err := updateFunc(rowPb); err != nil { + if txErr := tx.Rollback(); txErr != nil { + log.Error(txErr, "Failed to rollback transaction") + } + return errors.Wrapf(err, "Failed to update result") + } + + update, err := protoToRowUpdate(rowPb) + if err != nil { + if txErr := tx.Rollback(); txErr != nil { + log.Error(txErr, "Failed to rollback transaction") + } + return errors.Wrapf(err, "Failed to convert EvalResult proto to table row") + } + + if update.ID != id { + if txErr := tx.Rollback(); txErr != nil { + log.Error(txErr, "Failed to rollback transaction") + } + return errors.WithStack(errors.Errorf("id in EvalResult doesn't match id. Update was called with ID: %v but session has ID: %v", id, update.ID)) + } + + if err := queries.UpdateResult(ctx, *update); err != nil { + if txErr := tx.Rollback(); txErr != nil { + log.Error(txErr, "Failed to rollback transaction") + } + return errors.Wrapf(err, "Failed to update session") + } + + if err := tx.Commit(); err != nil { + return errors.Wrapf(err, "Failed to commit transaction") + } + + return nil +} + +func protoToRowUpdate(result *v1alpha1.EvalResult) (*fsql.UpdateResultParams, error) { + protoJson, err := protojson.Marshal(result) + if err != nil { + return nil, errors.Wrapf(err, "Failed to serialize EvalResult to JSON") + } + + return &fsql.UpdateResultParams{ + ID: result.GetExample().GetId(), + Time: result.Example.GetTime().AsTime(), + ProtoJson: string(protoJson), + }, nil } diff --git a/app/pkg/eval/results_manager_test.go b/app/pkg/eval/results_manager_test.go new file mode 100644 index 00000000..431d870b --- /dev/null +++ b/app/pkg/eval/results_manager_test.go @@ -0,0 +1,58 @@ +package eval + +import ( + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" + "google.golang.org/protobuf/encoding/protojson" + "testing" + "time" +) + +func Test_protoToRowUpdate(t *testing.T) { + + type testCase struct { + name string + result *v1alpha1.EvalResult + } + + cases := []testCase{ + { + name: "Basic", + result: &v1alpha1.EvalResult{ + Example: &v1alpha1.EvalExample{ + Id: "1", + }, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + actual, err := protoToRowUpdate(c.result) + if err != nil { + t.Fatalf("Error converting EvalResult to row update: %v", err) + } + + if actual.ID != c.result.Example.Id { + t.Fatalf("Expected ID to be %v but got %v", c.result.Example.Id, actual.ID) + } + + if actual.Time != c.result.Example.Time.AsTime() { + t.Fatalf("Expected Time to be %v but got %v", c.result.Example.Time, actual.Time) + } + + // We can't compare the serialized protos because the JSON serialization is non-deterministic + + actualPB := &v1alpha1.EvalResult{} + if err := protojson.Unmarshal([]byte(actual.ProtoJson), actualPB); err != nil { + t.Fatalf("Error deserializing actual result: %v", err) + } + + comparer := cmpopts.IgnoreUnexported(v1alpha1.EvalResult{}, v1alpha1.EvalExample{}, time.Time{}) + if d := cmp.Diff(c.result, actualPB, comparer); d != "" { + t.Fatalf("Unexpected diff between expected and actual EvalResults:\n%v", d) + } + }) + } +} diff --git a/app/pkg/eval/service.go b/app/pkg/eval/service.go index 22b097b8..6162ed61 100644 --- a/app/pkg/eval/service.go +++ b/app/pkg/eval/service.go @@ -4,7 +4,6 @@ import ( "context" "github.com/go-logr/zapr" - "github.com/jlewi/foyle/app/pkg/docs" "go.uber.org/zap" "connectrpc.com/connect" @@ -139,12 +138,12 @@ func toAssertionRow(result *v1alpha1.EvalResult) (*v1alpha1.AssertionRow, error) log := zapr.NewLogger(zap.L()) row := &v1alpha1.AssertionRow{ - Id: result.Example.GetId(), - ExampleFile: result.GetExampleFile(), + Id: result.Example.GetId(), + // ExampleFile: result.GetExampleFile(), } - row.DocMd = docs.DocToMarkdown(result.GetExample().GetQuery()) - row.AnswerMd = docs.BlocksToMarkdown(result.GetActual()) + //row.DocMd = docs.DocToMarkdown(result.GetExample().GetQuery()) + //row.AnswerMd = docs.BlocksToMarkdown(result.GetActual()) for _, a := range result.GetAssertions() { switch a.Name { diff --git a/app/pkg/eval/service_test.go b/app/pkg/eval/service_test.go index e4e30bda..031edd2f 100644 --- a/app/pkg/eval/service_test.go +++ b/app/pkg/eval/service_test.go @@ -2,6 +2,7 @@ package eval import ( "fmt" + parserv1 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/parser/v1" "testing" "github.com/jlewi/foyle/protos/go/foyle/v1alpha1" @@ -16,21 +17,21 @@ func Test_ToAssertRow(t *testing.T) { cases := []testCase{ { evalResult: &v1alpha1.EvalResult{ - Example: &v1alpha1.Example{ + Example: &v1alpha1.EvalExample{ Id: "1234", - Query: &v1alpha1.Doc{ - Blocks: []*v1alpha1.Block{ - { - Kind: v1alpha1.BlockKind_MARKUP, - Contents: "Hello world", - }, - }, - }, + //Query: &v1alpha1.Doc{ + // Blocks: []*v1alpha1.Block{ + // { + // Kind: v1alpha1.BlockKind_MARKUP, + // Contents: "Hello world", + // }, + // }, + //}, }, - Actual: []*v1alpha1.Block{ + ActualCells: []*parserv1.Cell{ { - Kind: v1alpha1.BlockKind_MARKUP, - Contents: "word", + Kind: parserv1.CellKind_CELL_KIND_MARKUP, + Value: "word", }, }, Assertions: []*v1alpha1.Assertion{