Skip to content

Commit

Permalink
Merge pull request #386 from dinal/ms
Browse files Browse the repository at this point in the history
dev -> master
  • Loading branch information
dinal authored Feb 17, 2020
2 parents f469ba1 + 38be2aa commit 30d889d
Show file tree
Hide file tree
Showing 51 changed files with 2,148 additions and 338 deletions.
150 changes: 150 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Created by .ignore support plugin (hsz.mobi)
### Go template
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Jetbrains project settings
.idea/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ build:
--tag $(FRAMES_REPOSITORY)frames:$(FRAMES_TAG) \
.

build-framulate:
docker build \
--build-arg FRAMES_VERSION=$(FRAMES_TAG) \
--file cmd/framulate/Dockerfile \
--tag $(FRAMES_REPOSITORY)framulate:$(FRAMES_TAG) \
.

.PHONY: test
test: test-go test-py

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -806,3 +806,4 @@ docker run \

[Apache 2](LICENSE)


36 changes: 28 additions & 8 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

"github.com/nuclio/logger"
"github.com/pkg/errors"
v3io "github.com/v3io/v3io-go/pkg/dataplane"
v3iohttp "github.com/v3io/v3io-go/pkg/dataplane/http"
)

Expand Down Expand Up @@ -127,7 +128,7 @@ func (api *API) Write(request *frames.WriteRequest, in chan frames.Frame) (int,
api.logger.ErrorWith(msg, "error", err)
return -1, -1, errors.Wrap(err, msg)
}

defer appender.Close()
nFrames, nRows := 0, 0
if request.ImmidiateData != nil {
nFrames, nRows = 1, request.ImmidiateData.Len()
Expand Down Expand Up @@ -265,19 +266,38 @@ func (api *API) populateQuery(request *frames.ReadRequest) error {
func (api *API) createBackends(config *frames.Config) error {
api.backends = make(map[string]frames.DataBackend)

for _, cfg := range config.Backends {
factory := backends.GetFactory(cfg.Type)
for _, backendConfig := range config.Backends {
httpClient := v3iohttp.NewClient(nil, 0)
httpClient.MaxConnsPerHost = backendConfig.MaxConnections

api.logger.InfoWith("Creating v3io context for backend",
"backend", backendConfig.Name,
"workers", backendConfig.V3ioGoWorkers,
"requestChanLength", backendConfig.V3ioGoRequestChanLength,
"maxConnsPerHost", backendConfig.MaxConnections)

// create a context for the backend
v3ioContext, err := v3iohttp.NewContext(api.logger, httpClient, &v3io.NewContextInput{
NumWorkers: backendConfig.V3ioGoWorkers,
RequestChanLen: backendConfig.V3ioGoRequestChanLength,
})

if err != nil {
return errors.Wrap(err, "Failed to create v3io context for backend")
}

factory := backends.GetFactory(backendConfig.Type)
if factory == nil {
return fmt.Errorf("unknown backend - %q", cfg.Type)
return fmt.Errorf("unknown backend - %q", backendConfig.Type)
}

httpClient := v3iohttp.NewDefaultClient()
backend, err := factory(api.logger, httpClient, cfg, config)
backend, err := factory(api.logger, v3ioContext, backendConfig, config)
if err != nil {
return errors.Wrapf(err, "%s:%s - can't create backend", cfg.Name, cfg.Type)
return errors.Wrapf(err, "%s:%s - can't create backend", backendConfig.Name, backendConfig.Type)
}

api.backends[cfg.Name] = backend
api.backends[backendConfig.Name] = backend

}

return nil
Expand Down
4 changes: 2 additions & 2 deletions backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/nuclio/logger"
"github.com/v3io/frames"
"github.com/valyala/fasthttp"
v3io "github.com/v3io/v3io-go/pkg/dataplane"
)

var (
Expand All @@ -37,7 +37,7 @@ var (
)

// Factory is a backend factory
type Factory func(logger.Logger, *fasthttp.Client, *frames.BackendConfig, *frames.Config) (frames.DataBackend, error)
type Factory func(logger.Logger, v3io.Context, *frames.BackendConfig, *frames.Config) (frames.DataBackend, error)

// Register registers a backend factory for a type
func Register(typ string, factory Factory) error {
Expand Down
4 changes: 2 additions & 2 deletions backends/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (

"github.com/nuclio/logger"
"github.com/v3io/frames"
"github.com/valyala/fasthttp"
v3io "github.com/v3io/v3io-go/pkg/dataplane"
)

// Special error return from testFactory so we can see it's this function
var errorBackendsTest = fmt.Errorf("backends test")

func testFactory(logger.Logger, *fasthttp.Client, *frames.BackendConfig, *frames.Config) (frames.DataBackend, error) {
func testFactory(logger.Logger, v3io.Context, *frames.BackendConfig, *frames.Config) (frames.DataBackend, error) {
return nil, errorBackendsTest
}

Expand Down
19 changes: 17 additions & 2 deletions backends/csv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/nuclio/logger"
"github.com/pkg/errors"
"github.com/valyala/fasthttp"
v3io "github.com/v3io/v3io-go/pkg/dataplane"

"github.com/v3io/frames"
"github.com/v3io/frames/backends"
Expand All @@ -45,7 +45,7 @@ type Backend struct {
}

// NewBackend returns a new CSV backend
func NewBackend(logger logger.Logger, httpClient *fasthttp.Client, config *frames.BackendConfig, framesConfig *frames.Config) (frames.DataBackend, error) {
func NewBackend(logger logger.Logger, v3ioContext v3io.Context, config *frames.BackendConfig, framesConfig *frames.Config) (frames.DataBackend, error) {
backend := &Backend{
rootDir: config.RootDir,
logger: logger.GetChild("csv"),
Expand Down Expand Up @@ -410,9 +410,15 @@ type csvAppender struct {
writer io.Writer
csvWriter *csv.Writer
headerWritten bool
closed bool
}

func (ca *csvAppender) Add(frame frames.Frame) error {
if ca.closed {
err := errors.New("Adding on a closed csv appender")
ca.logger.Error(err)
return err
}
ca.logger.InfoWith("adding frame", "size", frame.Len())
names := frame.Names()
if !ca.headerWritten {
Expand Down Expand Up @@ -457,6 +463,11 @@ type syncer interface {

// WaitForComplete wait for write completion
func (ca *csvAppender) WaitForComplete(timeout time.Duration) error {
if ca.closed {
err := errors.New("Adding on a closed csv appender")
ca.logger.Error(err)
return err
}
ca.csvWriter.Flush()
if err := ca.csvWriter.Error(); err != nil {
ca.logger.ErrorWith("csv Flush", "error", err)
Expand All @@ -470,6 +481,10 @@ func (ca *csvAppender) WaitForComplete(timeout time.Duration) error {
return nil
}

func (ca *csvAppender) Close() {
ca.closed = true
}

func fileExists(path string) bool {
_, err := os.Stat(path)
return err == nil
Expand Down
Loading

0 comments on commit 30d889d

Please sign in to comment.