Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Bulk Loader + Live Loader): Supporting Loading files via s3/minio #7359

Merged
merged 11 commits into from
Jan 25, 2021
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ dgraph.iml
.DS_Store

vendor
.minio.sys
7 changes: 6 additions & 1 deletion chunker/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
// and decompressed automatically even without the gz extension. The key, if non-nil,
// is used to decrypt the file. The caller is responsible for calling the returned cleanup
// function when done with the reader.
func FileReader(file string, key x.SensitiveByteSlice) (rd *bufio.Reader, cleanup func()) {
func FileReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) {
var f *os.File
var err error
if file == "-" {
Expand All @@ -363,6 +363,11 @@ func FileReader(file string, key x.SensitiveByteSlice) (rd *bufio.Reader, cleanu

x.Check(err)

return StreamReader(file, key, f)
}

// StreamReader returns a bufio.Reader wrapping the given ReadCloser. The file name is used only to check for a .gz extension
func StreamReader(file string, key x.SensitiveByteSlice, f io.ReadCloser) (rd *bufio.Reader, cleanup func()) {
cleanup = func() { _ = f.Close() }

if filepath.Ext(file) == ".gz" {
Expand Down
14 changes: 12 additions & 2 deletions compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type options struct {
AlphaEnvFile []string
ZeroEnvFile []string
Minio bool
MinioDataDir string
MinioPort uint16
MinioEnvFile []string

Expand Down Expand Up @@ -414,7 +415,7 @@ func getJaeger() service {
return svc
}

func getMinio() service {
func getMinio(minioDataDir string) service {
svc := service{
Image: "minio/minio:RELEASE.2020-11-13T20-10-18Z",
ContainerName: containerName("minio1"),
Expand All @@ -425,6 +426,13 @@ func getMinio() service {
Command: "minio server /data/minio --address :" +
strconv.FormatUint(uint64(opts.MinioPort), 10),
}
if minioDataDir != "" {
svc.Volumes = append(svc.Volumes, volume{
Type: "bind",
Source: minioDataDir,
Target: "/data/minio",
})
}
return svc
}

Expand Down Expand Up @@ -608,6 +616,8 @@ func main() {
"env_file for zero")
cmd.PersistentFlags().BoolVar(&opts.Minio, "minio", false,
"include minio service")
cmd.PersistentFlags().StringVar(&opts.MinioDataDir, "minio_data_dir", "",
"default minio data directory")
cmd.PersistentFlags().Uint16Var(&opts.MinioPort, "minio_port", 9001,
"minio service port")
cmd.PersistentFlags().StringArrayVar(&opts.MinioEnvFile, "minio_env_file", nil,
Expand Down Expand Up @@ -717,7 +727,7 @@ func main() {
}

if opts.Minio {
services["minio1"] = getMinio()
services["minio1"] = getMinio(opts.MinioDataDir)
}

if opts.Acl {
Expand Down
11 changes: 7 additions & 4 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/filestore"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/x"
Expand Down Expand Up @@ -164,7 +165,7 @@ func getWriteTimestamp(zero *grpc.ClientConn) uint64 {
}

func readSchema(opt *options) *schema.ParsedSchema {
f, err := os.Open(opt.SchemaFile)
f, err := filestore.Open(opt.SchemaFile)
x.Check(err)
defer f.Close()

Expand Down Expand Up @@ -199,7 +200,9 @@ func (ld *loader) mapStage() {
}
ld.xids = xidmap.New(ld.zero, db, filepath.Join(ld.opt.TmpDir, bufferDir))

files := x.FindDataFiles(ld.opt.DataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
fs := filestore.NewFileStore(ld.opt.DataFiles)

files := fs.FindDataFiles(ld.opt.DataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
if len(files) == 0 {
fmt.Printf("No data files found in %s.\n", ld.opt.DataFiles)
os.Exit(1)
Expand Down Expand Up @@ -237,7 +240,7 @@ func (ld *loader) mapStage() {
if !ld.opt.Encrypted {
key = nil
}
r, cleanup := chunker.FileReader(file, key)
r, cleanup := fs.ChunkReader(file, key)
defer cleanup()

chunk := chunker.NewChunker(loadType, 1000)
Expand Down Expand Up @@ -278,7 +281,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
return
}

f, err := os.Open(ld.opt.GqlSchemaFile)
f, err := filestore.Open(ld.opt.GqlSchemaFile)
x.Check(err)
defer f.Close()

Expand Down
6 changes: 4 additions & 2 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"strconv"
"strings"

"github.com/dgraph-io/dgraph/filestore"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"

Expand Down Expand Up @@ -209,7 +210,8 @@ func run() {
if opt.SchemaFile == "" {
fmt.Fprint(os.Stderr, "Schema file must be specified.\n")
os.Exit(1)
} else if _, err := os.Stat(opt.SchemaFile); err != nil && os.IsNotExist(err) {
}
if !filestore.Exists(opt.SchemaFile) {
fmt.Fprintf(os.Stderr, "Schema path(%v) does not exist.\n", opt.SchemaFile)
os.Exit(1)
}
Expand All @@ -219,7 +221,7 @@ func run() {
} else {
fileList := strings.Split(opt.DataFiles, ",")
for _, file := range fileList {
if _, err := os.Stat(file); err != nil && os.IsNotExist(err) {
if !filestore.Exists(file) {
fmt.Fprintf(os.Stderr, "Data path(%v) does not exist.\n", file)
os.Exit(1)
}
Expand Down
13 changes: 8 additions & 5 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/filestore"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
Expand Down Expand Up @@ -198,7 +199,7 @@ func processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlic
ctx = metadata.NewOutgoingContext(ctx, md)
}

f, err := os.Open(file)
f, err := filestore.Open(file)
x.CheckfNoTrace(err)
defer f.Close()

Expand Down Expand Up @@ -403,10 +404,10 @@ func (l *loader) allocateUids(nqs []*api.NQuad) {
}

// processFile forwards a file to the RDF or JSON processor as appropriate
func (l *loader) processFile(ctx context.Context, filename string, key x.SensitiveByteSlice) error {
func (l *loader) processFile(ctx context.Context, fs filestore.FileStore, filename string, key x.SensitiveByteSlice) error {
fmt.Printf("Processing data file %q\n", filename)

rd, cleanup := chunker.FileReader(filename, key)
rd, cleanup := fs.ChunkReader(filename, key)
defer cleanup()

loadType := chunker.DataFormat(filename, opt.dataFormat)
Expand Down Expand Up @@ -658,7 +659,9 @@ func run() error {
return errors.New("RDF or JSON file(s) location must be specified")
}

filesList := x.FindDataFiles(opt.dataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
fs := filestore.NewFileStore(opt.dataFiles)

filesList := fs.FindDataFiles(opt.dataFiles, []string{".rdf", ".rdf.gz", ".json", ".json.gz"})
totalFiles := len(filesList)
if totalFiles == 0 {
return errors.Errorf("No data files found in %s", opt.dataFiles)
Expand All @@ -670,7 +673,7 @@ func run() error {
for _, file := range filesList {
file = strings.Trim(file, " \t")
go func(file string) {
errCh <- errors.Wrapf(l.processFile(ctx, file, opt.key), file)
errCh <- errors.Wrapf(l.processFile(ctx, fs, file, opt.key), file)
}(file)
}

Expand Down
45 changes: 45 additions & 0 deletions filestore/filestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package filestore

import (
"bufio"
"io"
"net/url"

"github.com/dgraph-io/dgraph/x"
)

// FileStore represents a file or directory of files that are either stored
// locally or on minio/s3
type FileStore interface {
	// Similar to os.Open
	Open(path string) (io.ReadCloser, error)
	// Exists reports whether path exists. For remote storage this may be a
	// best-effort answer (see the remote implementation's notes).
	Exists(path string) bool
	// FindDataFiles expands str (a comma-separated list of paths) into the
	// data files found there whose names end with one of ext.
	FindDataFiles(str string, ext []string) []string
	// ChunkReader opens file for buffered reading and returns a cleanup
	// function that must be called when done. The key, if non-nil, is used
	// to decrypt the file.
	ChunkReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func())
}

// NewFileStore returns a new file storage. If remote, it's backed by an x.MinioClient.
func NewFileStore(path string) FileStore {
	// Named u (not url) to avoid shadowing the imported net/url package.
	u, err := url.Parse(path)
	x.Check(err)

	// Only minio:// and s3:// URLs are remote; anything else — including plain
	// local paths, which parse with an empty scheme — is served from disk.
	if u.Scheme == "minio" || u.Scheme == "s3" {
		mc, err := x.NewMinioClient(u, nil)
		x.Check(err)

		return &remoteFiles{mc}
	}

	return &localFiles{}
}

// Open takes a single path and returns an io.ReadCloser, similar to os.Open.
// It builds a throwaway FileStore for the path and opens through it.
func Open(path string) (io.ReadCloser, error) {
	store := NewFileStore(path)
	return store.Open(path)
}

// Exists returns false if the file doesn't exist. For remote storage, true does
// not guarantee existence.
func Exists(path string) bool {
	store := NewFileStore(path)
	return store.Exists(path)
}
34 changes: 34 additions & 0 deletions filestore/local_files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package filestore

import (
"bufio"
"io"
"os"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/x"
)

// localFiles is the FileStore implementation backed by the local filesystem.
type localFiles struct {
}

// Open opens the file at path on the local filesystem, like os.Open.
func (*localFiles) Open(path string) (io.ReadCloser, error) {
	f, err := os.Open(path)
	return f, err
}

// Exists reports whether path exists on the local filesystem. Stat errors
// other than "does not exist" (e.g. permission denied) are treated as
// existing, preserving the original behavior.
func (*localFiles) Exists(path string) bool {
	_, err := os.Stat(path)
	// os.IsNotExist(nil) is false, so the previous `err != nil &&` guard
	// was redundant.
	return !os.IsNotExist(err)
}

// FindDataFiles delegates to x.FindDataFiles to expand local paths into the
// list of data files ending with one of ext.
func (*localFiles) FindDataFiles(str string, ext []string) []string {
	files := x.FindDataFiles(str, ext)
	return files
}

// ChunkReader opens file through chunker.FileReader, returning a buffered
// reader and a cleanup function that must be called when done.
func (*localFiles) ChunkReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) {
	rd, cleanup := chunker.FileReader(file, key)
	return rd, cleanup
}

// Compile-time check that localFiles satisfies the FileStore interface.
var _ FileStore = (*localFiles)(nil)
72 changes: 72 additions & 0 deletions filestore/remote_files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package filestore

import (
"bufio"
"context"
"io"
"net/url"
"strings"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/x"
"github.com/minio/minio-go/v6"
)

// remoteFiles is the FileStore implementation backed by a minio/s3 client.
type remoteFiles struct {
	// mc is the minio client used for all bucket/object operations.
	mc *x.MinioClient
}

// Open fetches the object named by path from minio/s3 and returns it as an
// io.ReadCloser. The caller is responsible for closing it.
func (rf *remoteFiles) Open(path string) (io.ReadCloser, error) {
	// Return the parse error instead of panicking via x.Check: this function
	// already reports errors to its caller. (Also avoids shadowing the
	// imported net/url package with a local named url.)
	u, err := url.Parse(path)
	if err != nil {
		return nil, err
	}

	bucket, prefix := rf.mc.ParseBucketAndPrefix(u.Path)
	obj, err := rf.mc.GetObject(bucket, prefix, minio.GetObjectOptions{})
	if err != nil {
		return nil, err
	}
	return obj, nil
}

// Exists is a no-op for remote storage and always returns true, since s3
// cannot confirm whether a directory (key prefix) exists. A missing object
// will instead surface as an error when it is actually opened or read.
func (rf *remoteFiles) Exists(path string) bool {
	return true
}

// hasAnySuffix reports whether str ends with at least one of the given suffixes.
func hasAnySuffix(str string, suffixes []string) bool {
	matched := false
	for i := 0; i < len(suffixes) && !matched; i++ {
		matched = strings.HasSuffix(str, suffixes[i])
	}
	return matched
}

// FindDataFiles lists every object under the bucket/prefix of each
// comma-separated path in str, keeping only keys that end with one of ext.
// Results are returned as "bucket/key" strings.
func (rf *remoteFiles) FindDataFiles(str string, ext []string) []string {
	var paths []string
	for _, dirPath := range strings.Split(str, ",") {
		u, err := url.Parse(dirPath)
		x.Check(err)

		bucket, prefix := rf.mc.ParseBucketAndPrefix(u.Path)
		// context.TODO().Done() yields a nil done channel: the listing is
		// never cancelled.
		objCh := rf.mc.ListObjectsV2(bucket, prefix, true, context.TODO().Done())
		for obj := range objCh {
			if !hasAnySuffix(obj.Key, ext) {
				continue
			}
			paths = append(paths, bucket+"/"+obj.Key)
		}
	}
	return paths
}

// ChunkReader streams the given object from minio/s3 through
// chunker.StreamReader, which decompresses .gz files and, when key is
// non-nil, decrypts the stream. The returned cleanup closes the object.
func (rf *remoteFiles) ChunkReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) {
	u, err := url.Parse(file)
	x.Check(err)

	bucket, prefix := rf.mc.ParseBucketAndPrefix(u.Path)

	obj, err := rf.mc.GetObject(bucket, prefix, minio.GetObjectOptions{})
	x.Check(err)

	return chunker.StreamReader(u.Path, key, obj)
}

// Compile-time check that remoteFiles satisfies the FileStore interface.
var _ FileStore = (*remoteFiles)(nil)
2 changes: 1 addition & 1 deletion graphql/admin/list_backups.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func resolveListBackups(ctx context.Context, q schema.Query) *resolve.Resolved {
return resolve.EmptyResult(q, err)
}

creds := &worker.Credentials{
creds := &x.MinioCredentials{
AccessKey: input.AccessKey,
SecretKey: input.SecretKey,
SessionToken: input.SessionToken,
Expand Down
20 changes: 15 additions & 5 deletions systest/bulk_live/bulk/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
# Auto-generated with: [../../compose/compose -z=1]
# Auto-generated with: [../../../compose/compose -z=1 -a=0 -o=0 --minio --minio_data_dir=. --expose_ports=false --names=false]
#
version: "3.5"
services:
minio1:
image: minio/minio:RELEASE.2020-11-13T20-10-18Z
ports:
- "9001"
volumes:
- type: bind
source: .
target: /data/minio
read_only: false
command: minio server /data/minio --address :9001
zero1:
image: dgraph/dgraph:latest
working_dir: /data/zero1
labels:
cluster: test
ports:
- 5080
- 6080
- "5080"
- "6080"
volumes:
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
command: /gobin/dgraph zero --raft="idx=1" --my=zero1:5080 --logtostderr
-v=2 --bindall
command: /gobin/dgraph zero --raft='idx=1' --my=zero1:5080 --logtostderr -v=2
--bindall
deploy:
resources:
limits:
Expand Down
Loading