Skip to content

Commit

Permalink
Fix 16k file bucket (#143)
Browse files Browse the repository at this point in the history
* File bucket handles archives less than 16kb

* file bucket can return less bytes than requested [#144]
  • Loading branch information
bdon authored Feb 25, 2024
1 parent 245df71 commit 5e5daa7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 20 deletions.
21 changes: 16 additions & 5 deletions pmtiles/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ func (m mockBucket) NewRangeReaderEtag(_ context.Context, key string, offset int
if len(etag) > 0 && resultEtag != etag {
return nil, "", 412, &RefreshRequiredError{}
}
if offset+length > int64(len(bs)) {
if offset > int64(len(bs)) {
return nil, "", 416, &RefreshRequiredError{416}
}

return io.NopCloser(bytes.NewReader(bs[offset:(offset + length)])), resultEtag, 206, nil
end := offset + length
if end > int64(len(bs)) {
end = int64(len(bs))
}
return io.NopCloser(bytes.NewReader(bs[offset:end])), resultEtag, 206, nil
}

// FileBucket is a bucket backed by a directory on disk
Expand Down Expand Up @@ -124,12 +128,19 @@ func (b FileBucket) NewRangeReaderEtag(_ context.Context, key string, offset, le
}
result := make([]byte, length)
read, err := file.ReadAt(result, offset)

if err == io.EOF {
part := result[0:read]
return io.NopCloser(bytes.NewReader(part)), newEtag, 206, nil
}

if err != nil {
return nil, "", 500, err
}
if read != int(length) {
return nil, "", 416, fmt.Errorf("Expected to read %d bytes but only read %d", length, read)
}

return io.NopCloser(bytes.NewReader(result)), newEtag, 206, nil
}

Expand Down Expand Up @@ -172,7 +183,7 @@ func (b HTTPBucket) NewRangeReaderEtag(ctx context.Context, key string, offset,

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
resp.Body.Close()
if isRefreshRequredCode(resp.StatusCode) {
if isRefreshRequiredCode(resp.StatusCode) {
err = &RefreshRequiredError{resp.StatusCode}
} else {
err = fmt.Errorf("HTTP error: %d", resp.StatusCode)
Expand All @@ -187,7 +198,7 @@ func (b HTTPBucket) Close() error {
return nil
}

func isRefreshRequredCode(code int) bool {
func isRefreshRequiredCode(code int) bool {
return code == http.StatusPreconditionFailed || code == http.StatusRequestedRangeNotSatisfiable
}

Expand Down Expand Up @@ -217,7 +228,7 @@ func (ba BucketAdapter) NewRangeReaderEtag(ctx context.Context, key string, offs
status = 404
if resp != nil {
status = resp.StatusCode()
if isRefreshRequredCode(resp.StatusCode()) {
if isRefreshRequiredCode(resp.StatusCode()) {
return nil, "", resp.StatusCode(), &RefreshRequiredError{resp.StatusCode()}
}
}
Expand Down
28 changes: 20 additions & 8 deletions pmtiles/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pmtiles

import (
"context"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -110,24 +109,23 @@ func TestHttpBucketRequestRequestEtagFailed(t *testing.T) {
_, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, "etag1", mock.request.Header.Get("If-Match"))
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))

mock.response.StatusCode = 416
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.Equal(t, 416, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))

mock.response.StatusCode = 404
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "a/b/c", 0, 3, "etag1")
assert.False(t, isRefreshRequredError(err))
assert.False(t, isRefreshRequiredError(err))
assert.Equal(t, 404, status)
}

func TestFileBucketReplace(t *testing.T) {
tmp := t.TempDir()
bucketURL, _, err := NormalizeBucketKey("", tmp, "")
assert.Nil(t, err)
fmt.Println(bucketURL)
bucket, err := OpenBucket(context.Background(), bucketURL, "")
assert.Nil(t, err)
assert.NotNil(t, bucket)
Expand All @@ -154,7 +152,7 @@ func TestFileBucketReplace(t *testing.T) {
// and requesting with old etag fails with refresh required error
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))
}

func TestFileBucketRename(t *testing.T) {
Expand All @@ -164,7 +162,6 @@ func TestFileBucketRename(t *testing.T) {

bucketURL, _, err := NormalizeBucketKey("", tmp, "")
assert.Nil(t, err)
fmt.Println(bucketURL)
bucket, err := OpenBucket(context.Background(), bucketURL, "")
assert.Nil(t, err)
assert.NotNil(t, bucket)
Expand Down Expand Up @@ -192,5 +189,20 @@ func TestFileBucketRename(t *testing.T) {
// and requesting with old etag fails with refresh required error
_, _, status, err = bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 1, 1, etag1)
assert.Equal(t, 412, status)
assert.True(t, isRefreshRequredError(err))
assert.True(t, isRefreshRequiredError(err))
}

func TestFileShorterThan16K(t *testing.T) {
tmp := t.TempDir()
assert.Nil(t, os.WriteFile(filepath.Join(tmp, "archive.pmtiles"), []byte{1, 2, 3}, 0666))

bucketURL, _, err := NormalizeBucketKey("", tmp, "")
bucket, err := OpenBucket(context.Background(), bucketURL, "")

reader, _, status, err := bucket.NewRangeReaderEtag(context.Background(), "archive.pmtiles", 0, 16384, "")
assert.Equal(t, 206, status)
assert.Nil(t, err)
data, err := io.ReadAll(reader)
assert.Nil(t, err)
assert.Equal(t, 3, len(data))
}
8 changes: 4 additions & 4 deletions pmtiles/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (server *Server) Start() {

if err != nil {
ok = false
result.badEtag = isRefreshRequredError(err)
result.badEtag = isRefreshRequiredError(err)
resps <- response{key: key, value: result}
server.logger.Printf("failed to fetch %s %d-%d, %v", key.name, key.offset, key.length, err)
return
Expand Down Expand Up @@ -256,7 +256,7 @@ func (server *Server) getHeaderMetadataAttempt(ctx context.Context, name, purgeE
defer func() { tracker.finish(ctx, status) }()
r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.MetadataOffset), int64(header.MetadataLength), rootValue.etag)
status = strconv.Itoa(statusCode)
if isRefreshRequredError(err) {
if isRefreshRequiredError(err) {
return false, HeaderV3{}, nil, rootValue.etag, nil
}
if err != nil {
Expand Down Expand Up @@ -393,7 +393,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
defer func() { tracker.finish(ctx, status) }()
r, _, statusCode, err := server.bucket.NewRangeReaderEtag(ctx, name+".pmtiles", int64(header.TileDataOffset+entry.Offset), int64(entry.Length), rootValue.etag)
status = strconv.Itoa(statusCode)
if isRefreshRequredError(err) {
if isRefreshRequiredError(err) {
return 500, httpHeaders, []byte("I/O Error"), rootValue.etag
}
// possible we have the header/directory cached but the archive has disappeared
Expand Down Expand Up @@ -429,7 +429,7 @@ func (server *Server) getTileAttempt(ctx context.Context, httpHeaders map[string
return 204, httpHeaders, nil, ""
}

func isRefreshRequredError(err error) bool {
func isRefreshRequiredError(err error) bool {
_, ok := err.(*RefreshRequiredError)
return ok
}
Expand Down
3 changes: 0 additions & 3 deletions pmtiles/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ func fakeArchive(t *testing.T, header HeaderV3, metadata map[string]interface{},
archiveBytes = append(archiveBytes, metadataBytes...)
archiveBytes = append(archiveBytes, leavesBytes...)
archiveBytes = append(archiveBytes, tileDataBytes...)
if len(archiveBytes) < 16384 {
archiveBytes = append(archiveBytes, make([]byte, 16384-len(archiveBytes))...)
}
return archiveBytes
}

Expand Down

0 comments on commit 5e5daa7

Please sign in to comment.