Skip to content

Commit bfb2d70

Browse files
authored
Merge pull request #3 from mimiro-io/feature/add-concurrent-processing-example
added sample code to process entities in parallel
2 parents a332339 + 49a422c commit bfb2d70

File tree

2 files changed

+70
-1
lines changed

2 files changed

+70
-1
lines changed

sample/sample_transform.go

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
ct "github.com/mimiro-io/common-http-transform"
66
egdm "github.com/mimiro-io/entity-graph-data-model"
7+
"sync"
78
)
89

910
// EnrichConfig is a function that can be used to enrich the config by reading additional files or environment variables
@@ -30,7 +31,69 @@ type SampleTransform struct {
3031
// Stop is a no-op for the sample transform: it ignores the supplied context
// and always returns nil.
func (dl *SampleTransform) Stop(_ context.Context) error {
	return nil
}
3132

3233
func (dl *SampleTransform) Transform(ec *egdm.EntityCollection) (*egdm.EntityCollection, ct.TransformError) {
33-
return ec, nil
34+
result, err := processEntitiesConcurrently(ec.Entities, 10, processEntity)
35+
if err != nil {
36+
return nil, ct.Err(err, 1)
37+
}
38+
return &egdm.EntityCollection{Entities: result}, nil
39+
}
40+
41+
type Result struct {
42+
Index int
43+
Entity *egdm.Entity
44+
Error error
45+
}
46+
47+
func processEntitiesConcurrently(entities []*egdm.Entity, concurrency int, handler func(entity *egdm.Entity) (*egdm.Entity, error)) ([]*egdm.Entity, error) {
48+
var wg sync.WaitGroup
49+
results := make([]Result, len(entities))
50+
resultsChan := make(chan Result, len(entities))
51+
52+
// Number of goroutines to use
53+
itemsPerGoroutine := (len(entities) + concurrency - 1) / concurrency
54+
55+
for i := 0; i < concurrency; i++ {
56+
start := i * itemsPerGoroutine
57+
end := start + itemsPerGoroutine
58+
if end > len(entities) {
59+
end = len(entities)
60+
}
61+
62+
wg.Add(1)
63+
go func(start, end, index int) {
64+
defer wg.Done()
65+
for j := start; j < end; j++ {
66+
result, err := handler(entities[j])
67+
resultsChan <- Result{Index: j, Entity: result, Error: err}
68+
}
69+
}(start, end, i)
70+
}
71+
72+
go func() {
73+
wg.Wait()
74+
close(resultsChan)
75+
}()
76+
77+
for result := range resultsChan {
78+
if result.Error != nil {
79+
return nil, result.Error
80+
}
81+
results[result.Index] = result
82+
}
83+
84+
// filter out non nil
85+
finalResults := make([]*egdm.Entity, 0)
86+
for _, result := range results {
87+
if result.Entity != nil {
88+
finalResults = append(finalResults, result.Entity)
89+
}
90+
}
91+
92+
return finalResults, nil
93+
}
94+
95+
func processEntity(entity *egdm.Entity) (*egdm.Entity, error) {
96+
return entity, nil
3497
}
3598

3699
// NewSampleTransform is a factory function that creates a new instance of the sample transform

sample/sample_transform_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,12 @@ func TestNewSampleDataLayer(t *testing.T) {
5555
"props": {
5656
"http://example.com/name": "John Smith"
5757
}
58+
},
59+
{
60+
"id": "http://example.com/2",
61+
"props": {
62+
"http://example.com/name": "James Shadwell"
63+
}
5864
}
5965
]`
6066

0 commit comments

Comments
 (0)