From c9bff15a53723e6d3c6b8c4ae7fa15134a93297f Mon Sep 17 00:00:00 2001 From: shindakioku Date: Sun, 25 Feb 2024 12:54:27 +0200 Subject: [PATCH 1/6] flow: basic implementation. --- bot.go | 12 +- flow/bus.go | 216 +++++++++++++++++++++++++++++ flow/factory.go | 106 ++++++++++++++ flow/flow.go | 17 +++ flow/machine.go | 121 ++++++++++++++++ flow/state.go | 60 ++++++++ flow/step.go | 27 ++++ flow_simple_manual_testing/main.go | 211 ++++++++++++++++++++++++++++ 8 files changed, 764 insertions(+), 6 deletions(-) create mode 100644 flow/bus.go create mode 100644 flow/factory.go create mode 100644 flow/flow.go create mode 100644 flow/machine.go create mode 100644 flow/state.go create mode 100644 flow/step.go create mode 100644 flow_simple_manual_testing/main.go diff --git a/bot.go b/bot.go index 0473280d..f5b5afba 100644 --- a/bot.go +++ b/bot.go @@ -163,13 +163,13 @@ var ( // // Example: // -// b.Handle("/start", func (c tele.Context) error { -// return c.Reply("Hello!") -// }) +// b.Handle("/start", func (c tele.Context) error { +// return c.Reply("Hello!") +// }) // -// b.Handle(&inlineButton, func (c tele.Context) error { -// return c.Respond(&tele.CallbackResponse{Text: "Hello!"}) -// }) +// b.Handle(&inlineButton, func (c tele.Context) error { +// return c.Respond(&tele.CallbackResponse{Text: "Hello!"}) +// }) // // Middleware usage: // diff --git a/flow/bus.go b/flow/bus.go new file mode 100644 index 00000000..615c69c7 --- /dev/null +++ b/flow/bus.go @@ -0,0 +1,216 @@ +package flow + +import ( + "errors" + "gopkg.in/telebot.v3" + "sync" + "time" +) + +var ( + UserDoesNotHaveActiveFlow = errors.New("flow: user does not have active flow") + NoStepsDefined = errors.New("flow: no steps defined") +) + +// The Bus handles user actions, such as [telebot.OnText, telebot.OnMedia, etc]. +// Flow processing involves handling any user prompt after the user has begun the flow process. +// We offer this contract to give developers more control, avoiding reliance on obscure mechanisms. +type Bus interface { + // UserInFlow returns true if the user is currently engaged in flow processing. + // + // Example: + // bot.Handle("/start", func(c telebot.Context) error { + // if flowBus.UserInFlow(c.Sender().ID) { // Reply with an error message. } + // + // return c.Reply("Hello!") + // }) + UserInFlow(userID int64) bool + // UserContinueFlow initiates or continues the flow process for a user if one is already in progress. + // + // Example: + // bot.Handle("/start", func(c telebot.Context) error { + // if flowBus.UserInFlow(c.Sender().ID) { flowBus.UserContinueFlow(c.Sender().ID) } + // + // return c.Reply("Hello!") + // }) + UserContinueFlow(userID int64, c telebot.Context) error + // UserContinueFlowOrCustom calls [UserContinueFlow] if the flow process for a user is in progress. + // Otherwise, it calls a custom function. + // For instance, you may need to call this function to define a custom handler for any action required by the flow. + // + // Example: + // + // bot.Handle(telebot.OnText, flowBus.ProcessUserToFlowOrCustom(func (c telebot.Context) error { + // // Called only if the user hasn't begun the flow. + // + // return nil + // })) + UserContinueFlowOrCustom(telebot.HandlerFunc) telebot.HandlerFunc + // Handle implements any message handler. + // This function checks if the user is continuing work on their active flow and processes it if so. + // + // Example: + // bot.Handle(telebot.OnText, flowBus.Handle) + Handle(telebot.Context) error + + // Flow initiates flow configuration + Flow(factory *Factory) telebot.HandlerFunc +} + +// describes the state to the [SimpleBus.states] value +type state struct { + // User's flow + flow *Flow + state *State +} + +// SimpleBus implementation for the [Bus] contract +type SimpleBus struct { + // Stores the active user flows by their IDs. + // Key - user id (int64) + // Value - the [state] instance + states sync.Map + // We don't need to keep active flows indefinitely. + // This setting defines the maximum lifespan for each flow. + // Background process will remove flows that have been alive longer than the defined duration. + // @TODO: Provide a callback handler for every deletion process. + flowSessionIsAvailableFor time.Duration +} + +func (b *SimpleBus) UserInFlow(userID int64) bool { + _, exists := b.states.Load(userID) + + return exists +} + +func (b *SimpleBus) UserContinueFlow(userID int64, c telebot.Context) error { + //flow, exists := b.states.Load(userID) + _, exists := b.states.Load(userID) + if !exists { + return UserDoesNotHaveActiveFlow + } + + // @TODO: call machine + return nil +} + +func (b *SimpleBus) UserContinueFlowOrCustom(handler telebot.HandlerFunc) telebot.HandlerFunc { + return func(c telebot.Context) error { + if b.UserInFlow(c.Sender().ID) { + return b.UserContinueFlow(c.Sender().ID, c) + } + + return handler(c) + } +} + +func (b *SimpleBus) Handle(c telebot.Context) error { + stV, exists := b.states.Load(c.Sender().ID) + if !exists { + return UserDoesNotHaveActiveFlow + } + + st := stV.(*state) + // Update context for the state + // @TODO: do we need to persist the latest context every time? + st.state.Context = c + // Get active step + step := st.flow.steps[st.state.Machine.ActiveStep()] + // Call validators if it defined + validators := step.validators + if len(validators) > 0 { + for _, validator := range validators { + err := validator.Validate(st.state) + if err != nil { + if st.flow.useValidatorErrorsAsUserResponse { + return c.Reply(err.Error()) + } else { + return err + } + } + } + } + + // Call [assign] + if step.assign != nil { + if err := step.assign(st.state); err != nil { + return err + } + } + + // Call [success] event if it's defined + if step.success != nil { + if err := step.success(st.state); err != nil { + return err + } + } + + // It was the last step. Call the [success] handler + if len(st.flow.steps) <= st.state.Machine.ActiveStep()+1 { + b.removeState(c.Sender().ID) + + return st.state.Machine.Success(st.state) + } + + // Process to the next step + err := st.state.Machine.Next(st.state) + if err != nil { + // Remove flow on any error occurring within flow logic. + // We need to call the [Fail] function because, typically, + // that handler should send something to the user like [Try again]. + b.removeState(c.Sender().ID) + + return st.state.Machine.Fail(st.state, err) + } + + return nil +} + +// Remove [state] from the [SimpleBus.states] +func (b *SimpleBus) removeState(userID int64) { + b.states.Delete(userID) +} + +func (b *SimpleBus) Flow(factory *Factory) telebot.HandlerFunc { + return func(c telebot.Context) error { + if len(factory.flow.steps) == 0 { + return NoStepsDefined + } + + // If the user already has a flow, we need to recall the active step. + stV, exists := b.states.Load(c.Sender().ID) + if exists { + st := stV.(*state) + st.state.Context = c + + return st.state.Machine.ToStep(st.state.Machine.ActiveStep(), st.state) + } + + machine := NewMachine(factory.flow) + // Register flow for the user + st := state{ + flow: factory.flow, + state: NewState(machine, c, factory.userState), + } + b.states.Store(c.Sender().ID, &st) + // Call the machine for the start the first step + return machine.ToStep(0, st.state) + } +} + +// Removes flows that have been active for longer than [flowSessionIsAvailableFor] time. +func (b *SimpleBus) removeIdleFlows() { + // @TODO: Provide an API for clients. + // For example, a developer may want to notify a user that their session has expired. +} + +func NewBus(bot *telebot.Bot, flowSessionIsAvailableFor time.Duration) Bus { + bus := &SimpleBus{ + flowSessionIsAvailableFor: flowSessionIsAvailableFor, + } + + // @TODO: do we need to create an API for users to interact with this? + go bus.removeIdleFlows() + + return bus +} diff --git a/flow/factory.go b/flow/factory.go new file mode 100644 index 00000000..406c3ca9 --- /dev/null +++ b/flow/factory.go @@ -0,0 +1,106 @@ +package flow + +// This package contains factories for describing flows. +// Factories generate a flow object in a simple manner. + +// Factory for creating a [Flow] object. +type Factory struct { + flow *Flow + // Represents any user state with [State.userState]. + userState map[interface{}]interface{} +} + +// AddState adds a state to the [Factory.userState] +func (f *Factory) AddState(key interface{}, value interface{}) *Factory { + f.userState[key] = value + + return f +} + +// WithState sets a value for [Factory.userState] +func (f *Factory) WithState(userState map[interface{}]interface{}) *Factory { + f.userState = userState + + return f +} + +// Success sets a handler for the [Flow.Success] event. +func (f *Factory) Success(handler StateHandler) *Factory { + f.flow.success = handler + + return f +} + +// Fail sets a handler for the [Flow.Fail] event. +func (f *Factory) Fail(handler FailHandler) *Factory { + f.flow.fail = handler + + return f +} + +// Step adds a step to the [Flow.Steps] +func (f *Factory) Step(step *StepFactory) *Factory { + f.flow.steps = append(f.flow.steps, *step.step) + + return f +} + +// UseValidatorErrorsAsUserResponse sets a value for the [Flow.useValidatorErrorsAsUserResponse]. +func (f *Factory) UseValidatorErrorsAsUserResponse(value bool) *Factory { + f.flow.useValidatorErrorsAsUserResponse = value + + return f +} + +// New start describing the flow. +func New() *Factory { + return &Factory{ + flow: &Flow{}, + userState: make(map[interface{}]interface{}), + } +} + +// StepFactory for creating a [Step] object. +type StepFactory struct { + step *Step +} + +// Begin sets a handler for the [Step.begin] event. +func (f *StepFactory) Begin(handler StateHandler) *StepFactory { + f.step.begin = handler + + return f +} + +// Name sets a value for the [Step.name]. +func (f *StepFactory) Name(name int) *StepFactory { + f.step.name = name + + return f +} + +// Validate sets values for the [Step.validators]. +func (f *StepFactory) Validate(validators ...StepValidator) *StepFactory { + f.step.validators = validators + + return f +} + +// Assign sets a value for the [Step.assign]. +func (f *StepFactory) Assign(assign StateHandler) *StepFactory { + f.step.assign = assign + + return f +} + +// Success sets a value for the [Step.success]. +func (f *StepFactory) Success(success StateHandler) *StepFactory { + f.step.success = success + + return f +} + +// NewStep initiates the description of a step for the flow. +func NewStep() *StepFactory { + return &StepFactory{step: &Step{}} +} diff --git a/flow/flow.go b/flow/flow.go new file mode 100644 index 00000000..6b863f17 --- /dev/null +++ b/flow/flow.go @@ -0,0 +1,17 @@ +package flow + +type FailHandler func(*State, error) error + +// Flow describes a process from beginning to end. It retains all defined steps, the user's final handler, and more. +// Additionally, it offers a straightforward interface to access internal storage for marshaling and saving elsewhere. +type Flow struct { + // User's defined steps + steps []Step + // Calls after successfully passing full flow + success StateHandler + // Calls when user trigger fail step + fail FailHandler + // Determines whether we need to send errors from a validator to the user as a response. + // If true, errors from a validator are responded, otherwise, no response is sent. + useValidatorErrorsAsUserResponse bool +} diff --git a/flow/machine.go b/flow/machine.go new file mode 100644 index 00000000..d648bb12 --- /dev/null +++ b/flow/machine.go @@ -0,0 +1,121 @@ +package flow + +import ( + "errors" + "fmt" +) + +// Machine describes the contract for the flow handling +type Machine interface { + // Back move backward by step + Back(state *State) error + // Next move forward by step + Next(state *State) error + // ToStep Move to the step + ToStep(step int, state *State) error + // Success stop processing and call the final function + Success(state *State) error + // Fail stop processing and call the fail function + Fail(state *State, err error) error + // ActiveStep returns the current step + ActiveStep() int +} + +// SimpleMachine implements the [Machine] contract +type SimpleMachine struct { + // User defined flow + flow *Flow + + // Active step for the user + activeStep int + + // Sets to true if failure was called. + failed bool +} + +func (m *SimpleMachine) Back(state *State) error { + if m.activeStep-1 <= 0 { + return errors.New("already first step") + } + + m.activeStep -= 1 + + return m.run(state) +} + +func (m *SimpleMachine) Next(state *State) error { + if m.activeStep+1 >= len(m.flow.steps) { + return errors.New("already last step") + } + + m.activeStep += 1 + + return m.run(state) +} + +func (m *SimpleMachine) ToStep(step int, state *State) error { + if step < 0 { + return errors.New("step cannot be less than zero") + } + + if step > len(m.flow.steps) { + return errors.New("step cannot be greater than steps count") + } + + m.activeStep = step + + return m.run(state) +} + +func (m *SimpleMachine) Success(state *State) error { + if m.failed { + return errors.New("flow was already failed") + } + + if m.flow.success != nil { + return m.flow.success(state) + } + + return nil +} + +func (m *SimpleMachine) Fail(state *State, err error) error { + m.failed = true + + if m.flow.fail != nil { + return m.flow.fail(state, err) + } + + return nil +} + +func (m *SimpleMachine) ActiveStep() int { + return m.activeStep +} + +// Run the current step (this function should be called by [Back]/[Next]/[ToStep] functions). +func (m *SimpleMachine) run(state *State) error { + if m.failed { + return errors.New("flow was already failed") + } + + if len(m.flow.steps) < m.activeStep { + return errors.New(fmt.Sprintf("step isn't defined (%d)", m.activeStep)) + } + + step := m.flow.steps[m.activeStep] + if step.begin != nil { + return step.begin(state) + } + + return nil +} + +//func (m *SimpleMachine) Continue() {} + +func NewMachine(flow *Flow) Machine { + return &SimpleMachine{ + flow: flow, + activeStep: 0, + } +} diff --git a/flow/state.go b/flow/state.go new file mode 100644 index 00000000..d52b42a9 --- /dev/null +++ b/flow/state.go @@ -0,0 +1,60 @@ +package flow + +import ( + "gopkg.in/telebot.v3" +) + +// StateHandler is a common handler for global processes, such as finally, fail, step, and so on. +type StateHandler func(state *State) error + +// State defines the user's state and persists common elements, such as the bot instance, flow handler instance, etc. +type State struct { + // Instance for the user's flow. + Machine Machine + // Received message from a user. + Context telebot.Context + + // @TODO: We can provide a full history of every step, + // including contexts, validation results, and so on. + // contextHistory map[string][]StepHistory + + // @TODO: make it concurrently safe? + // User state represents any custom data for a user. + // It's simply a container that you can use within steps. + // For instance, you may use it if you want to populate a struct at each step + // and then use this data after the flow has successfully completed. + userState map[interface{}]interface{} +} + +// Set initializes the [userState] field. +func (s *State) Set(userState map[interface{}]interface{}) { + s.userState = userState +} + +// Get returns data corresponding to the provided key, along with a boolean value indicating if the key exists. +func (s *State) Get(key interface{}) (interface{}, bool) { + value, exists := s.userState[key] + + return value, exists +} + +// Put adds data to the storage. +// Caution: it does not check if the key already exists. It will overwrite any existing data associated with the key. +func (s *State) Put(key interface{}, value interface{}) { + s.userState[key] = value +} + +// Exists returns a boolean representing whether the key exists in the map. +func (s *State) Exists(key interface{}) bool { + _, exists := s.userState[key] + + return exists +} + +func NewState(machine Machine, c telebot.Context, userState map[interface{}]interface{}) *State { + return &State{ + Machine: machine, + Context: c, + userState: userState, + } +} diff --git a/flow/step.go b/flow/step.go new file mode 100644 index 00000000..40e51734 --- /dev/null +++ b/flow/step.go @@ -0,0 +1,27 @@ +package flow + +// StepValidator ensures that each step can be validated before the flow process progresses to the next step. +// Typically, users require simple validators, such as ensuring text is not empty or a photo is uploaded. +// Therefore, having a variety of different validators and describing them in a single function is not ideal. +type StepValidator interface { + // Validate is called after the user prompts anything. + Validate(state *State) error +} + +// Step describes a user's step within a flow. +type Step struct { + // This is the user's custom function called at the beginning of a step. + // There are no restrictions on logic; the handler is not required, and the user can even use an empty mock. + // Therefore, you can do whatever you want: move backward or forward steps, validate previously saved prompts, and so on. + begin StateHandler + // Step name (by default, it is filled by auto-increment, but the user is able to define a custom value). + name int + // Defined validators + validators []StepValidator + // Callback called after the validation process if successful. + // It can, for example, assign the user's prompt to a variable. + assign StateHandler + // Called if the step is successfully passed. + success StateHandler + // @TODO: provide an [OnFail] callback which is called only if validators fail. +} diff --git a/flow_simple_manual_testing/main.go b/flow_simple_manual_testing/main.go new file mode 100644 index 00000000..84648139 --- /dev/null +++ b/flow_simple_manual_testing/main.go @@ -0,0 +1,211 @@ +package main + +import ( + "errors" + tele "gopkg.in/telebot.v3" + "gopkg.in/telebot.v3/flow" + "log" + "reflect" + "strconv" + "time" +) + +// Validators: I'm certain that we need to implement some basic validators for users. + +type NonEmptyValidator struct{} + +func (NonEmptyValidator) Validate(state *flow.State) error { + if len(state.Context.Message().Text) < 2 { + return errors.New("message is required") + } + + return nil +} + +type BadValidator struct{} + +func (BadValidator) Validate(state *flow.State) error { + return nil + //return state.Machine.Fail(state) +} + +var ( + nonEmptyValidator = NonEmptyValidator{} + badValidator = BadValidator{} +) + +// TextAssigner I'm certain that we need to implement some basic assigner for users. +func TextAssigner(value interface{}) flow.StateHandler { + vai := reflect.ValueOf(value) + vai = vai.Elem() + + return func(state *flow.State) error { + text := state.Context.Text() + if len(text) == 0 { + return nil + } + + switch vai.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, + reflect.Int32, reflect.Int64: + n, err := strconv.ParseInt(text, 10, 32) + if err != nil { + return err + } + + vai.SetInt(n) + case reflect.Uint, reflect.Uint8, reflect.Uint16, + reflect.Uint32, reflect.Uint64, reflect.Uintptr: + n, err := strconv.ParseUint(text, 10, 32) + if err != nil { + return err + } + + vai.SetUint(n) + // ...floating-point and complex cases omitted for brevity... + case reflect.Bool: + n, err := strconv.ParseBool(text) + if err != nil { + return err + } + + vai.SetBool(n) + case reflect.String: + vai.SetString(text) + //case reflect.Chan, reflect.Func, reflect.Ptr, reflect.Slice, reflect.Map: + // return v.Type().String() + " 0x" + + // strconv.FormatUint(uint64(v.Pointer()), 16) + default: // reflect.Array, reflect.Struct, reflect.Interface + vai.SetString(text) + case reflect.Invalid: + return errors.New("invalid type") + } + + return nil + } +} + +func main() { + pref := tele.Settings{ + Token: "5931155624:AAGLxTOnMt2O3UYLGpZSZAacxBVJONO1UP4", + Poller: &tele.LongPoller{Timeout: 10 * time.Second}, + } + + b, err := tele.NewBot(pref) + if err != nil { + panic(err) + } + + sendUserMessage := func(message string) func(state *flow.State) error { + return func(state *flow.State) error { + return state.Context.Reply(message) + } + } + + // Configure flow bus + flowBus := flow.NewBus(b, 5*time.Minute) + // Handle any text by flow bus + b.Handle(tele.OnText, flowBus.Handle) + // First flow + var email string + b.Handle("/start", flowBus.Flow( + flow.New(). + Step( + flow.NewStep(). + Begin(sendUserMessage("Enter email:")). + Validate(nonEmptyValidator). + Assign(TextAssigner(&email)), + ). + Step( + flow.NewStep(). + Begin(sendUserMessage("Enter password:")). + Validate(badValidator). + Success(func(state *flow.State) error { + log.Println("Second step successfully passed!") + + return nil + }), + ). + Step( + flow.NewStep(). + Begin(func(state *flow.State) error { + return nil + //return state.Machine.Back(state) + //return state.Machine.ToStep(0, state) + //return state.Machine.Next(state) + }), + ). + UseValidatorErrorsAsUserResponse(true). + Fail(func(state *flow.State, err error) error { + log.Println("Something get wrong: ", err) + + return nil + }). + Success(func(state *flow.State) error { + log.Println(email) + + return state.Context.Reply("You have successfully completed all the steps!") + }), + )) + // Flow using state storage + type user struct { + email string + password string + } + userStorageKey := "user" + b.Handle("/start2", flowBus.Flow( + flow.New(). + AddState(userStorageKey, &user{}). + Step( + flow.NewStep(). + Begin(sendUserMessage("Enter email:")). + Validate(nonEmptyValidator). + Assign(func(state *flow.State) error { + value, _ := state.Get(userStorageKey) + userValue := value.(*user) + userValue.email = state.Context.Message().Text + + return nil + }), + ). + Step( + flow.NewStep(). + Begin(sendUserMessage("Enter password:")). + Validate(badValidator). + Assign(func(state *flow.State) error { + value, _ := state.Get(userStorageKey) + userValue := value.(*user) + userValue.password = state.Context.Message().Text + + return nil + }). + Success(func(state *flow.State) error { + log.Println("Second step successfully passed!") + + return nil + }), + ). + Step( + flow.NewStep(). + Begin(func(state *flow.State) error { + return state.Machine.Fail(state, errors.New("should be passed to the [Fail]")) + //return state.Machine.Back(state) + //return state.Machine.ToStep(0, state) + //return state.Machine.Next(state) + }), + ). + UseValidatorErrorsAsUserResponse(true). + Fail(func(state *flow.State, err error) error { + log.Println("Something get wrong: ", err) + + return nil + }). + Success(func(state *flow.State) error { + log.Println(state.Get(userStorageKey)) + + return state.Context.Reply("You have successfully completed all the steps!") + }), + )) + + b.Start() +} From e3f3e823d50ffde4e843f5a62d19385108aaa003 Mon Sep 17 00:00:00 2001 From: shindakioku Date: Sun, 25 Feb 2024 12:54:46 +0200 Subject: [PATCH 2/6] flow: basic implementation. --- flow_simple_manual_testing/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow_simple_manual_testing/main.go b/flow_simple_manual_testing/main.go index 84648139..b64b1940 100644 --- a/flow_simple_manual_testing/main.go +++ b/flow_simple_manual_testing/main.go @@ -87,7 +87,7 @@ func TextAssigner(value interface{}) flow.StateHandler { func main() { pref := tele.Settings{ - Token: "5931155624:AAGLxTOnMt2O3UYLGpZSZAacxBVJONO1UP4", + Token: "", Poller: &tele.LongPoller{Timeout: 10 * time.Second}, } From e061a480178a142e2e0584f3ec063be4b0edce7e Mon Sep 17 00:00:00 2001 From: shindakioku Date: Sun, 25 Feb 2024 13:02:50 +0200 Subject: [PATCH 3/6] Revert documentation to the bot --- bot.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bot.go b/bot.go index f5b5afba..0473280d 100644 --- a/bot.go +++ b/bot.go @@ -163,13 +163,13 @@ var ( // // Example: // -// b.Handle("/start", func (c tele.Context) error { -// return c.Reply("Hello!") -// }) +// b.Handle("/start", func (c tele.Context) error { +// return c.Reply("Hello!") +// }) // -// b.Handle(&inlineButton, func (c tele.Context) error { -// return c.Respond(&tele.CallbackResponse{Text: "Hello!"}) -// }) +// b.Handle(&inlineButton, func (c tele.Context) error { +// return c.Respond(&tele.CallbackResponse{Text: "Hello!"}) +// }) // // Middleware usage: // From 81c92db4591cf5a2b571b07f70dd726d56f03d65 Mon Sep 17 00:00:00 2001 From: shindakioku Date: Sun, 25 Feb 2024 17:50:36 +0200 Subject: [PATCH 4/6] Refactoring --- flow/bus.go | 81 +++++++++++-------- flow/factory.go | 50 ++++++------ flow/flow.go | 13 +-- flow/machine.go | 56 +++---------- flow/state.go | 61 ++++++++------- flow/step.go | 10 ++- flow_simple_manual_testing/main.go | 122 +++++++++++++---------------- 7 files changed, 186 insertions(+), 207 deletions(-) diff --git a/flow/bus.go b/flow/bus.go index 615c69c7..eb574e9f 100644 --- a/flow/bus.go +++ b/flow/bus.go @@ -58,10 +58,11 @@ type Bus interface { } // describes the state to the [SimpleBus.states] value -type state struct { +type flowState struct { // User's flow - flow *Flow - state *State + flow *Flow + state State + machine Machine } // SimpleBus implementation for the [Bus] contract @@ -110,19 +111,18 @@ func (b *SimpleBus) Handle(c telebot.Context) error { return UserDoesNotHaveActiveFlow } - st := stV.(*state) + st := stV.(flowState) // Update context for the state - // @TODO: do we need to persist the latest context every time? - st.state.Context = c + st.state.Add(StateContextKey, c) // Get active step - step := st.flow.steps[st.state.Machine.ActiveStep()] - // Call validators if it defined + step := st.flow.steps[st.machine.ActiveStep()] + // Call validators if they are defined validators := step.validators if len(validators) > 0 { for _, validator := range validators { err := validator.Validate(st.state) if err != nil { - if st.flow.useValidatorErrorsAsUserResponse { + if st.flow.UseValidatorErrorsAsUserResponse { return c.Reply(err.Error()) } else { return err @@ -138,29 +138,43 @@ func (b *SimpleBus) Handle(c telebot.Context) error { } } - // Call [success] event if it's defined - if step.success != nil { - if err := step.success(st.state); err != nil { + activeStep := st.machine.ActiveStep() + // Call [then] event if it's defined + if step.then != nil { + if err := step.then(st.state, &step); err != nil { return err } } - // It was the last step. Call the [success] handler - if len(st.flow.steps) <= st.state.Machine.ActiveStep()+1 { + // It was the last step. Call the [then] handler + if len(st.flow.steps) <= st.machine.ActiveStep()+1 { b.removeState(c.Sender().ID) - return st.state.Machine.Success(st.state) + if st.flow.then == nil { + return nil + } + + return st.flow.then(st.state) } - // Process to the next step - err := st.state.Machine.Next(st.state) - if err != nil { - // Remove flow on any error occurring within flow logic. - // We need to call the [Fail] function because, typically, - // that handler should send something to the user like [Try again]. - b.removeState(c.Sender().ID) + // Sometimes, the user may navigate through steps within handlers. + // If this occurs, we don't need to call the [next] function because navigating + // through the machine already triggers it. + if activeStep == st.machine.ActiveStep() { + // Process to the next step + err := st.machine.Next(st.state) + if err != nil { + // Remove flow on any error occurring within flow logic. + // We need to call the [Fail] function because, typically, + // that handler should send something to the user like [Try again]. + b.removeState(c.Sender().ID) + + if st.flow.catch == nil { + return nil + } - return st.state.Machine.Fail(st.state, err) + return st.flow.catch(st.state, err) + } } return nil @@ -180,19 +194,24 @@ func (b *SimpleBus) Flow(factory *Factory) telebot.HandlerFunc { // If the user already has a flow, we need to recall the active step. stV, exists := b.states.Load(c.Sender().ID) if exists { - st := stV.(*state) - st.state.Context = c + st := stV.(flowState) + // Update context + st.state.Add(StateContextKey, c) - return st.state.Machine.ToStep(st.state.Machine.ActiveStep(), st.state) + return st.machine.ToStep(st.machine.ActiveStep(), st.state) } machine := NewMachine(factory.flow) + state := NewRuntimeState(factory.userState). + Add(StateContextKey, c). + Add(StateMachineKey, machine) // Register flow for the user - st := state{ - flow: factory.flow, - state: NewState(machine, c, factory.userState), + st := flowState{ + flow: factory.flow, + state: state, + machine: machine, } - b.states.Store(c.Sender().ID, &st) + b.states.Store(c.Sender().ID, st) // Call the machine for the start the first step return machine.ToStep(0, st.state) } @@ -204,7 +223,7 @@ func (b *SimpleBus) removeIdleFlows() { // For example, a developer may want to notify a user that their session has expired. } -func NewBus(bot *telebot.Bot, flowSessionIsAvailableFor time.Duration) Bus { +func NewBus(flowSessionIsAvailableFor time.Duration) Bus { bus := &SimpleBus{ flowSessionIsAvailableFor: flowSessionIsAvailableFor, } diff --git a/flow/factory.go b/flow/factory.go index 406c3ca9..b7f3043f 100644 --- a/flow/factory.go +++ b/flow/factory.go @@ -24,30 +24,23 @@ func (f *Factory) WithState(userState map[interface{}]interface{}) *Factory { return f } -// Success sets a handler for the [Flow.Success] event. -func (f *Factory) Success(handler StateHandler) *Factory { - f.flow.success = handler - - return f -} - -// Fail sets a handler for the [Flow.Fail] event. -func (f *Factory) Fail(handler FailHandler) *Factory { - f.flow.fail = handler +// Next adds a step to the [Flow.Steps] +func (f *Factory) Next(step *StepFactory) *Factory { + f.flow.steps = append(f.flow.steps, *step.step) return f } -// Step adds a step to the [Flow.Steps] -func (f *Factory) Step(step *StepFactory) *Factory { - f.flow.steps = append(f.flow.steps, *step.step) +// Then sets a handler for the [Flow.Success] event. +func (f *Factory) Then(handler StateHandler) *Factory { + f.flow.then = handler return f } -// UseValidatorErrorsAsUserResponse sets a value for the [Flow.useValidatorErrorsAsUserResponse]. -func (f *Factory) UseValidatorErrorsAsUserResponse(value bool) *Factory { - f.flow.useValidatorErrorsAsUserResponse = value +// Catch sets a handler for the [Flow.Fail] event. +func (f *Factory) Catch(handler FailHandler) *Factory { + f.flow.catch = handler return f } @@ -60,18 +53,19 @@ func New() *Factory { } } +// NewWithConfiguration start describing the flow. +func NewWithConfiguration(flow Flow) *Factory { + return &Factory{ + flow: &flow, + userState: make(map[interface{}]interface{}), + } +} + // StepFactory for creating a [Step] object. type StepFactory struct { step *Step } -// Begin sets a handler for the [Step.begin] event. -func (f *StepFactory) Begin(handler StateHandler) *StepFactory { - f.step.begin = handler - - return f -} - // Name sets a value for the [Step.name]. func (f *StepFactory) Name(name int) *StepFactory { f.step.name = name @@ -93,14 +87,14 @@ func (f *StepFactory) Assign(assign StateHandler) *StepFactory { return f } -// Success sets a value for the [Step.success]. -func (f *StepFactory) Success(success StateHandler) *StepFactory { - f.step.success = success +// Then sets a value for the [Step.then]. +func (f *StepFactory) Then(handler StepThenHandler) *StepFactory { + f.step.then = handler return f } // NewStep initiates the description of a step for the flow. -func NewStep() *StepFactory { - return &StepFactory{step: &Step{}} +func NewStep(handler StateHandler) *StepFactory { + return &StepFactory{step: &Step{handler: handler}} } diff --git a/flow/flow.go b/flow/flow.go index 6b863f17..167c8c7a 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,6 +1,6 @@ package flow -type FailHandler func(*State, error) error +type FailHandler func(State, error) error // Flow describes a process from beginning to end. It retains all defined steps, the user's final handler, and more. // Additionally, it offers a straightforward interface to access internal storage for marshaling and saving elsewhere. @@ -8,10 +8,13 @@ type Flow struct { // User's defined steps steps []Step // Calls after successfully passing full flow - success StateHandler - // Calls when user trigger fail step - fail FailHandler + then StateHandler + // Calls on any error (@TODO: update the comment) + catch FailHandler + + // User options + // Determines whether we need to send errors from a validator to the user as a response. // If true, errors from a validator are responded, otherwise, no response is sent. - useValidatorErrorsAsUserResponse bool + UseValidatorErrorsAsUserResponse bool } diff --git a/flow/machine.go b/flow/machine.go index d648bb12..5a529352 100644 --- a/flow/machine.go +++ b/flow/machine.go @@ -8,15 +8,11 @@ import ( // Machine describes the contract for the flow handling type Machine interface { // Back move backward by step - Back(state *State) error + Back(state State) error // Next move forward by step - Next(state *State) error + Next(state State) error // ToStep Move to the step - ToStep(step int, state *State) error - // Success stop processing and call the final function - Success(state *State) error - // Fail stop processing and call the fail function - Fail(state *State, err error) error + ToStep(step int, state State) error // ActiveStep returns the current step ActiveStep() int } @@ -25,16 +21,12 @@ type Machine interface { type SimpleMachine struct { // User defined flow flow *Flow - // Active step for the user activeStep int - - // Sets to true if failure was called. - failed bool } -func (m *SimpleMachine) Back(state *State) error { - if m.activeStep-1 <= 0 { +func (m *SimpleMachine) Back(state State) error { + if m.activeStep <= 0 { return errors.New("already first step") } @@ -43,7 +35,7 @@ func (m *SimpleMachine) Back(state *State) error { return m.run(state) } -func (m *SimpleMachine) Next(state *State) error { +func (m *SimpleMachine) Next(state State) error { if m.activeStep+1 >= len(m.flow.steps) { return errors.New("already last step") } @@ -53,7 +45,7 @@ func (m *SimpleMachine) Next(state *State) error { return m.run(state) } -func (m *SimpleMachine) ToStep(step int, state *State) error { +func (m *SimpleMachine) ToStep(step int, state State) error { if step < 0 { return errors.New("step cannot be less than zero") } @@ -67,52 +59,24 @@ func (m *SimpleMachine) ToStep(step int, state *State) error { return m.run(state) } -func (m *SimpleMachine) Success(state *State) error { - if m.failed { - return errors.New("flow was already failed") - } - - if m.flow.success != nil { - return m.flow.success(state) - } - - return nil -} - -func (m *SimpleMachine) Fail(state *State, err error) error { - m.failed = true - - if m.flow.fail != nil { - return m.flow.fail(state, err) - } - - return nil -} - func (m *SimpleMachine) ActiveStep() int { return m.activeStep } // Run the current step (this function should be called by [Back]/[Next]/[ToStep] functions). -func (m *SimpleMachine) run(state *State) error { - if m.failed { - return errors.New("flow was already failed") - } - +func (m *SimpleMachine) run(state State) error { if len(m.flow.steps) < m.activeStep { return errors.New(fmt.Sprintf("step isn't defined (%d)", m.activeStep)) } step := m.flow.steps[m.activeStep] - if step.begin != nil { - return step.begin(state) + if step.handler != nil { + return step.handler(state) } return nil } -//func (m *SimpleMachine) Continue() {} - func NewMachine(flow *Flow) Machine { return &SimpleMachine{ flow: flow, diff --git a/flow/state.go b/flow/state.go index d52b42a9..0c068eaf 100644 --- a/flow/state.go +++ b/flow/state.go @@ -1,23 +1,33 @@ package flow -import ( - "gopkg.in/telebot.v3" +// Defines keys for the basic data that must be in the state +const ( + StateMachineKey = "machine" + StateContextKey = "context" ) // StateHandler is a common handler for global processes, such as finally, fail, step, and so on. -type StateHandler func(state *State) error +type StateHandler func(State) error // State defines the user's state and persists common elements, such as the bot instance, flow handler instance, etc. -type State struct { - // Instance for the user's flow. - Machine Machine - // Received message from a user. - Context telebot.Context - - // @TODO: We can provide a full history of every step, - // including contexts, validation results, and so on. - // contextHistory map[string][]StepHistory +type State interface { + // Set initializes the [userState] field. + Set(userState map[interface{}]interface{}) + // Get returns data corresponding to the provided key, along with a boolean value indicating if the key exists. + Get(key interface{}) (interface{}, bool) + // Read returns data corresponding to the provided key. + // You can especially use this for fast type assertion. + // + // state.Read("machine").(flow.Machine) + Read(key interface{}) interface{} + // Add adds data to the storage. + // Caution: it does not check if the key already exists. It will overwrite any existing data associated with the key. + Add(key interface{}, value interface{}) State + // Exists returns a boolean representing whether the key exists in the map. + Exists(key interface{}) bool +} +type RuntimeState struct { // @TODO: make it concurrently safe? // User state represents any custom data for a user. // It's simply a container that you can use within steps. @@ -26,35 +36,32 @@ type State struct { userState map[interface{}]interface{} } -// Set initializes the [userState] field. -func (s *State) Set(userState map[interface{}]interface{}) { +func (s *RuntimeState) Set(userState map[interface{}]interface{}) { s.userState = userState } -// Get returns data corresponding to the provided key, along with a boolean value indicating if the key exists. -func (s *State) Get(key interface{}) (interface{}, bool) { +func (s *RuntimeState) Get(key interface{}) (interface{}, bool) { value, exists := s.userState[key] return value, exists } -// Put adds data to the storage. -// Caution: it does not check if the key already exists. It will overwrite any existing data associated with the key. -func (s *State) Put(key interface{}, value interface{}) { +func (s *RuntimeState) Read(key interface{}) interface{} { + return s.userState[key] +} + +func (s *RuntimeState) Add(key interface{}, value interface{}) State { s.userState[key] = value + + return s } -// Exists returns a boolean representing whether the key exists in the map. -func (s *State) Exists(key interface{}) bool { +func (s *RuntimeState) Exists(key interface{}) bool { _, exists := s.userState[key] return exists } -func NewState(machine Machine, c telebot.Context, userState map[interface{}]interface{}) *State { - return &State{ - Machine: machine, - Context: c, - userState: userState, - } +func NewRuntimeState(userState map[interface{}]interface{}) State { + return &RuntimeState{userState: userState} } diff --git a/flow/step.go b/flow/step.go index 40e51734..e04e7103 100644 --- a/flow/step.go +++ b/flow/step.go @@ -1,11 +1,14 @@ package flow +// StepThenHandler handler for the successfully completed step +type StepThenHandler func(State, *Step) error + // StepValidator ensures that each step can be validated before the flow process progresses to the next step. // Typically, users require simple validators, such as ensuring text is not empty or a photo is uploaded. // Therefore, having a variety of different validators and describing them in a single function is not ideal. type StepValidator interface { // Validate is called after the user prompts anything. - Validate(state *State) error + Validate(State) error } // Step describes a user's step within a flow. @@ -13,7 +16,7 @@ type Step struct { // This is the user's custom function called at the beginning of a step. // There are no restrictions on logic; the handler is not required, and the user can even use an empty mock. // Therefore, you can do whatever you want: move backward or forward steps, validate previously saved prompts, and so on. - begin StateHandler + handler StateHandler // Step name (by default, it is filled by auto-increment, but the user is able to define a custom value). name int // Defined validators @@ -22,6 +25,5 @@ type Step struct { // It can, for example, assign the user's prompt to a variable. assign StateHandler // Called if the step is successfully passed. - success StateHandler - // @TODO: provide an [OnFail] callback which is called only if validators fail. + then StepThenHandler } diff --git a/flow_simple_manual_testing/main.go b/flow_simple_manual_testing/main.go index b64b1940..7f193245 100644 --- a/flow_simple_manual_testing/main.go +++ b/flow_simple_manual_testing/main.go @@ -14,8 +14,8 @@ import ( type NonEmptyValidator struct{} -func (NonEmptyValidator) Validate(state *flow.State) error { - if len(state.Context.Message().Text) < 2 { +func (NonEmptyValidator) Validate(state flow.State) error { + if len(state.Read(flow.StateContextKey).(tele.Context).Message().Text) < 2 { return errors.New("message is required") } @@ -24,8 +24,8 @@ func (NonEmptyValidator) Validate(state *flow.State) error { type BadValidator struct{} -func (BadValidator) Validate(state *flow.State) error { - return nil +func (BadValidator) Validate(state flow.State) error { + return errors.New("test") //return state.Machine.Fail(state) } @@ -39,8 +39,8 @@ func TextAssigner(value interface{}) flow.StateHandler { vai := reflect.ValueOf(value) vai = vai.Elem() - return func(state *flow.State) error { - text := state.Context.Text() + return func(state flow.State) error { + text := state.Read(flow.StateContextKey).(tele.Context).Text() if len(text) == 0 { return nil } @@ -96,57 +96,56 @@ func main() { panic(err) } - sendUserMessage := func(message string) func(state *flow.State) error { - return func(state *flow.State) error { - return state.Context.Reply(message) + sendUserMessage := func(message string) func(flow.State) error { + return func(state flow.State) error { + return state.Read(flow.StateContextKey).(tele.Context).Reply(message) } } + stepCompletedLogging := func(state flow.State, step *flow.Step) error { + log.Println("Step completed") + + return nil + } // Configure flow bus - flowBus := flow.NewBus(b, 5*time.Minute) + flowBus := flow.NewBus(5 * time.Minute) // Handle any text by flow bus b.Handle(tele.OnText, flowBus.Handle) // First flow var email string b.Handle("/start", flowBus.Flow( flow.New(). - Step( - flow.NewStep(). - Begin(sendUserMessage("Enter email:")). + Next( + flow.NewStep(sendUserMessage("Enter email:")). + Validate(nonEmptyValidator). + Assign(TextAssigner(&email)). + Then(stepCompletedLogging), + ). + Next( + flow.NewStep(sendUserMessage("Enter password:")). Validate(nonEmptyValidator). Assign(TextAssigner(&email)), ). - Step( - flow.NewStep(). - Begin(sendUserMessage("Enter password:")). - Validate(badValidator). - Success(func(state *flow.State) error { - log.Println("Second step successfully passed!") + Next( + flow.NewStep(sendUserMessage("Third step:")). + Then(func(state flow.State, step *flow.Step) error { + //return state.Read(flow.StateMachineKey).(flow.Machine).ToStep(0, state) return nil }), ). - Step( - flow.NewStep(). - Begin(func(state *flow.State) error { - return nil - //return state.Machine.Back(state) - //return state.Machine.ToStep(0, state) - //return state.Machine.Next(state) - }), - ). - UseValidatorErrorsAsUserResponse(true). - Fail(func(state *flow.State, err error) error { - log.Println("Something get wrong: ", err) + Then(func(state flow.State) error { + log.Println("Steps are completed!") - return nil + return state.Read(flow.StateContextKey).(tele.Context).Reply("Done") }). - Success(func(state *flow.State) error { - log.Println(email) + Catch(func(state flow.State, err error) error { + log.Println("FAILED: ", err) - return state.Context.Reply("You have successfully completed all the steps!") + return nil }), )) + // Flow using state storage type user struct { email string @@ -156,54 +155,45 @@ func main() { b.Handle("/start2", flowBus.Flow( flow.New(). AddState(userStorageKey, &user{}). - Step( - flow.NewStep(). - Begin(sendUserMessage("Enter email:")). + Next( + flow.NewStep(sendUserMessage("Enter email:")). Validate(nonEmptyValidator). - Assign(func(state *flow.State) error { - value, _ := state.Get(userStorageKey) - userValue := value.(*user) - userValue.email = state.Context.Message().Text + Assign(func(state flow.State) error { + u := state.Read(userStorageKey).(*user) + u.email = state.Read(flow.StateContextKey).(tele.Context).Message().Text return nil }), ). - Step( - flow.NewStep(). - Begin(sendUserMessage("Enter password:")). - Validate(badValidator). - Assign(func(state *flow.State) error { - value, _ := state.Get(userStorageKey) - userValue := value.(*user) - userValue.password = state.Context.Message().Text + Next( + flow.NewStep(sendUserMessage("Enter password:")). + Validate(nonEmptyValidator). + Assign(func(state flow.State) error { + u := state.Read(userStorageKey).(*user) + u.password = state.Read(flow.StateContextKey).(tele.Context).Message().Text return nil }). - Success(func(state *flow.State) error { + Then(func(state flow.State, step *flow.Step) error { log.Println("Second step successfully passed!") return nil }), ). - Step( - flow.NewStep(). - Begin(func(state *flow.State) error { - return state.Machine.Fail(state, errors.New("should be passed to the [Fail]")) - //return state.Machine.Back(state) - //return state.Machine.ToStep(0, state) - //return state.Machine.Next(state) - }), + Next( + flow.NewStep(func(state flow.State) error { + return errors.New("should be passed to the [Catch]") + }), ). - UseValidatorErrorsAsUserResponse(true). - Fail(func(state *flow.State, err error) error { - log.Println("Something get wrong: ", err) + Then(func(state flow.State) error { + log.Println("Steps are completed!. User: ", state.Read(userStorageKey)) - return nil + return state.Read(flow.StateContextKey).(tele.Context).Reply("Done") }). - Success(func(state *flow.State) error { - log.Println(state.Get(userStorageKey)) + Catch(func(state flow.State, err error) error { + log.Println("FAILED: ", err) - return state.Context.Reply("You have successfully completed all the steps!") + return nil }), )) From c531d96a0bd10b1b948b2398d991ae414d51da77 Mon Sep 17 00:00:00 2001 From: shindakioku Date: Sun, 25 Feb 2024 17:53:04 +0200 Subject: [PATCH 5/6] Remove unused code. --- flow/factory.go | 7 ------- flow/step.go | 2 -- 2 files changed, 9 deletions(-) diff --git a/flow/factory.go b/flow/factory.go index b7f3043f..1568b180 100644 --- a/flow/factory.go +++ b/flow/factory.go @@ -66,13 +66,6 @@ type StepFactory struct { step *Step } -// Name sets a value for the [Step.name]. -func (f *StepFactory) Name(name int) *StepFactory { - f.step.name = name - - return f -} - // Validate sets values for the [Step.validators]. func (f *StepFactory) Validate(validators ...StepValidator) *StepFactory { f.step.validators = validators diff --git a/flow/step.go b/flow/step.go index e04e7103..7bef8afa 100644 --- a/flow/step.go +++ b/flow/step.go @@ -17,8 +17,6 @@ type Step struct { // There are no restrictions on logic; the handler is not required, and the user can even use an empty mock. // Therefore, you can do whatever you want: move backward or forward steps, validate previously saved prompts, and so on. handler StateHandler - // Step name (by default, it is filled by auto-increment, but the user is able to define a custom value). - name int // Defined validators validators []StepValidator // Callback called after the validation process if successful. From 632542d05e055a53652808e734b7fabe35e60b0c Mon Sep 17 00:00:00 2001 From: shindakioku Date: Mon, 26 Feb 2024 22:21:18 +0200 Subject: [PATCH 6/6] Update flow bus. Add metadata information. Improve catching handler --- flow/bus.go | 156 ++++++++++++++++++++--------- flow/factory.go | 28 ++---- flow/flow.go | 46 +++++++-- flow/step.go | 8 +- flow_simple_manual_testing/main.go | 46 ++++----- 5 files changed, 182 insertions(+), 102 deletions(-) diff --git a/flow/bus.go b/flow/bus.go index eb574e9f..918bdbb5 100644 --- a/flow/bus.go +++ b/flow/bus.go @@ -54,19 +54,23 @@ type Bus interface { Handle(telebot.Context) error // Flow initiates flow configuration - Flow(factory *Factory) telebot.HandlerFunc + Flow(endpoint interface{}, factory *Factory) error } // describes the state to the [SimpleBus.states] value type flowState struct { + // telegram bot endpoint + endpoint interface{} // User's flow - flow *Flow - state State - machine Machine + flow *Flow + state State + machine Machine + metaData *MetaData } // SimpleBus implementation for the [Bus] contract type SimpleBus struct { + bot *telebot.Bot // Stores the active user flows by their IDs. // Key - user id (int64) // Value - the [state] instance @@ -105,28 +109,19 @@ func (b *SimpleBus) UserContinueFlowOrCustom(handler telebot.HandlerFunc) telebo } } -func (b *SimpleBus) Handle(c telebot.Context) error { - stV, exists := b.states.Load(c.Sender().ID) - if !exists { - return UserDoesNotHaveActiveFlow - } - - st := stV.(flowState) - // Update context for the state - st.state.Add(StateContextKey, c) - // Get active step - step := st.flow.steps[st.machine.ActiveStep()] +// Calls the meta functions for the step [validators/assign/etc]. +func (b *SimpleBus) handleMetaForStep(st flowState, c telebot.Context, step Step) error { // Call validators if they are defined validators := step.validators if len(validators) > 0 { for _, validator := range validators { err := validator.Validate(st.state) + // Fill metadata information on error if err != nil { - if st.flow.UseValidatorErrorsAsUserResponse { - return c.Reply(err.Error()) - } else { - return err - } + st.metaData.FailureStage = MetaDataFailureStageValidation + st.metaData.FailedError = err + + return err } } } @@ -134,27 +129,84 @@ func (b *SimpleBus) Handle(c telebot.Context) error { // Call [assign] if step.assign != nil { if err := step.assign(st.state); err != nil { + // Fill step metadata + st.metaData.FailureStage = MetaDataFailureStageAssign + st.metaData.FailedError = err + return err } } + return nil +} + +// Call the [catch] function for the [flow] and remove the flow from the state. +func (b *SimpleBus) handleCatch(st flowState, c telebot.Context) error { + // Remove flow on any error occurring within flow logic. + // We need to call the [Fail] function because, typically, + // that handler should send something to the user like [Try again]. + b.removeState(c.Sender().ID) + + if st.flow.catch == nil { + return nil + } + + return st.flow.catch(st.state, st.metaData) +} + +func (b *SimpleBus) Handle(c telebot.Context) error { + stV, exists := b.states.Load(c.Sender().ID) + if !exists { + return UserDoesNotHaveActiveFlow + } + + st := stV.(flowState) + // Update context for the state + st.state.Add(StateContextKey, c) activeStep := st.machine.ActiveStep() - // Call [then] event if it's defined - if step.then != nil { - if err := step.then(st.state, &step); err != nil { - return err + // Get active step + step := st.flow.steps[activeStep] + // Begin filling metadata for the current step. + st.metaData.LastActiveStep = StepMetaData{Step: st.machine.ActiveStep()} + defer func() { + // Update the flowState for the user only if [failedError] is nil. + // Otherwise, if the flow failed and the [catch] function was called, + // we don't need to update the flow because it no longer exists. + if st.metaData.FailureStage == MetaDataFailureStageNone { + b.states.Store(c.Sender().ID, st) } + }() + + if err := b.handleMetaForStep(st, c, step); err != nil { + st.metaData.FailedStep = &st.metaData.LastActiveStep + + return b.handleCatch(st, c) } - // It was the last step. Call the [then] handler - if len(st.flow.steps) <= st.machine.ActiveStep()+1 { - b.removeState(c.Sender().ID) + // Since it is the last step, call the [then] handler. + if len(st.flow.steps) <= activeStep+1 { + // Call on each step handler if it is defined + if st.flow.onEachStep != nil { + st.flow.onEachStep(st.state, st.metaData.LastActiveStep) + } if st.flow.then == nil { + b.removeState(c.Sender().ID) + return nil } - return st.flow.then(st.state) + // If an error is returned, we need to call [catch] for the flow. + err := st.flow.then(st.state) + if err != nil { + // Fill step metadata + st.metaData.FailureStage = MetaDataFailureStageThen + st.metaData.FailedError = err + + return b.handleCatch(st, c) + } + + return err } // Sometimes, the user may navigate through steps within handlers. @@ -164,33 +216,33 @@ func (b *SimpleBus) Handle(c telebot.Context) error { // Process to the next step err := st.machine.Next(st.state) if err != nil { - // Remove flow on any error occurring within flow logic. - // We need to call the [Fail] function because, typically, - // that handler should send something to the user like [Try again]. - b.removeState(c.Sender().ID) + // Fill step metadata + st.metaData.FailureStage = MetaDataFailureStageBegin + st.metaData.FailedError = err - if st.flow.catch == nil { - return nil - } + return b.handleCatch(st, c) + } - return st.flow.catch(st.state, err) + // Call on each step handler if it is defined + if st.flow.onEachStep != nil { + st.flow.onEachStep(st.state, st.metaData.LastActiveStep) } } return nil } -// Remove [state] from the [SimpleBus.states] +// Remove the [state] from the [SimpleBus.states]. func (b *SimpleBus) removeState(userID int64) { b.states.Delete(userID) } -func (b *SimpleBus) Flow(factory *Factory) telebot.HandlerFunc { - return func(c telebot.Context) error { - if len(factory.flow.steps) == 0 { - return NoStepsDefined - } +func (b *SimpleBus) Flow(endpoint interface{}, factory *Factory) error { + if len(factory.flow.steps) == 0 { + return NoStepsDefined + } + b.bot.Handle(endpoint, func(c telebot.Context) error { // If the user already has a flow, we need to recall the active step. stV, exists := b.states.Load(c.Sender().ID) if exists { @@ -207,14 +259,23 @@ func (b *SimpleBus) Flow(factory *Factory) telebot.HandlerFunc { Add(StateMachineKey, machine) // Register flow for the user st := flowState{ - flow: factory.flow, - state: state, - machine: machine, + endpoint: endpoint, + flow: factory.flow, + state: state, + machine: machine, + metaData: &MetaData{ + Endpoint: endpoint, + // Sets the first step as the last active step. + LastActiveStep: StepMetaData{Step: 0}, + }, } b.states.Store(c.Sender().ID, st) + // Call the machine for the start the first step return machine.ToStep(0, st.state) - } + }) + + return nil } // Removes flows that have been active for longer than [flowSessionIsAvailableFor] time. @@ -223,8 +284,9 @@ func (b *SimpleBus) removeIdleFlows() { // For example, a developer may want to notify a user that their session has expired. } -func NewBus(flowSessionIsAvailableFor time.Duration) Bus { +func NewBus(bot *telebot.Bot, flowSessionIsAvailableFor time.Duration) Bus { bus := &SimpleBus{ + bot: bot, flowSessionIsAvailableFor: flowSessionIsAvailableFor, } diff --git a/flow/factory.go b/flow/factory.go index 1568b180..4fb038c3 100644 --- a/flow/factory.go +++ b/flow/factory.go @@ -24,21 +24,28 @@ func (f *Factory) WithState(userState map[interface{}]interface{}) *Factory { return f } -// Next adds a step to the [Flow.Steps] +// Next adds a step to the [Flow.steps] func (f *Factory) Next(step *StepFactory) *Factory { f.flow.steps = append(f.flow.steps, *step.step) return f } -// Then sets a handler for the [Flow.Success] event. +// OnEachStep sets a handler for the [Flow.onEachStep] event. +func (f *Factory) OnEachStep(handler OnEachStepHandler) *Factory { + f.flow.onEachStep = handler + + return f +} + +// Then sets a handler for the [Flow.then] event. func (f *Factory) Then(handler StateHandler) *Factory { f.flow.then = handler return f } -// Catch sets a handler for the [Flow.Fail] event. +// Catch sets a handler for the [Flow.catch] event. func (f *Factory) Catch(handler FailHandler) *Factory { f.flow.catch = handler @@ -53,14 +60,6 @@ func New() *Factory { } } -// NewWithConfiguration start describing the flow. -func NewWithConfiguration(flow Flow) *Factory { - return &Factory{ - flow: &flow, - userState: make(map[interface{}]interface{}), - } -} - // StepFactory for creating a [Step] object. type StepFactory struct { step *Step @@ -80,13 +79,6 @@ func (f *StepFactory) Assign(assign StateHandler) *StepFactory { return f } -// Then sets a value for the [Step.then]. -func (f *StepFactory) Then(handler StepThenHandler) *StepFactory { - f.step.then = handler - - return f -} - // NewStep initiates the description of a step for the flow. func NewStep(handler StateHandler) *StepFactory { return &StepFactory{step: &Step{handler: handler}} diff --git a/flow/flow.go b/flow/flow.go index 167c8c7a..16c6893f 100644 --- a/flow/flow.go +++ b/flow/flow.go @@ -1,6 +1,42 @@ package flow -type FailHandler func(State, error) error +// MetaDataFailureStage provides information on the case in which the step failed: begin/validation/assign, and so on. +type MetaDataFailureStage uint8 + +const ( + // MetaDataFailureStageNone indicates that the step was successfully passed. + MetaDataFailureStageNone MetaDataFailureStage = iota + // MetaDataFailureStageBegin means fail happened on the first stage (maybe some network problems for instance) + MetaDataFailureStageBegin + // MetaDataFailureStageValidation indicates that the user input prompted bad data + // or something occurred during the validation process. + MetaDataFailureStageValidation + // MetaDataFailureStageAssign means that the assigner function returned an error + // (which could be due to an internal problem, for instance). + MetaDataFailureStageAssign + MetaDataFailureStageThen +) + +// MetaData is an object that provides the user with information for different stages +type MetaData struct { + // Endpoint for the Telegram bot that is served by the flow. + Endpoint interface{} + // Provides the last active step. + // For example, this is useful when the flow was terminated due to inactivity. + LastActiveStep StepMetaData + FailureStage MetaDataFailureStage + // Sometimes the flow can fail due to a step. + // If this occurs, the data indicates the failed step. + FailedStep *StepMetaData + // Provides the error that caused the failure, if there was one. + FailedError error +} + +// OnEachStepHandler called after the step is executed. +// Please refer to the documentation for [Flow.onEachStep] below for more details. +type OnEachStepHandler func(State, StepMetaData) + +type FailHandler func(State, *MetaData) error // Flow describes a process from beginning to end. It retains all defined steps, the user's final handler, and more. // Additionally, it offers a straightforward interface to access internal storage for marshaling and saving elsewhere. @@ -11,10 +47,6 @@ type Flow struct { then StateHandler // Calls on any error (@TODO: update the comment) catch FailHandler - - // User options - - // Determines whether we need to send errors from a validator to the user as a response. - // If true, errors from a validator are responded, otherwise, no response is sent. - UseValidatorErrorsAsUserResponse bool + // This handler is called for each step and only on success. + onEachStep OnEachStepHandler } diff --git a/flow/step.go b/flow/step.go index 7bef8afa..68566fdc 100644 --- a/flow/step.go +++ b/flow/step.go @@ -1,5 +1,11 @@ package flow +// StepMetaData is an object that provides the user with information for different stages +type StepMetaData struct { + // Ordinal number + Step int +} + // StepThenHandler handler for the successfully completed step type StepThenHandler func(State, *Step) error @@ -22,6 +28,4 @@ type Step struct { // Callback called after the validation process if successful. // It can, for example, assign the user's prompt to a variable. assign StateHandler - // Called if the step is successfully passed. - then StepThenHandler } diff --git a/flow_simple_manual_testing/main.go b/flow_simple_manual_testing/main.go index 7f193245..34eddb8e 100644 --- a/flow_simple_manual_testing/main.go +++ b/flow_simple_manual_testing/main.go @@ -2,6 +2,7 @@ package main import ( "errors" + "fmt" tele "gopkg.in/telebot.v3" "gopkg.in/telebot.v3/flow" "log" @@ -101,25 +102,23 @@ func main() { return state.Read(flow.StateContextKey).(tele.Context).Reply(message) } } - stepCompletedLogging := func(state flow.State, step *flow.Step) error { - log.Println("Step completed") - - return nil + stepCompletedLogging := func(state flow.State, metadata flow.StepMetaData) { + log.Println(fmt.Sprintf("Step completed [%d]", metadata.Step)) } // Configure flow bus - flowBus := flow.NewBus(5 * time.Minute) + flowBus := flow.NewBus(b, 5*time.Minute) // Handle any text by flow bus b.Handle(tele.OnText, flowBus.Handle) // First flow var email string - b.Handle("/start", flowBus.Flow( + flowBus.Flow( + "/start", flow.New(). Next( flow.NewStep(sendUserMessage("Enter email:")). Validate(nonEmptyValidator). - Assign(TextAssigner(&email)). - Then(stepCompletedLogging), + Assign(TextAssigner(&email)), ). Next( flow.NewStep(sendUserMessage("Enter password:")). @@ -128,23 +127,18 @@ func main() { ). Next( flow.NewStep(sendUserMessage("Third step:")). - Then(func(state flow.State, step *flow.Step) error { - //return state.Read(flow.StateMachineKey).(flow.Machine).ToStep(0, state) - - return nil - }), + Validate(badValidator), ). + OnEachStep(stepCompletedLogging). Then(func(state flow.State) error { - log.Println("Steps are completed!") - return state.Read(flow.StateContextKey).(tele.Context).Reply("Done") }). - Catch(func(state flow.State, err error) error { - log.Println("FAILED: ", err) + Catch(func(state flow.State, metadata *flow.MetaData) error { + log.Println("Catch an error: ", metadata) - return nil + return state.Read(flow.StateContextKey).(tele.Context).Reply("Try later") }), - )) + ) // Flow using state storage type user struct { @@ -152,7 +146,8 @@ func main() { password string } userStorageKey := "user" - b.Handle("/start2", flowBus.Flow( + flowBus.Flow( + "/start2", flow.New(). AddState(userStorageKey, &user{}). Next( @@ -172,11 +167,6 @@ func main() { u := state.Read(userStorageKey).(*user) u.password = state.Read(flow.StateContextKey).(tele.Context).Message().Text - return nil - }). - Then(func(state flow.State, step *flow.Step) error { - log.Println("Second step successfully passed!") - return nil }), ). @@ -190,12 +180,12 @@ func main() { return state.Read(flow.StateContextKey).(tele.Context).Reply("Done") }). - Catch(func(state flow.State, err error) error { - log.Println("FAILED: ", err) + Catch(func(state flow.State, metadata *flow.MetaData) error { + log.Println("FAILED: ", metadata.FailedError) return nil }), - )) + ) b.Start() }