Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Listable resource iteration through Iterator #543

Merged
merged 24 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 68 additions & 58 deletions .codegen/api.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"
"github.com/databricks/databricks-sdk-go/retries"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/listing"
"github.com/databricks/databricks-sdk-go/useragent"
)
{{range .Services}}
Expand Down Expand Up @@ -162,70 +163,69 @@ func (a *{{.Service.Name}}API) {{.PascalName}}AndWait(ctx context.Context{{if .R
{{.Comment "// " 80}}
//
// This method is generated by Databricks SDK Code Generator.
func (a *{{.Service.Name}}API) {{.PascalName}}All(ctx context.Context{{if .Request}}, request {{.Request.PascalName}}{{end}}) ([]{{ template "type" .Pagination.Entity }}, error) {
{{if .Pagination.MultiRequest}}var results []{{.Pagination.Entity.PascalName}}
{{ if .Pagination.Limit -}}
var totalCount {{template "type" .Pagination.Limit.Entity}} = 0
{{ end -}}
ctx = useragent.InContext(ctx, "sdk-feature", "pagination")
{{if .NeedsOffsetDedupe -}}
// deduplicate items that may have been added during iteration
seen := map[{{template "type" .IdentifierField.Entity}}]bool{}
{{end}}{{if eq .Pagination.Increment 1 -}}
func (a *{{.Service.Name}}API) {{.PascalName}}(ctx context.Context{{if .Request}}, request {{.Request.PascalName}}{{end}}) {{ template "paginated-return-type" . }} {
{{ if not .Request }}request := struct{}{}{{end}}
{{if eq .Pagination.Increment 1 -}}
request.{{.Pagination.Offset.PascalName}} = 1 // start iterating from the first page
{{end}}for {
response, err := a.impl.{{.PascalName}}(ctx{{if .Request}}, request{{end}})
if err != nil {
return nil, err
}
if len(response.{{.Pagination.Results.PascalName}}) == 0 {
break
}
for _, v := range response.{{.Pagination.Results.PascalName}} {
{{- if .NeedsOffsetDedupe -}}
id := v{{ template "field-path" .IdFieldPath }}
if seen[id] {
// item was added during iteration
continue
}
seen[id] = true
{{- end}}
results = append(results, v)
{{end}}
getNextPage := func(ctx context.Context, req {{ template "paginated-request-type" . }}) (*{{ .Response.PascalName }}, error) {
ctx = useragent.InContext(ctx, "sdk-feature", "pagination")
return a.impl.{{.PascalName}}(ctx{{if .Request}}, req{{end}})
}
getItems := func(resp *{{ .Response.PascalName }}) []{{ template "type" .Pagination.Entity }} {
return {{if .Pagination.Results}}resp.{{.Pagination.Results.PascalName}}{{else}}resp{{end}}
}
{{if .Pagination.MultiRequest -}}
getNextReq := func(resp *{{ .Response.PascalName }}) *{{.Request.PascalName}} {
{{if .Pagination.Token -}}
if resp.{{.Pagination.Token.Bind.PascalName}} == "" {
return nil
}
{{ if and .Pagination.Token .Pagination.Limit -}}
count := {{template "type" .Pagination.Limit.Entity}}(len(response.{{.Pagination.Results.PascalName}}))
totalCount += count
{{ else if .Pagination.Offset -}}
count := {{template "type" .Pagination.Offset.Entity}}(len(response.{{.Pagination.Results.PascalName}}))
totalCount += count
{{ end -}}
{{if eq .Path "/api/2.0/clusters/events" -}}
if response.NextPage == nil {
break
request.{{.Pagination.Token.PollField.PascalName}} = resp.{{.Pagination.Token.Bind.PascalName}}
{{- else if eq .Path "/api/2.0/clusters/events" -}}
if resp.NextPage == nil {
return nil
}
request = *response.NextPage
{{- else if .Pagination.Token -}}
request.{{.Pagination.Token.PollField.PascalName}} = response.{{.Pagination.Token.Bind.PascalName}}
if response.{{.Pagination.Token.Bind.PascalName}} == "" {
break
request = *resp.NextPage
{{ else -}}
if len(getItems(resp)) == 0 {
return nil
}
{{- else if eq .Pagination.Increment 1 -}}
request.{{.Pagination.Offset.PascalName}}++
{{ if eq .Pagination.Increment 1 -}}
request.{{.Pagination.Offset.PascalName}} = resp.{{.Pagination.Offset.PascalName}} + 1
{{- else -}}
request.{{.Pagination.Offset.PascalName}} += {{template "type" .Pagination.Offset.Entity}}(len(response.{{.Pagination.Results.PascalName}}))
{{- end}}
{{ if .Pagination.Limit -}}
limit := request.{{.Pagination.Limit.PascalName}}
if limit > 0 && totalCount >= limit {
break
}
{{- end -}}
}
return results, nil{{else if .Pagination.Results}}response, err := a.impl.{{.PascalName}}(ctx{{if .Request}}, request{{end}})
if err != nil {
return nil, err
request.{{.Pagination.Offset.PascalName}} = resp.{{.Pagination.Offset.PascalName}} + {{template "type" .Pagination.Offset.Entity}}(len(resp.{{.Pagination.Results.PascalName}}))
{{- end}}{{ end }}
return &request
}
return response.{{.Pagination.Results.PascalName}}, nil{{else}}return a.impl.{{.PascalName}}(ctx, request){{end}}
{{- end}}
iterator := listing.NewIterator(
&request,
getNextPage,
getItems,
{{if .Pagination.MultiRequest}}getNextReq{{else}}nil{{end}})
{{ if .NeedsOffsetDedupe -}}
dedupedIterator := listing.NewDedupeIterator[{{ template "type" .Pagination.Entity }}, {{ template "type" .IdentifierField.Entity }}](
iterator,
func(item {{ template "type" .Pagination.Entity }}) {{ template "type" .IdentifierField.Entity }} {
return item{{ template "field-path" .IdFieldPath }}
})
return dedupedIterator
{{- else -}}
return iterator
{{- end }}
}

{{.Comment "// " 80}}
//
// This method is generated by Databricks SDK Code Generator.
func (a *{{.Service.Name}}API) {{.PascalName}}All(ctx context.Context{{if .Request}}, request {{.Request.PascalName}}{{end}}) ([]{{ template "type" .Pagination.Entity }}, error) {
iterator := a.{{.PascalName}}(ctx{{if .Request}}, request{{end}})
{{ if .Pagination.Limit -}}
return listing.ToSliceN[{{ template "type" .Pagination.Entity }}, {{ template "type" .Pagination.Limit.Entity }}](ctx, iterator, request.{{.Pagination.Limit.PascalName}})
{{ else -}}
return listing.ToSlice[{{ template "type" .Pagination.Entity }}](ctx, iterator)
{{- end }}
}
{{end}}{{if .NamedIdMap}}
// {{.NamedIdMap.PascalName}} calls [{{.Service.Name}}API.{{.PascalName}}{{if not .NamedIdMap.Direct}}All{{end -}}] and creates a map of results with [{{.NamedIdMap.Entity.PascalName}}]{{ template "field-path" .NamedIdMap.NamePath }} as key and [{{.NamedIdMap.Entity.PascalName}}]{{ template "field-path" .NamedIdMap.IdPath}} as value.
Expand Down Expand Up @@ -301,4 +301,14 @@ func (a *{{.Service.Name}}API) {{.Shortcut.PascalName}}AndWait(ctx context.Conte
{{- end}}
{{- define "field-path" -}}
{{- range .}}.{{.PascalName}}{{end}}
{{- end -}}
{{- define "paginated-request-type" -}}
{{if .Request}}{{.Request.PascalName}}{{else}}struct{}{{end}}
{{- end -}}
{{- define "paginated-return-type" -}}
{{ if .NeedsOffsetDedupe -}}
*listing.DeduplicatingIterator[{{ template "type" .Pagination.Entity }}, {{ template "type" .IdentifierField.Entity }}]
{{- else -}}
*listing.PaginatingIterator[{{ template "paginated-request-type" .}}, *{{ .Response.PascalName }}, {{ template "type" .Pagination.Entity }}]
{{- end -}}
{{- end -}}
219 changes: 219 additions & 0 deletions listing/listing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package listing

import (
"context"
"errors"
)

// Iterator[T] is an iterator over items of type T. It is similar to
// java.util.Iterator. It is not thread-safe.
type Iterator[T any] interface {
// HasNext returns true if there are more items to iterate over. HasNext
// will also return true if the iterator needs to fetch the next page of
// items from the underlying source and the request fails, even if there
// are no more items to iterate over. In this case, Next will return the
// error.
HasNext(context.Context) bool

// Next returns the next item in the iterator. If there are no more items
// to iterate over, it returns ErrNoMoreItems. If there was an error
// fetching the next page of items, it returns that error. Once Next
// returns ErrNoMoreItems or an error, it will continue to return that
// error.
Next(context.Context) (T, error)
}

// ToSlice returns all items from the iterator as a slice. If there was an
// error fetching items at any time, it returns that error.
func ToSlice[T any](ctx context.Context, it Iterator[T]) ([]T, error) {
var items []T
for it.HasNext(ctx) {
item, err := it.Next(ctx)
if err != nil {
return nil, err
}
items = append(items, item)
}
return items, nil
}

// ToSliceN returns the first N items from the iterator as a slice. If there
// was an error fetching items at any time, it returns that error.
func ToSliceN[T any, Limit ~int | ~int64](ctx context.Context, it Iterator[T], n Limit) ([]T, error) {
var items []T
for it.HasNext(ctx) && Limit(len(items)) < n {
item, err := it.Next(ctx)
if err != nil {
return nil, err
}
items = append(items, item)
}
return items, nil
}

// Use struct{} for Req for iterators that don't need any request structure.
type PaginatingIterator[Req, Resp, T any] struct {
// nextReq is the request to be used to fetch the next page. If nil, then
// there is no next page to fetch.
nextReq *Req

// getNextPage fetches the next page of items, returning the deserialized
// response and error.
getNextPage func(context.Context, Req) (Resp, error)

// getItems selects the items being iterated over from the response.
getItems func(Resp) []T

// getNextReq is used to get the next request to be used in the next page.
// If nil, then there is no next page to fetch.
getNextReq func(Resp) *Req

// currentPage is the current page of items.
currentPage []T

// currentPageIdx is the index of the next item from currentPage to return.
currentPageIdx int

// lastErr is the last error returned by getNextPage.
lastErr error
}

var ErrNoMoreItems = errors.New("no more items")

// Returns a new iterator. The iterator will fetch the next page of items
// lazily, when needed. nextReq is the request to be used to fetch the initial
// page. If nil, then no page will be fetched. getNextPage fetches the next
// page of items, returning the deserialized response and error. getItems
// selects the items being iterated over from the response. getNextReq is used
// to get the next request to be used in the next page. If the returned value
// is nil, then there is no next page to fetch.
func NewIterator[Req, Resp, T any](
nextReq *Req,
getNextPage func(context.Context, Req) (Resp, error),
getItems func(Resp) []T,
getNextReq func(Resp) *Req,
) *PaginatingIterator[Req, Resp, T] {
return &PaginatingIterator[Req, Resp, T]{
nextReq: nextReq,
getNextPage: getNextPage,
getItems: getItems,
getNextReq: getNextReq,
}
}

func (i *PaginatingIterator[Req, Resp, T]) loadNextPageIfNeeded(ctx context.Context) error {
if i.currentPageIdx < len(i.currentPage) {
return nil
}
if i.nextReq == nil {
i.currentPage = nil
i.currentPageIdx = 0
return nil
}
mgyucht marked this conversation as resolved.
Show resolved Hide resolved
if i.lastErr != nil {
return i.lastErr
}

// Keep loading pages while we have a next request and the current page is
// empty.
i.currentPage = nil
i.currentPageIdx = 0
// Endpoints using token-based pagination may return an empty page with a
// next token. We need to keep fetching pages until we get a non-empty
// page or there are no more pages.
for i.nextReq != nil && len(i.currentPage) == 0 {
mgyucht marked this conversation as resolved.
Show resolved Hide resolved
resp, err := i.getNextPage(ctx, *i.nextReq)
i.lastErr = err
if err != nil {
return err
}
i.nextReq = i.getNextReq(resp)
mgyucht marked this conversation as resolved.
Show resolved Hide resolved
i.currentPage = i.getItems(resp)
}
return nil
}

func (i *PaginatingIterator[Req, Resp, T]) Next(ctx context.Context) (T, error) {
var t T
pietern marked this conversation as resolved.
Show resolved Hide resolved
err := i.loadNextPageIfNeeded(ctx)
if err != nil {
return t, err
}
if i.currentPageIdx >= len(i.currentPage) {
return t, ErrNoMoreItems
pietern marked this conversation as resolved.
Show resolved Hide resolved
}
item := i.currentPage[i.currentPageIdx]
i.currentPageIdx++
return item, nil
}

func (i *PaginatingIterator[Req, Resp, T]) HasNext(ctx context.Context) bool {
err := i.loadNextPageIfNeeded(ctx)
// As described in the documentation for HasNext, if there was an error
// fetching the next page, we still return true to allow the user to handle
// the error in Next.
if err != nil {
return true
mgyucht marked this conversation as resolved.
Show resolved Hide resolved
}
return i.currentPageIdx < len(i.currentPage)
}

type DeduplicatingIterator[T any, Id comparable] struct {
it Iterator[T]
getId func(T) Id
seen map[Id]struct{}
current *T
}

func NewDedupeIterator[T any, Id comparable](it Iterator[T], getId func(T) Id) *DeduplicatingIterator[T, Id] {
return &DeduplicatingIterator[T, Id]{
it: it,
getId: getId,
seen: make(map[Id]struct{}),
}
}

func (i *DeduplicatingIterator[T, Id]) Next(ctx context.Context) (T, error) {
if i.current != nil {
t := *i.current
i.current = nil
return t, nil
}
for {
t, err := i.it.Next(ctx)
if err != nil {
return t, err
}
id := i.getId(t)
if _, ok := i.seen[id]; !ok {
i.seen[id] = struct{}{}
return t, nil
}
}
}

func (i *DeduplicatingIterator[T, Id]) HasNext(ctx context.Context) bool {
if i.current != nil {
return true
}
// To compute HasNext in DeduplicatingIterator, we need to actually fetch
// the next item from the underlying iterator and compare it to seen items.
// However, the retrieved item cannot be discarded, as it needs to be
// returned by the next call to Next. So we store the item in current and
// return it in the next call to Next, after which current is set to nil.
for {
t, err := i.it.Next(ctx)
if errors.Is(err, ErrNoMoreItems) {
return false
}
if err != nil {
return true
}
id := i.getId(t)
if _, ok := i.seen[id]; !ok {
i.current = &t
i.seen[id] = struct{}{}
return true
}
}
mgyucht marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading