Skip to content

Commit c6700d5

Browse files
committed
state of world so I dont lose work
1 parent e76be4a commit c6700d5

File tree

190 files changed

+1462
-28045
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

190 files changed

+1462
-28045
lines changed

Gopkg.lock

+8-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

+8
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,11 @@
6868
[[constraint]]
6969
name = "github.com/nyaruka/gocommon"
7070
version = "0.2.0"
71+
72+
[[constraint]]
73+
branch = "master"
74+
name = "github.com/juju/errors"
75+
76+
[[constraint]]
77+
name = "github.com/shopspring/decimal"
78+
version = "1.1.0"

models/assets.go

+242
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package models
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/nyaruka/goflow/legacy"
10+
"github.com/nyaruka/goflow/utils"
11+
"github.com/sirupsen/logrus"
12+
13+
"github.com/jmoiron/sqlx"
14+
"github.com/nyaruka/goflow/flows"
15+
)
16+
17+
type OrgID int
18+
19+
// OrgAssets is the set of assets for an organization. These are loaded lazily from the database as asked for.
20+
// OrgAssets are thread safe and can be shared across goroutines.
21+
// OrgAssets implement the flows.SessionAssets interface.
22+
type OrgAssets struct {
23+
ctx context.Context
24+
db *sqlx.DB
25+
orgID OrgID
26+
env utils.Environment
27+
28+
channels *flows.ChannelSet
29+
channelsByID map[flows.ChannelID]flows.Channel
30+
channelOnce sync.Once
31+
32+
fields *flows.FieldSet
33+
fieldsByUUID map[FieldUUID]*flows.Field
34+
fieldsLock sync.RWMutex
35+
36+
groups *flows.GroupSet
37+
groupsByID map[flows.GroupID]*flows.Group
38+
groupOnce sync.Once
39+
40+
labels *flows.LabelSet
41+
labelsLock sync.RWMutex
42+
43+
resthooks *flows.ResthookSet
44+
resthooksLock sync.RWMutex
45+
46+
flows map[flows.FlowUUID]flows.Flow
47+
flowIDs map[flows.FlowUUID]FlowID
48+
flowsLock sync.RWMutex
49+
50+
// TODO: implement locations
51+
locations *flows.LocationHierarchySet
52+
locationsLock sync.RWMutex
53+
}
54+
55+
func NewOrgAssets(ctx context.Context, db *sqlx.DB, orgID OrgID) *OrgAssets {
56+
return &OrgAssets{
57+
ctx: ctx,
58+
db: db,
59+
orgID: orgID,
60+
env: utils.NewDefaultEnvironment(),
61+
62+
flows: make(map[flows.FlowUUID]flows.Flow),
63+
}
64+
}
65+
66+
func (o *OrgAssets) GetOrgID() OrgID {
67+
return o.orgID
68+
}
69+
70+
func (o *OrgAssets) GetChannelByID(id flows.ChannelID) (flows.Channel, error) {
71+
_, err := o.GetChannelSet()
72+
if err != nil {
73+
return nil, err
74+
}
75+
channel, found := o.channelsByID[id]
76+
if !found {
77+
return nil, fmt.Errorf("no channel found with ID: %d", id)
78+
}
79+
return channel, nil
80+
}
81+
82+
func (o *OrgAssets) GetChannel(uuid flows.ChannelUUID) (flows.Channel, error) {
83+
cs, err := o.GetChannelSet()
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
return cs.FindByUUID(uuid), nil
89+
}
90+
91+
func (o *OrgAssets) GetChannelSet() (*flows.ChannelSet, error) {
92+
return o.channels, nil
93+
}
94+
95+
func (o *OrgAssets) GetFieldByUUID(uuid FieldUUID) (*flows.Field, error) {
96+
_, err := o.GetFieldSet()
97+
if err != nil {
98+
return nil, err
99+
}
100+
field, found := o.fieldsByUUID[uuid]
101+
if !found {
102+
return nil, nil
103+
}
104+
105+
return field, nil
106+
}
107+
108+
func (o *OrgAssets) GetField(key string) (*flows.Field, error) {
109+
fs, err := o.GetFieldSet()
110+
if err != nil {
111+
return nil, err
112+
}
113+
return fs.FindByKey(key), nil
114+
}
115+
116+
func (o *OrgAssets) GetFieldSet() (*flows.FieldSet, error) {
117+
return o.fields, nil
118+
}
119+
120+
func (o *OrgAssets) GetFlow(uuid flows.FlowUUID) (flows.Flow, error) {
121+
o.flowsLock.RLock()
122+
flow, found := o.flows[uuid]
123+
o.flowsLock.RUnlock()
124+
if found {
125+
return flow, nil
126+
}
127+
128+
flow, err := o.loadFlow(uuid)
129+
if err != nil {
130+
return nil, err
131+
}
132+
o.flowsLock.Lock()
133+
o.flows[uuid] = flow
134+
o.flowsLock.Unlock()
135+
136+
return flow, nil
137+
}
138+
139+
func (o *OrgAssets) GetFlowID(uuid flows.FlowUUID) (FlowID, error) {
140+
o.flowsLock.RLock()
141+
flowID, found := o.flowIDs[uuid]
142+
o.flowsLock.RUnlock()
143+
if !found {
144+
return -1, fmt.Errorf("no flow known with uuid: %s", uuid)
145+
}
146+
return flowID, nil
147+
}
148+
149+
func (o *OrgAssets) GetGroupByID(id flows.GroupID) (*flows.Group, error) {
150+
_, err := o.GetGroupSet()
151+
if err != nil {
152+
return nil, err
153+
}
154+
group, found := o.groupsByID[id]
155+
if !found {
156+
return nil, fmt.Errorf("no group found with id: %d", id)
157+
}
158+
return group, nil
159+
}
160+
161+
func (o *OrgAssets) GetGroup(uuid flows.GroupUUID) (*flows.Group, error) {
162+
gs, err := o.GetGroupSet()
163+
if err != nil {
164+
return nil, err
165+
}
166+
return gs.FindByUUID(uuid), nil
167+
}
168+
169+
func (o *OrgAssets) GetGroupSet() (*flows.GroupSet, error) {
170+
return o.groups, nil
171+
}
172+
173+
func (o *OrgAssets) GetLabel(uuid flows.LabelUUID) (*flows.Label, error) {
174+
ls, err := o.GetLabelSet()
175+
if err != nil {
176+
return nil, err
177+
}
178+
return ls.FindByUUID(uuid), nil
179+
}
180+
181+
func (o *OrgAssets) GetLabelSet() (*flows.LabelSet, error) {
182+
return o.labels, nil
183+
}
184+
185+
func (o *OrgAssets) HasLocations() bool {
186+
return false
187+
}
188+
189+
func (o *OrgAssets) GetLocationHierarchySet() (*flows.LocationHierarchySet, error) {
190+
return nil, nil
191+
}
192+
193+
func (o *OrgAssets) GetResthookSet() (*flows.ResthookSet, error) {
194+
return o.resthooks, nil
195+
}
196+
197+
const selectFlowSQL = `
198+
SELECT
199+
fr.definition::jsonb ||
200+
jsonb_build_object(
201+
'flow_type', f.flow_type,
202+
'metadata', jsonb_build_object(
203+
'uuid', f.uuid,
204+
'id', f.id,
205+
'name', f.name,
206+
'revision', fr.revision,
207+
'expires', f.expires_after_minutes
208+
)
209+
) as definition
210+
FROM
211+
flows_flowrevision fr,
212+
flows_flow f
213+
WHERE
214+
f.uuid = $1 AND
215+
fr.flow_id = f.id AND
216+
fr.is_active = TRUE AND
217+
f.is_active = TRUE
218+
ORDER BY
219+
revision DESC LIMIT 1;`
220+
221+
// loads the flow with the passed in UUID
222+
func (o *OrgAssets) loadFlow(uuid flows.FlowUUID) (flows.Flow, error) {
223+
ctx, cancel := context.WithTimeout(o.ctx, time.Second*15)
224+
defer cancel()
225+
226+
var definition string
227+
err := o.db.GetContext(ctx, &definition, selectFlowSQL, uuid)
228+
if err != nil {
229+
return nil, err
230+
}
231+
232+
// load it in from our json
233+
legacyFlow, err := legacy.ReadLegacyFlow([]byte(definition))
234+
if err != nil {
235+
logrus.WithField("definition", definition).WithError(err).Error("error loading flow")
236+
return nil, err
237+
}
238+
239+
// migrate forwards returning our final flow definition
240+
flow, err := legacyFlow.Migrate(false, false)
241+
return flow, err
242+
}

