diff --git a/flow/bus.go b/flow/bus.go new file mode 100644 index 00000000..918bdbb5 --- /dev/null +++ b/flow/bus.go @@ -0,0 +1,297 @@ +package flow + +import ( + "errors" + "gopkg.in/telebot.v3" + "sync" + "time" +) + +var ( + UserDoesNotHaveActiveFlow = errors.New("flow: user does not have active flow") + NoStepsDefined = errors.New("flow: no steps defined") +) + +// The Bus handles user actions, such as [telebot.OnText, telebot.OnMedia, etc]. +// Flow processing involves handling any user prompt after the user has begun the flow process. +// We offer this contract to give developers more control, avoiding reliance on obscure mechanisms. +type Bus interface { + // UserInFlow returns true if the user is currently engaged in flow processing. + // + // Example: + // bot.Handle("/start", func(c telebot.Context) error { + // if flowBus.UserInFlow(c.Sender().ID) { // Reply with an error message. } + // + // return c.Reply("Hello!") + // }) + UserInFlow(userID int64) bool + // UserContinueFlow initiates or continues the flow process for a user if one is already in progress. + // + // Example: + // bot.Handle("/start", func(c telebot.Context) error { + // if flowBus.UserInFlow(c.Sender().ID) { flowBus.UserContinueFlow(c.Sender().ID) } + // + // return c.Reply("Hello!") + // }) + UserContinueFlow(userID int64, c telebot.Context) error + // UserContinueFlowOrCustom calls [UserContinueFlow] if the flow process for a user is in progress. + // Otherwise, it calls a custom function. + // For instance, you may need to call this function to define a custom handler for any action required by the flow. + // + // Example: + // + // bot.Handle(telebot.OnText, flowBus.ProcessUserToFlowOrCustom(func (c telebot.Context) error { + // // Called only if the user hasn't begun the flow. + // + // return nil + // })) + UserContinueFlowOrCustom(telebot.HandlerFunc) telebot.HandlerFunc + // Handle implements any message handler. + // This function checks if the user is continuing work on their active flow and processes it if so. + // + // Example: + // bot.Handle(telebot.OnText, flowBus.Handle) + Handle(telebot.Context) error + + // Flow initiates flow configuration + Flow(endpoint interface{}, factory *Factory) error +} + +// describes the state to the [SimpleBus.states] value +type flowState struct { + // telegram bot endpoint + endpoint interface{} + // User's flow + flow *Flow + state State + machine Machine + metaData *MetaData +} + +// SimpleBus implementation for the [Bus] contract +type SimpleBus struct { + bot *telebot.Bot + // Stores the active user flows by their IDs. + // Key - user id (int64) + // Value - the [state] instance + states sync.Map + // We don't need to keep active flows indefinitely. + // This setting defines the maximum lifespan for each flow. + // Background process will remove flows that have been alive longer than the defined duration. + // @TODO: Provide a callback handler for every deletion process. + flowSessionIsAvailableFor time.Duration +} + +func (b *SimpleBus) UserInFlow(userID int64) bool { + _, exists := b.states.Load(userID) + + return exists +} + +func (b *SimpleBus) UserContinueFlow(userID int64, c telebot.Context) error { + //flow, exists := b.states.Load(userID) + _, exists := b.states.Load(userID) + if !exists { + return UserDoesNotHaveActiveFlow + } + + // @TODO: call machine + return nil +} + +func (b *SimpleBus) UserContinueFlowOrCustom(handler telebot.HandlerFunc) telebot.HandlerFunc { + return func(c telebot.Context) error { + if b.UserInFlow(c.Sender().ID) { + return b.UserContinueFlow(c.Sender().ID, c) + } + + return handler(c) + } +} + +// Calls the meta functions for the step [validators/assign/etc]. +func (b *SimpleBus) handleMetaForStep(st flowState, c telebot.Context, step Step) error { + // Call validators if they are defined + validators := step.validators + if len(validators) > 0 { + for _, validator := range validators { + err := validator.Validate(st.state) + // Fill metadata information on error + if err != nil { + st.metaData.FailureStage = MetaDataFailureStageValidation + st.metaData.FailedError = err + + return err + } + } + } + + // Call [assign] + if step.assign != nil { + if err := step.assign(st.state); err != nil { + // Fill step metadata + st.metaData.FailureStage = MetaDataFailureStageAssign + st.metaData.FailedError = err + + return err + } + } + + return nil +} + +// Call the [catch] function for the [flow] and remove the flow from the state. +func (b *SimpleBus) handleCatch(st flowState, c telebot.Context) error { + // Remove flow on any error occurring within flow logic. + // We need to call the [Fail] function because, typically, + // that handler should send something to the user like [Try again]. + b.removeState(c.Sender().ID) + + if st.flow.catch == nil { + return nil + } + + return st.flow.catch(st.state, st.metaData) +} + +func (b *SimpleBus) Handle(c telebot.Context) error { + stV, exists := b.states.Load(c.Sender().ID) + if !exists { + return UserDoesNotHaveActiveFlow + } + + st := stV.(flowState) + // Update context for the state + st.state.Add(StateContextKey, c) + activeStep := st.machine.ActiveStep() + // Get active step + step := st.flow.steps[activeStep] + // Begin filling metadata for the current step. + st.metaData.LastActiveStep = StepMetaData{Step: st.machine.ActiveStep()} + defer func() { + // Update the flowState for the user only if [failedError] is nil. + // Otherwise, if the flow failed and the [catch] function was called, + // we don't need to update the flow because it no longer exists. + if st.metaData.FailureStage == MetaDataFailureStageNone { + b.states.Store(c.Sender().ID, st) + } + }() + + if err := b.handleMetaForStep(st, c, step); err != nil { + st.metaData.FailedStep = &st.metaData.LastActiveStep + + return b.handleCatch(st, c) + } + + // Since it is the last step, call the [then] handler. + if len(st.flow.steps) <= activeStep+1 { + // Call on each step handler if it is defined + if st.flow.onEachStep != nil { + st.flow.onEachStep(st.state, st.metaData.LastActiveStep) + } + + if st.flow.then == nil { + b.removeState(c.Sender().ID) + + return nil + } + + // If an error is returned, we need to call [catch] for the flow. + err := st.flow.then(st.state) + if err != nil { + // Fill step metadata + st.metaData.FailureStage = MetaDataFailureStageThen + st.metaData.FailedError = err + + return b.handleCatch(st, c) + } + + return err + } + + // Sometimes, the user may navigate through steps within handlers. + // If this occurs, we don't need to call the [next] function because navigating + // through the machine already triggers it. + if activeStep == st.machine.ActiveStep() { + // Process to the next step + err := st.machine.Next(st.state) + if err != nil { + // Fill step metadata + st.metaData.FailureStage = MetaDataFailureStageBegin + st.metaData.FailedError = err + + return b.handleCatch(st, c) + } + + // Call on each step handler if it is defined + if st.flow.onEachStep != nil { + st.flow.onEachStep(st.state, st.metaData.LastActiveStep) + } + } + + return nil +} + +// Remove the [state] from the [SimpleBus.states]. +func (b *SimpleBus) removeState(userID int64) { + b.states.Delete(userID) +} + +func (b *SimpleBus) Flow(endpoint interface{}, factory *Factory) error { + if len(factory.flow.steps) == 0 { + return NoStepsDefined + } + + b.bot.Handle(endpoint, func(c telebot.Context) error { + // If the user already has a flow, we need to recall the active step. + stV, exists := b.states.Load(c.Sender().ID) + if exists { + st := stV.(flowState) + // Update context + st.state.Add(StateContextKey, c) + + return st.machine.ToStep(st.machine.ActiveStep(), st.state) + } + + machine := NewMachine(factory.flow) + state := NewRuntimeState(factory.userState). + Add(StateContextKey, c). + Add(StateMachineKey, machine) + // Register flow for the user + st := flowState{ + endpoint: endpoint, + flow: factory.flow, + state: state, + machine: machine, + metaData: &MetaData{ + Endpoint: endpoint, + // Sets the first step as the last active step. + LastActiveStep: StepMetaData{Step: 0}, + }, + } + b.states.Store(c.Sender().ID, st) + + // Call the machine for the start the first step + return machine.ToStep(0, st.state) + }) + + return nil +} + +// Removes flows that have been active for longer than [flowSessionIsAvailableFor] time. +func (b *SimpleBus) removeIdleFlows() { + // @TODO: Provide an API for clients. + // For example, a developer may want to notify a user that their session has expired. +} + +func NewBus(bot *telebot.Bot, flowSessionIsAvailableFor time.Duration) Bus { + bus := &SimpleBus{ + bot: bot, + flowSessionIsAvailableFor: flowSessionIsAvailableFor, + } + + // @TODO: do we need to create an API for users to interact with this? + go bus.removeIdleFlows() + + return bus +} diff --git a/flow/factory.go b/flow/factory.go new file mode 100644 index 00000000..4fb038c3 --- /dev/null +++ b/flow/factory.go @@ -0,0 +1,85 @@ +package flow + +// This package contains factories for describing flows. +// Factories generate a flow object in a simple manner. + +// Factory for creating a [Flow] object. +type Factory struct { + flow *Flow + // Represents any user state with [State.userState]. + userState map[interface{}]interface{} +} + +// AddState adds a state to the [Factory.userState] +func (f *Factory) AddState(key interface{}, value interface{}) *Factory { + f.userState[key] = value + + return f +} + +// WithState sets a value for [Factory.userState] +func (f *Factory) WithState(userState map[interface{}]interface{}) *Factory { + f.userState = userState + + return f +} + +// Next adds a step to the [Flow.steps] +func (f *Factory) Next(step *StepFactory) *Factory { + f.flow.steps = append(f.flow.steps, *step.step) + + return f +} + +// OnEachStep sets a handler for the [Flow.onEachStep] event. +func (f *Factory) OnEachStep(handler OnEachStepHandler) *Factory { + f.flow.onEachStep = handler + + return f +} + +// Then sets a handler for the [Flow.then] event. +func (f *Factory) Then(handler StateHandler) *Factory { + f.flow.then = handler + + return f +} + +// Catch sets a handler for the [Flow.catch] event. +func (f *Factory) Catch(handler FailHandler) *Factory { + f.flow.catch = handler + + return f +} + +// New start describing the flow. +func New() *Factory { + return &Factory{ + flow: &Flow{}, + userState: make(map[interface{}]interface{}), + } +} + +// StepFactory for creating a [Step] object. +type StepFactory struct { + step *Step +} + +// Validate sets values for the [Step.validators]. +func (f *StepFactory) Validate(validators ...StepValidator) *StepFactory { + f.step.validators = validators + + return f +} + +// Assign sets a value for the [Step.assign]. +func (f *StepFactory) Assign(assign StateHandler) *StepFactory { + f.step.assign = assign + + return f +} + +// NewStep initiates the description of a step for the flow. +func NewStep(handler StateHandler) *StepFactory { + return &StepFactory{step: &Step{handler: handler}} +} diff --git a/flow/flow.go b/flow/flow.go new file mode 100644 index 00000000..16c6893f --- /dev/null +++ b/flow/flow.go @@ -0,0 +1,52 @@ +package flow + +// MetaDataFailureStage provides information on the case in which the step failed: begin/validation/assign, and so on. +type MetaDataFailureStage uint8 + +const ( + // MetaDataFailureStageNone indicates that the step was successfully passed. + MetaDataFailureStageNone MetaDataFailureStage = iota + // MetaDataFailureStageBegin means fail happened on the first stage (maybe some network problems for instance) + MetaDataFailureStageBegin + // MetaDataFailureStageValidation indicates that the user input prompted bad data + // or something occurred during the validation process. + MetaDataFailureStageValidation + // MetaDataFailureStageAssign means that the assigner function returned an error + // (which could be due to an internal problem, for instance). + MetaDataFailureStageAssign + MetaDataFailureStageThen +) + +// MetaData is an object that provides the user with information for different stages +type MetaData struct { + // Endpoint for the Telegram bot that is served by the flow. + Endpoint interface{} + // Provides the last active step. + // For example, this is useful when the flow was terminated due to inactivity. + LastActiveStep StepMetaData + FailureStage MetaDataFailureStage + // Sometimes the flow can fail due to a step. + // If this occurs, the data indicates the failed step. + FailedStep *StepMetaData + // Provides the error that caused the failure, if there was one. + FailedError error +} + +// OnEachStepHandler called after the step is executed. +// Please refer to the documentation for [Flow.onEachStep] below for more details. +type OnEachStepHandler func(State, StepMetaData) + +type FailHandler func(State, *MetaData) error + +// Flow describes a process from beginning to end. It retains all defined steps, the user's final handler, and more. +// Additionally, it offers a straightforward interface to access internal storage for marshaling and saving elsewhere. +type Flow struct { + // User's defined steps + steps []Step + // Calls after successfully passing full flow + then StateHandler + // Calls on any error (@TODO: update the comment) + catch FailHandler + // This handler is called for each step and only on success. + onEachStep OnEachStepHandler +} diff --git a/flow/machine.go b/flow/machine.go new file mode 100644 index 00000000..5a529352 --- /dev/null +++ b/flow/machine.go @@ -0,0 +1,85 @@ +package flow + +import ( + "errors" + "fmt" +) + +// Machine describes the contract for the flow handling +type Machine interface { + // Back move backward by step + Back(state State) error + // Next move forward by step + Next(state State) error + // ToStep Move to the step + ToStep(step int, state State) error + // ActiveStep returns the current step + ActiveStep() int +} + +// SimpleMachine implements the [Machine] contract +type SimpleMachine struct { + // User defined flow + flow *Flow + // Active step for the user + activeStep int +} + +func (m *SimpleMachine) Back(state State) error { + if m.activeStep <= 0 { + return errors.New("already first step") + } + + m.activeStep -= 1 + + return m.run(state) +} + +func (m *SimpleMachine) Next(state State) error { + if m.activeStep+1 >= len(m.flow.steps) { + return errors.New("already last step") + } + + m.activeStep += 1 + + return m.run(state) +} + +func (m *SimpleMachine) ToStep(step int, state State) error { + if step < 0 { + return errors.New("step cannot be less than zero") + } + + if step > len(m.flow.steps) { + return errors.New("step cannot be greater than steps count") + } + + m.activeStep = step + + return m.run(state) +} + +func (m *SimpleMachine) ActiveStep() int { + return m.activeStep +} + +// Run the current step (this function should be called by [Back]/[Next]/[ToStep] functions). +func (m *SimpleMachine) run(state State) error { + if len(m.flow.steps) < m.activeStep { + return errors.New(fmt.Sprintf("step isn't defined (%d)", m.activeStep)) + } + + step := m.flow.steps[m.activeStep] + if step.handler != nil { + return step.handler(state) + } + + return nil +} + +func NewMachine(flow *Flow) Machine { + return &SimpleMachine{ + flow: flow, + activeStep: 0, + } +} diff --git a/flow/state.go b/flow/state.go new file mode 100644 index 00000000..0c068eaf --- /dev/null +++ b/flow/state.go @@ -0,0 +1,67 @@ +package flow + +// Defines keys for the basic data that must be in the state +const ( + StateMachineKey = "machine" + StateContextKey = "context" +) + +// StateHandler is a common handler for global processes, such as finally, fail, step, and so on. +type StateHandler func(State) error + +// State defines the user's state and persists common elements, such as the bot instance, flow handler instance, etc. +type State interface { + // Set initializes the [userState] field. + Set(userState map[interface{}]interface{}) + // Get returns data corresponding to the provided key, along with a boolean value indicating if the key exists. + Get(key interface{}) (interface{}, bool) + // Read returns data corresponding to the provided key. + // You can especially use this for fast type assertion. + // + // state.Read("machine").(flow.Machine) + Read(key interface{}) interface{} + // Add adds data to the storage. + // Caution: it does not check if the key already exists. It will overwrite any existing data associated with the key. + Add(key interface{}, value interface{}) State + // Exists returns a boolean representing whether the key exists in the map. + Exists(key interface{}) bool +} + +type RuntimeState struct { + // @TODO: make it concurrently safe? + // User state represents any custom data for a user. + // It's simply a container that you can use within steps. + // For instance, you may use it if you want to populate a struct at each step + // and then use this data after the flow has successfully completed. + userState map[interface{}]interface{} +} + +func (s *RuntimeState) Set(userState map[interface{}]interface{}) { + s.userState = userState +} + +func (s *RuntimeState) Get(key interface{}) (interface{}, bool) { + value, exists := s.userState[key] + + return value, exists +} + +func (s *RuntimeState) Read(key interface{}) interface{} { + return s.userState[key] +} + +func (s *RuntimeState) Add(key interface{}, value interface{}) State { + s.userState[key] = value + + return s +} + +func (s *RuntimeState) Exists(key interface{}) bool { + _, exists := s.userState[key] + + return exists +} + +func NewRuntimeState(userState map[interface{}]interface{}) State { + return &RuntimeState{userState: userState} +} diff --git a/flow/step.go b/flow/step.go new file mode 100644 index 00000000..68566fdc --- /dev/null +++ b/flow/step.go @@ -0,0 +1,31 @@ +package flow + +// StepMetaData is an object that provides the user with information for different stages +type StepMetaData struct { + // Ordinal number + Step int +} + +// StepThenHandler handler for the successfully completed step +type StepThenHandler func(State, *Step) error + +// StepValidator ensures that each step can be validated before the flow process progresses to the next step. +// Typically, users require simple validators, such as ensuring text is not empty or a photo is uploaded. +// Therefore, having a variety of different validators and describing them in a single function is not ideal. +type StepValidator interface { + // Validate is called after the user prompts anything. + Validate(State) error +} + +// Step describes a user's step within a flow. +type Step struct { + // This is the user's custom function called at the beginning of a step. + // There are no restrictions on logic; the handler is not required, and the user can even use an empty mock. + // Therefore, you can do whatever you want: move backward or forward steps, validate previously saved prompts, and so on. + handler StateHandler + // Defined validators + validators []StepValidator + // Callback called after the validation process if successful. + // It can, for example, assign the user's prompt to a variable. + assign StateHandler +} diff --git a/flow_simple_manual_testing/main.go b/flow_simple_manual_testing/main.go new file mode 100644 index 00000000..34eddb8e --- /dev/null +++ b/flow_simple_manual_testing/main.go @@ -0,0 +1,191 @@ +package main + +import ( + "errors" + "fmt" + tele "gopkg.in/telebot.v3" + "gopkg.in/telebot.v3/flow" + "log" + "reflect" + "strconv" + "time" +) + +// Validators: I'm certain that we need to implement some basic validators for users. + +type NonEmptyValidator struct{} + +func (NonEmptyValidator) Validate(state flow.State) error { + if len(state.Read(flow.StateContextKey).(tele.Context).Message().Text) < 2 { + return errors.New("message is required") + } + + return nil +} + +type BadValidator struct{} + +func (BadValidator) Validate(state flow.State) error { + return errors.New("test") + //return state.Machine.Fail(state) +} + +var ( + nonEmptyValidator = NonEmptyValidator{} + badValidator = BadValidator{} +) + +// TextAssigner I'm certain that we need to implement some basic assigner for users. +func TextAssigner(value interface{}) flow.StateHandler { + vai := reflect.ValueOf(value) + vai = vai.Elem() + + return func(state flow.State) error { + text := state.Read(flow.StateContextKey).(tele.Context).Text() + if len(text) == 0 { + return nil + } + + switch vai.Kind() { + case reflect.Int, reflect.Int8, reflect.Int16, + reflect.Int32, reflect.Int64: + n, err := strconv.ParseInt(text, 10, 32) + if err != nil { + return err + } + + vai.SetInt(n) + case reflect.Uint, reflect.Uint8, reflect.Uint16, + reflect.Uint32, reflect.Uint64, reflect.Uintptr: + n, err := strconv.ParseUint(text, 10, 32) + if err != nil { + return err + } + + vai.SetUint(n) + // ...floating-point and complex cases omitted for brevity... + case reflect.Bool: + n, err := strconv.ParseBool(text) + if err != nil { + return err + } + + vai.SetBool(n) + case reflect.String: + vai.SetString(text) + //case reflect.Chan, reflect.Func, reflect.Ptr, reflect.Slice, reflect.Map: + // return v.Type().String() + " 0x" + + // strconv.FormatUint(uint64(v.Pointer()), 16) + default: // reflect.Array, reflect.Struct, reflect.Interface + vai.SetString(text) + case reflect.Invalid: + return errors.New("invalid type") + } + + return nil + } +} + +func main() { + pref := tele.Settings{ + Token: "", + Poller: &tele.LongPoller{Timeout: 10 * time.Second}, + } + + b, err := tele.NewBot(pref) + if err != nil { + panic(err) + } + + sendUserMessage := func(message string) func(flow.State) error { + return func(state flow.State) error { + return state.Read(flow.StateContextKey).(tele.Context).Reply(message) + } + } + stepCompletedLogging := func(state flow.State, metadata flow.StepMetaData) { + log.Println(fmt.Sprintf("Step completed [%d]", metadata.Step)) + } + + // Configure flow bus + flowBus := flow.NewBus(b, 5*time.Minute) + // Handle any text by flow bus + b.Handle(tele.OnText, flowBus.Handle) + // First flow + var email string + flowBus.Flow( + "/start", + flow.New(). + Next( + flow.NewStep(sendUserMessage("Enter email:")). + Validate(nonEmptyValidator). + Assign(TextAssigner(&email)), + ). + Next( + flow.NewStep(sendUserMessage("Enter password:")). + Validate(nonEmptyValidator). + Assign(TextAssigner(&email)), + ). + Next( + flow.NewStep(sendUserMessage("Third step:")). + Validate(badValidator), + ). + OnEachStep(stepCompletedLogging). + Then(func(state flow.State) error { + return state.Read(flow.StateContextKey).(tele.Context).Reply("Done") + }). + Catch(func(state flow.State, metadata *flow.MetaData) error { + log.Println("Catch an error: ", metadata) + + return state.Read(flow.StateContextKey).(tele.Context).Reply("Try later") + }), + ) + + // Flow using state storage + type user struct { + email string + password string + } + userStorageKey := "user" + flowBus.Flow( + "/start2", + flow.New(). + AddState(userStorageKey, &user{}). + Next( + flow.NewStep(sendUserMessage("Enter email:")). + Validate(nonEmptyValidator). + Assign(func(state flow.State) error { + u := state.Read(userStorageKey).(*user) + u.email = state.Read(flow.StateContextKey).(tele.Context).Message().Text + + return nil + }), + ). + Next( + flow.NewStep(sendUserMessage("Enter password:")). + Validate(nonEmptyValidator). + Assign(func(state flow.State) error { + u := state.Read(userStorageKey).(*user) + u.password = state.Read(flow.StateContextKey).(tele.Context).Message().Text + + return nil + }), + ). + Next( + flow.NewStep(func(state flow.State) error { + return errors.New("should be passed to the [Catch]") + }), + ). + Then(func(state flow.State) error { + log.Println("Steps are completed!. User: ", state.Read(userStorageKey)) + + return state.Read(flow.StateContextKey).(tele.Context).Reply("Done") + }). + Catch(func(state flow.State, metadata *flow.MetaData) error { + log.Println("FAILED: ", metadata.FailedError) + + return nil + }), + ) + + b.Start() +}