Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

add: flow-global options #73

Merged
merged 1 commit on
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions examples/chatgpt-filesearch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Example flow configuration demonstrating flow-global options.
# The globals.ingestion.textsplitter map supplies default text splitter
# settings for every ingestion flow in this flow entry; a flow-specific
# textsplitter.options entry overrides the global value for the same key.
flows:
  chatgpt:
    default: true          # use this flow when no flow is named explicitly
    globals:
      ingestion:
        textsplitter:
          chunkSize: 800     # global default chunk size
          chunkOverlap: 400  # global default overlap between chunks
    ingestion:
      - filetypes: ["*"]     # wildcard: this flow handles every filetype
        textsplitter:
          name: text
          options:
            chunkSize: 811   # overrides the global chunkSize (800) for this flow

7 changes: 0 additions & 7 deletions pkg/client/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,13 @@ import (
"log/slog"
"os"
"path/filepath"
"slices"
"strings"
)

// isIgnored reports whether path matches the supplied gitignore matcher.
// The path is split into its components on the OS path separator before
// matching; the trailing false marks the path as a non-directory entry.
func isIgnored(ignore gitignore.Matcher, path string) bool {
	components := strings.Split(path, string(filepath.Separator))
	return ignore.Match(components, false)
}

// checkIgnored reports whether the file extension of path appears in
// ignoreExtensions. Each check is recorded at debug level for tracing.
func checkIgnored(path string, ignoreExtensions []string) bool {
	extension := filepath.Ext(path)
	slog.Debug("checking path", "path", path, "ext", extension, "ignore", ignoreExtensions)
	return slices.Index(ignoreExtensions, extension) >= 0
}

func readIgnoreFile(path string) ([]gitignore.Pattern, error) {
stat, err := os.Stat(path)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/askdir.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *ClientAskDir) Run(cmd *cobra.Command, args []string) error {
}

for _, ingestionFlowConfig := range flow.Ingestion {
ingestionFlow, err := ingestionFlowConfig.AsIngestionFlow()
ingestionFlow, err := ingestionFlowConfig.AsIngestionFlow(&flow.Globals.Ingestion)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type Client struct {
Server string `usage:"URL of the Knowledge API Server" default:"" env:"KNOW_SERVER_URL"`
datastoreArchive string

EmbeddingModelProvider string `usage:"Embedding model provider" default:"openai" env:"KNOW_EMBEDDING_MODEL_PROVIDER" name:"embedding-model-provider" default:"openai" koanf:"provider"`
EmbeddingModelProvider string `usage:"Embedding model provider" env:"KNOW_EMBEDDING_MODEL_PROVIDER" name:"embedding-model-provider" default:"openai" koanf:"provider"`
ConfigFile string `usage:"Path to the configuration file" env:"KNOW_CONFIG_FILE" default:"" short:"c"`

config.DatabaseConfig
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *ClientIngest) Run(cmd *cobra.Command, args []string) error {
}

for _, ingestionFlowConfig := range flow.Ingestion {
ingestionFlow, err := ingestionFlowConfig.AsIngestionFlow()
ingestionFlow, err := ingestionFlowConfig.AsIngestionFlow(&flow.Globals.Ingestion)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/datastore/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, content []byte
break
}
}
ingestionFlow.FillDefaults(filetype, opts.TextSplitterOpts)

if err := ingestionFlow.FillDefaults(filetype, opts.TextSplitterOpts); err != nil {
return nil, err
}