models/channels.go

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package models
2+
3+
import (
4+
"context"
5+
6+
"github.com/jmoiron/sqlx"
7+
"github.com/juju/errors"
8+
"github.com/lib/pq"
9+
"github.com/nyaruka/goflow/flows"
10+
)
11+
12+
// Channel is the mailman struct that represents channels
13+
type Channel struct {
14+
id flows.ChannelID
15+
uuid flows.ChannelUUID
16+
name string
17+
address string
18+
schemes []string
19+
roles []string
20+
}
21+
22+
// ID returns the id of this channel
23+
func (c *Channel) ID() flows.ChannelID { return c.id }
24+
25+
// UUID returns the UUID of this channel
26+
func (c *Channel) UUID() flows.ChannelUUID { return c.uuid }
27+
28+
// Name returns the name of this channel
29+
func (c *Channel) Name() string { return c.name }
30+
31+
// Address returns the name of this channel
32+
func (c *Channel) Address() string { return c.address }
33+
34+
// Schemes returns the schemes this channel supports
35+
func (c *Channel) Schemes() []string { return c.schemes }
36+
37+
// Roles returns the roles this channel supports
38+
func (c *Channel) Roles() []string { return c.roles }
39+
40+
// loadChannels loads all the channels for the passed in org
41+
func loadChannels(ctx context.Context, db sqlx.Queryer, orgID OrgID) ([]*Channel, error) {
42+
rows, err := db.Query(selectChannelsSQL, orgID)
43+
if err != nil {
44+
return nil, errors.Annotatef(err, "error querying channels for org: %d", orgID)
45+
}
46+
defer rows.Close()
47+
48+
channels := make([]*Channel, 0, 2)
49+
for rows.Next() {
50+
channel := &Channel{}
51+
52+
err := rows.Scan(&channel.id, &channel.uuid, &channel.name, &channel.address, pq.Array(&channel.schemes), pq.Array(&channel.roles))
53+
if err != nil {
54+
return nil, errors.Annotate(err, "error scanning channel row")
55+
}
56+
57+
channels = append(channels, channel)
58+
}
59+
60+
return channels, nil
61+
}
62+
63+
const selectChannelsSQL = `
64+
SELECT
65+
id,
66+
uuid,
67+
name,
68+
address,
69+
schemes,
70+
(SELECT ARRAY(
71+
SELECT CASE r
72+
WHEN 'R' THEN 'receive'
73+
WHEN 'S' THEN 'send'
74+
WHEN 'C' THEN 'call'
75+
WHEN 'A' THEN 'answer'
76+
WHEN 'U' THEN 'ussd'
77+
END
78+
FROM unnest(regexp_split_to_array(role,'')) as r)
79+
) as roles
80+
FROM
81+
channels_channel
82+
WHERE
83+
org_id = $1 AND
84+
is_active = TRUE
85+
ORDER BY
86+
created_on ASC
87+
`

0 commit comments

Comments
 (0)