if ingestionFlow.Load == nil {
return nil, fmt.Errorf("unsupported filetype %q (file %q)", filetype, opts.FileMetadata.AbsolutePath)
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/datastore/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ func TestExtractPDF(t *testing.T) {

filetype := ".pdf"

ingestionFlow := flows.NewDefaultIngestionFlow(filetype, &textSplitterOpts)
if ingestionFlow.Load == nil {
t.Fatalf("ingestionFlow.Load is nil")
}
ingestionFlow, err := flows.NewDefaultIngestionFlow(filetype, &textSplitterOpts)
require.NoError(t, err, "NewDefaultIngestionFlow() error = %v", err)
require.NotNil(t, ingestionFlow.Load, "ingestionFlow.Load is nil")

// Mandatory Transformation: Add filename to metadata
em := &transformers.ExtraMetadata{Metadata: map[string]any{"filename": d.Name()}}
Expand Down
17 changes: 9 additions & 8 deletions pkg/datastore/textsplitter/textsplitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package textsplitter

import (
"fmt"
dstypes "github.com/gptscript-ai/knowledge/pkg/datastore/types"
"log/slog"

"dario.cat/mergo"
Expand All @@ -16,7 +17,7 @@ type SplitterFunc func([]vs.Document) ([]vs.Document, error)
type TextSplitterOpts struct {
ChunkSize int `json:"chunkSize" mapstructure:"chunkSize" usage:"Textsplitter Chunk Size" default:"1024" env:"KNOW_TEXTSPLITTER_CHUNK_SIZE" name:"textsplitter-chunk-size"`
ChunkOverlap int `json:"chunkOverlap" mapstructure:"chunkOverlap" usage:"Textsplitter Chunk Overlap" default:"256" env:"KNOW_TEXTSPLITTER_CHUNK_OVERLAP" name:"textsplitter-chunk-overlap"`
ModelName string `json:"modelName" mapstructure:"modelName" usage:"Textsplitter Model Name" default:"gpt-4" env:"KNOW_TEXTSPLITTER_MODEL_NAME" name:"textsplitter-model-name"`
ModelName string `json:"modelName" mapstructure:"modelName" usage:"Textsplitter Model Name" default:"gpt-4o" env:"KNOW_TEXTSPLITTER_MODEL_NAME" name:"textsplitter-model-name"`
EncodingName string `json:"encodingName" mapstructure:"encodingName" usage:"Textsplitter Encoding Name" default:"cl100k_base" env:"KNOW_TEXTSPLITTER_ENCODING_NAME" name:"textsplitter-encoding-name"`
}

Expand Down Expand Up @@ -61,7 +62,7 @@ func GetTextSplitterConfig(name string) (any, error) {
}
}

func GetTextSplitterFunc(name string, config any) (SplitterFunc, error) {
func GetTextSplitter(name string, config any) (dstypes.TextSplitter, error) {
switch name {
case "text":
cfg := NewTextSplitterOpts()
Expand All @@ -70,29 +71,29 @@ func GetTextSplitterFunc(name string, config any) (SplitterFunc, error) {
if err := mapstructure.Decode(config, &customCfg); err != nil {
return nil, fmt.Errorf("failed to decode text splitter configuration: %w", err)
}
slog.Debug("GetTextSplitterFunc Text (before merge)", "config", customCfg)
slog.Debug("GetTextSplitter Text (before merge)", "config", customCfg)
if err := mergo.Merge(&customCfg, cfg); err != nil {
return nil, fmt.Errorf("failed to merge text splitter configuration: %w", err)
}
cfg = customCfg
}
slog.Debug("TextSplitterFunc", "config", cfg)
return FromLangchain(NewLcgoTextSplitter(cfg)).SplitDocuments, nil
slog.Debug("TextSplitter", "config", cfg)
return FromLangchain(NewLcgoTextSplitter(cfg)), nil
case "markdown":
cfg := NewTextSplitterOpts()
if config != nil {
var customCfg TextSplitterOpts
if err := mapstructure.Decode(config, &customCfg); err != nil {
return nil, fmt.Errorf("failed to decode text splitter configuration: %w", err)
}
slog.Debug("GetTextSplitterFunc Markdown (before merge)", "config", customCfg)
slog.Debug("GetTextSplitter Markdown (before merge)", "config", customCfg)
if err := mergo.Merge(&customCfg, cfg); err != nil {
return nil, fmt.Errorf("failed to merge text splitter configuration: %w", err)
}
cfg = customCfg
}
slog.Debug("MarkdownSplitterFunc", "config", cfg)
return FromLangchain(NewLcgoMarkdownSplitter(cfg)).SplitDocuments, nil
slog.Debug("MarkdownSplitter", "config", cfg)
return FromLangchain(NewLcgoMarkdownSplitter(cfg)), nil
default:
return nil, fmt.Errorf("unknown text splitter %q", name)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/datastore/textsplitter/textsplitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ func TestGetTextSplitterConfigWithInvalidName(t *testing.T) {
}

// TestGetTextSplitterFuncWithValidNameAndNilConfig verifies that a valid
// splitter name with a nil config resolves without error.
// Fix: the diff left the stale pre-rename call line in place, producing a
// duplicate `err` declaration; only the GetTextSplitter call is kept.
func TestGetTextSplitterFuncWithValidNameAndNilConfig(t *testing.T) {
	_, err := GetTextSplitter("text", nil)
	assert.NoError(t, err)
}

// TestGetTextSplitterFuncWithValidNameAndInvalidConfig verifies that a
// non-decodable config value yields an error for a valid splitter name.
// Fix: removed the stale pre-rename call line (duplicate `err` declaration).
func TestGetTextSplitterFuncWithValidNameAndInvalidConfig(t *testing.T) {
	_, err := GetTextSplitter("text", "invalid")
	assert.Error(t, err)
}

// TestGetTextSplitterFuncWithValidNameAndValidConfig verifies that a valid
// name paired with a well-formed options struct resolves without error.
// Fix: removed the stale pre-rename call line (duplicate `err` declaration).
func TestGetTextSplitterFuncWithValidNameAndValidConfig(t *testing.T) {
	_, err := GetTextSplitter("text", NewTextSplitterOpts())
	assert.NoError(t, err)
}

// TestGetTextSplitterFuncWithInvalidName verifies that an unknown splitter
// name is rejected with an error.
// Fix: removed the stale pre-rename call line (duplicate `err` declaration).
func TestGetTextSplitterFuncWithInvalidName(t *testing.T) {
	_, err := GetTextSplitter("invalid", nil)
	assert.Error(t, err)
}
42 changes: 29 additions & 13 deletions pkg/flows/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,18 @@ type FlowConfig struct {
}

type FlowConfigEntry struct {
Default bool `json:"default,omitempty" yaml:"default" mapstructure:"default"`
Ingestion []IngestionFlowConfig `json:"ingestion,omitempty" yaml:"ingestion" mapstructure:"ingestion"`
Retrieval *RetrievalFlowConfig `json:"retrieval,omitempty" yaml:"retrieval" mapstructure:"retrieval"`
Default bool `json:"default,omitempty" yaml:"default" mapstructure:"default"`
Globals FlowConfigEntryGlobalOpts `json:"globals,omitempty" yaml:"globals" mapstructure:"globals"`
Ingestion []IngestionFlowConfig `json:"ingestion,omitempty" yaml:"ingestion" mapstructure:"ingestion"`
Retrieval *RetrievalFlowConfig `json:"retrieval,omitempty" yaml:"retrieval" mapstructure:"retrieval"`
}

// FlowConfigEntryGlobalOpts holds flow-wide default options for a flow
// entry; currently only ingestion-stage globals are supported.
type FlowConfigEntryGlobalOpts struct {
	Ingestion FlowConfigGlobalsIngestion `json:"ingestion,omitempty" yaml:"ingestion" mapstructure:"ingestion"`
}

// FlowConfigGlobalsIngestion carries global defaults for the ingestion
// stage. Textsplitter is a free-form option map that is decoded into each
// ingestion flow's text splitter configuration before the flow-specific
// options are applied.
type FlowConfigGlobalsIngestion struct {
	Textsplitter map[string]any `json:"textsplitter,omitempty" yaml:"textsplitter" mapstructure:"textsplitter"`
}

type IngestionFlowConfig struct {
Expand Down Expand Up @@ -144,10 +153,14 @@ func (f *FlowConfig) GetFlow(name string) (*FlowConfigEntry, error) {
}

// AsIngestionFlow converts an IngestionFlowConfig to an actual flows.IngestionFlow.
func (i *IngestionFlowConfig) AsIngestionFlow() (*flows.IngestionFlow, error) {
func (i *IngestionFlowConfig) AsIngestionFlow(globals *FlowConfigGlobalsIngestion) (*flows.IngestionFlow, error) {
flow := &flows.IngestionFlow{
Filetypes: i.Filetypes,
Globals: flows.IngestionFlowGlobals{
SplitterOpts: globals.Textsplitter,
},
}

if i.DocumentLoader.Name != "" {
name := strings.ToLower(strings.Trim(i.DocumentLoader.Name, " "))
cfg, err := documentloader.GetDocumentLoaderConfig(name)
Expand Down Expand Up @@ -177,21 +190,24 @@ func (i *IngestionFlowConfig) AsIngestionFlow() (*flows.IngestionFlow, error) {
if err != nil {
return nil, err
}
if len(i.TextSplitter.Options) > 0 {
jsondata, err := json.Marshal(i.TextSplitter.Options)
if err != nil {
return nil, err

if len(globals.Textsplitter) > 0 {
if err := mapstructure.Decode(globals.Textsplitter, &cfg); err != nil {
return nil, fmt.Errorf("failed to decode text splitter global configuration: %w", err)
}
err = json.Unmarshal(jsondata, &cfg)
if err != nil {
return nil, err
}

if len(i.TextSplitter.Options) > 0 {
if err := mapstructure.Decode(i.TextSplitter.Options, &cfg); err != nil {
return nil, fmt.Errorf("failed to decode text splitter %q configuration: %w", i.TextSplitter.Name, err)
}
}
splitterFunc, err := textsplitter.GetTextSplitterFunc(name, cfg)

splitterFunc, err := textsplitter.GetTextSplitter(name, cfg)
if err != nil {
return nil, err
}
flow.Split = splitterFunc
flow.Splitter = splitterFunc
}

if len(i.Transformers) > 0 {
Expand Down
40 changes: 31 additions & 9 deletions pkg/flows/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package flows
import (
"context"
"fmt"
"github.com/acorn-io/z"
"github.com/gptscript-ai/knowledge/pkg/datastore/store"
"github.com/mitchellh/mapstructure"
"github.com/philippgille/chromem-go"
"io"
"log/slog"
Expand All @@ -19,10 +21,15 @@ import (
vs "github.com/gptscript-ai/knowledge/pkg/vectorstore"
)

// IngestionFlowGlobals holds flow-wide option maps shared by every
// ingestion flow of a flow entry. SplitterOpts is decoded over the text
// splitter defaults when FillDefaults builds the splitter.
type IngestionFlowGlobals struct {
	SplitterOpts map[string]any
}

// IngestionFlow describes how files of the listed filetypes are ingested:
// loaded by Load, chunked by Splitter, then passed through Transformations.
// Fix: the diff left the replaced `Split textsplitter.SplitterFunc` field
// in place next to its successor `Splitter`; only the new field is kept.
type IngestionFlow struct {
	Globals         IngestionFlowGlobals
	Filetypes       []string
	Load            documentloader.LoaderFunc
	Splitter        dstypes.TextSplitter
	Transformations []dstypes.DocumentTransformer
}

Expand All @@ -37,28 +44,43 @@ func (f *IngestionFlow) Transform(ctx context.Context, docs []vs.Document) ([]vs
return docs, nil
}

func NewDefaultIngestionFlow(filetype string, textsplitterOpts *textsplitter.TextSplitterOpts) IngestionFlow {
func NewDefaultIngestionFlow(filetype string, textsplitterOpts *textsplitter.TextSplitterOpts) (IngestionFlow, error) {
ingestionFlow := IngestionFlow{
Filetypes: []string{filetype},
}
ingestionFlow.FillDefaults(filetype, textsplitterOpts)
return ingestionFlow
if err := ingestionFlow.FillDefaults(filetype, textsplitterOpts); err != nil {
return IngestionFlow{}, err
}
return ingestionFlow, nil
}

// SupportsFiletype reports whether this flow handles the given filetype,
// either by an exact match or via a wildcard "*" entry in Filetypes.
// Fix: the diff left the stale pre-change return statement in place above
// the new one, making the wildcard check unreachable; only the post-change
// return is kept.
func (f *IngestionFlow) SupportsFiletype(filetype string) bool {
	return slices.Contains(f.Filetypes, filetype) || slices.Contains(f.Filetypes, "*")
}

func (f *IngestionFlow) FillDefaults(filetype string, textsplitterOpts *textsplitter.TextSplitterOpts) {
func (f *IngestionFlow) FillDefaults(filetype string, textsplitterOpts *textsplitter.TextSplitterOpts) error {
if f.Load == nil {
f.Load = documentloader.DefaultDocLoaderFunc(filetype)
}
if f.Split == nil {
f.Split = textsplitter.DefaultTextSplitter(filetype, textsplitterOpts).SplitDocuments
if f.Splitter == nil {
if textsplitterOpts == nil {
textsplitterOpts = z.Pointer(textsplitter.NewTextSplitterOpts())
}
slog.Debug("Using default text splitter", "filetype", filetype, "textSplitterOpts", textsplitterOpts)

if len(f.Globals.SplitterOpts) > 0 {
if err := mapstructure.Decode(f.Globals.SplitterOpts, textsplitterOpts); err != nil {
return fmt.Errorf("failed to decode globals.SplitterOpts configuration: %w", err)
}
slog.Debug("Overriding text splitter options with globals from flows config", "filetype", filetype, "textSplitterOpts", textsplitterOpts)
}

f.Splitter = textsplitter.DefaultTextSplitter(filetype, textsplitterOpts)
}
if len(f.Transformations) == 0 {
f.Transformations = transformers.DefaultDocumentTransformers(filetype)
}
return nil
}

func (f *IngestionFlow) Run(ctx context.Context, reader io.Reader) ([]vs.Document, error) {
Expand All @@ -84,7 +106,7 @@ func (f *IngestionFlow) Run(ctx context.Context, reader io.Reader) ([]vs.Documen
/*
* Split documents - Chunking
*/
docs, err = f.Split(docs)
docs, err = f.Splitter.SplitDocuments(docs)
if err != nil {
slog.Error("Failed to split documents", "error", err)
return nil, fmt.Errorf("failed to split documents: %w", err)
Expand Down