Skip to content

Commit

Permalink
[BACKPORT 2.18][PLAT-9432] (Fix 1) File upload via node agent gRPC en…
Browse files Browse the repository at this point in the history
…dpoint is slow over network with high latency

Summary:
Original diff - https://phorge.dev.yugabyte.com/D26586 (70b7ac9)

This is an ongoing fix. This reduces the memory consumption by removing write to a tmp buffer when it is no longer needed + some cleanups around user.

Mostly golang crypto issue which is getting addressed in 1.21 release - golang/go#57752

{F83779}

Test Plan: Create universe

Reviewers: amalyshev, cwang

Reviewed By: amalyshev

Subscribers: yugaware

Differential Revision: https://phorge.dev.yugabyte.com/D26644
  • Loading branch information
nkhogen committed Jul 6, 2023
1 parent 0c56693 commit 55e1214
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 72 deletions.
37 changes: 10 additions & 27 deletions managed/node-agent/app/server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"node-agent/model"
"node-agent/util"
"os"
"os/user"
"path/filepath"

"node-agent/cmux"
Expand Down Expand Up @@ -385,23 +384,14 @@ func (server *RPCServer) UploadFile(stream pb.NodeAgent_UploadFileServer) error
filename := fileInfo.GetFilename()
username := req.GetUser()
chmod := req.GetChmod()
userAcc, err := user.Current()
userDetail, err := util.UserInfo(username)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
var uid, gid uint32
var changeOwner = false
if username != "" && userAcc.Username != username {
userAcc, uid, gid, err = util.UserInfo(username)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
util.FileLogger().Debugf(ctx, "Using user: %s, uid: %d, gid: %d",
userAcc.Username, uid, gid)
changeOwner = true
}
util.FileLogger().Debugf(ctx, "Using user: %s, uid: %d, gid: %d",
userDetail.User.Username, userDetail.UserID, userDetail.GroupID)
if !filepath.IsAbs(filename) {
filename = filepath.Join(userAcc.HomeDir, filename)
filename = filepath.Join(userDetail.User.HomeDir, filename)
}
if chmod == 0 {
// Do not care about file perm.
Expand All @@ -423,8 +413,8 @@ func (server *RPCServer) UploadFile(stream pb.NodeAgent_UploadFileServer) error
return status.Error(codes.Internal, err.Error())
}
defer file.Close()
if changeOwner {
err = file.Chown(int(uid), int(gid))
if !userDetail.IsCurrent {
err = file.Chown(int(userDetail.UserID), int(userDetail.GroupID))
if err != nil {
return status.Error(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -468,20 +458,13 @@ func (server *RPCServer) DownloadFile(
res := &pb.DownloadFileResponse{ChunkData: make([]byte, 1024)}
if !filepath.IsAbs(filename) {
username := in.GetUser()
userAcc, err := user.Current()
userDetail, err := util.UserInfo(username)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
var uid, gid uint32
if username != "" && userAcc.Username != username {
userAcc, uid, gid, err = util.UserInfo(username)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
util.FileLogger().Infof(ctx, "Using user: %s, uid: %d, gid: %d",
userAcc.Username, uid, gid)
}
filename = filepath.Join(userAcc.HomeDir, filename)
util.FileLogger().Debugf(ctx, "Using user: %s, uid: %d, gid: %d",
userDetail.User.Username, userDetail.UserID, userDetail.GroupID)
filename = filepath.Join(userDetail.User.HomeDir, filename)
}
file, err := os.Open(filename)
if err != nil {
Expand Down
43 changes: 21 additions & 22 deletions managed/node-agent/app/task/shell_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"node-agent/util"
"os"
"os/exec"
"os/user"
"regexp"
"sort"
"strconv"
Expand Down Expand Up @@ -114,43 +113,37 @@ func (s *ShellTask) redactCommandArgs(args ...string) []string {
return redacted
}

func (s *ShellTask) command(ctx context.Context, name string, arg ...string) (*exec.Cmd, error) {
func (s *ShellTask) command(
ctx context.Context,
userDetail *util.UserDetail,
name string,
arg ...string,
) (*exec.Cmd, error) {
cmd := exec.CommandContext(ctx, name, arg...)
userAcc, err := user.Current()
if err != nil {
return nil, err
}
if s.user != "" && userAcc.Username != s.user {
var uid, gid uint32
userAcc, uid, gid, err = util.UserInfo(s.user)
if err != nil {
return nil, err
}
util.FileLogger().Debugf(ctx, "Using user: %s, uid: %d, gid: %d",
userAcc.Username, uid, gid)
if !userDetail.IsCurrent {
cmd.SysProcAttr = &syscall.SysProcAttr{}
cmd.SysProcAttr.Credential = &syscall.Credential{
Uid: uid,
Gid: gid,
Uid: userDetail.UserID,
Gid: userDetail.GroupID,
}
}
pwd := userAcc.HomeDir
pwd := userDetail.User.HomeDir
if pwd == "" {
pwd = "/tmp"
}
os.Setenv("PWD", pwd)
os.Setenv("HOME", pwd)
for _, userVar := range userVariables {
os.Setenv(userVar, userAcc.Username)
os.Setenv(userVar, userDetail.User.Username)
}
cmd.Dir = pwd
return cmd, nil
}

func (s *ShellTask) userEnv(ctx context.Context) []string {
func (s *ShellTask) userEnv(ctx context.Context, userDetail *util.UserDetail) []string {
env := []string{}
// Interactive shell to source ~/.bashrc.
cmd, err := s.command(ctx, "bash")
cmd, err := s.command(ctx, userDetail, "bash")
env = append(env, os.Environ()...)
// Create a pseudo tty (non stdin) to act like SSH login.
// Otherwise, the child process is stopped because it is a background process.
Expand Down Expand Up @@ -183,8 +176,14 @@ func (s *ShellTask) userEnv(ctx context.Context) []string {

// Command returns a command with the environment set.
func (s *ShellTask) Command(ctx context.Context, name string, arg ...string) (*exec.Cmd, error) {
env := s.userEnv(ctx)
cmd, err := s.command(ctx, name, arg...)
userDetail, err := util.UserInfo(s.user)
if err != nil {
return nil, err
}
util.FileLogger().Debugf(ctx, "Using user: %s, uid: %d, gid: %d",
userDetail.User.Username, userDetail.UserID, userDetail.GroupID)
env := s.userEnv(ctx, userDetail)
cmd, err := s.command(ctx, userDetail, name, arg...)
if err != nil {
util.FileLogger().Warnf(ctx, "Failed to create command %s. Error: %s", name, err.Error())
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions managed/node-agent/cmux/cmux.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (m *cMux) serve(c net.Conn, donec <-chan struct{}, wg *sync.WaitGroup) {
matched := matcher(muc)
muc.reset()
if matched {
muc.disableReset()
select {
case lms.listener.connc <- muc:
case <-donec:
Expand Down Expand Up @@ -251,6 +252,10 @@ func (m *muxConn) reset() {
m.buffer.Reset()
}

func (m *muxConn) disableReset() {
m.buffer.DisableReset()
}

// Read implements io.Reader.
func (m *muxConn) Read(p []byte) (int, error) {
return m.buffer.Read(p)
Expand Down
18 changes: 14 additions & 4 deletions managed/node-agent/util/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Buffer interface {
type ResettableBuffer interface {
io.Reader
Reset()
DisableReset()
}

// simpleBuffer implements Buffer.
Expand All @@ -36,8 +37,9 @@ type simpleBuffer struct {

// resettableBuffer is a resettable bufer which extends io.Reader.
type resettableBuffer struct {
source io.Reader
readBuffer *bytes.Buffer
source io.Reader
readBuffer *bytes.Buffer
disableReset bool
}

// NewBuffer creates an instance of Buffer.
Expand Down Expand Up @@ -93,19 +95,27 @@ func (p *simpleBuffer) StringWithLen() (string, int) {

// NewResettableBuffer returns a new instance of resettable buffer reader.
func NewResettableBuffer(source io.Reader) ResettableBuffer {
return &resettableBuffer{source: source, readBuffer: &bytes.Buffer{}}
return &resettableBuffer{source: source, readBuffer: &bytes.Buffer{}, disableReset: false}
}

// Reset resets the buffer.
func (rb *resettableBuffer) Reset() {
if rb.readBuffer.Len() > 0 {
if !rb.disableReset && rb.readBuffer.Len() > 0 {
readBuffer := &bytes.Buffer{}
rb.readBuffer.WriteTo(readBuffer)
rb.source = io.MultiReader(readBuffer, rb.source)
}
}

// DisableReset disables further reset.
func (rb *resettableBuffer) DisableReset() {
rb.disableReset = true
}

// Read reads bytes from the buffer.
func (rb *resettableBuffer) Read(p []byte) (int, error) {
if rb.disableReset {
return rb.source.Read(p)
}
return io.TeeReader(rb.source, rb.readBuffer).Read(p)
}
28 changes: 20 additions & 8 deletions managed/node-agent/util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,20 @@ var (
// ContextKey is the key type go context values.
type ContextKey string

// Handler is a generic handler func.
type Handler func(context.Context) (any, error)

// RPCResponseConverter is the converter for response in async executor.
type RPCResponseConverter func(any) (*pb.DescribeTaskResponse, error)

// UserDetail is a placeholder for OS user.
type UserDetail struct {
User *user.User
UserID uint32
GroupID uint32
IsCurrent bool
}

func NewUUID() uuid.UUID {
return uuid.New()
}
Expand Down Expand Up @@ -271,27 +281,29 @@ func IsDigits(str string) bool {
}

// UserInfo returns the user, user ID and group ID for the user name.
func UserInfo(username string) (*user.User, uint32, uint32, error) {
func UserInfo(username string) (*UserDetail, error) {
userAcc, err := user.Current()
if err != nil {
return nil, 0, 0, err
return nil, err
}
if userAcc.Username != username {
var err error
isCurrent := true
if username != "" && userAcc.Username != username {
userAcc, err = user.Lookup(username)
if err != nil {
return nil, 0, 0, err
return nil, err
}
isCurrent = false
}
uid, err := strconv.Atoi(userAcc.Uid)
if err != nil {
return nil, 0, 0, err
return nil, err
}
gid, err := strconv.Atoi(userAcc.Gid)
if err != nil {
return nil, 0, 0, err
return nil, err
}
return userAcc, uint32(uid), uint32(gid), nil
return &UserDetail{
User: userAcc, UserID: uint32(uid), GroupID: uint32(gid), IsCurrent: isCurrent}, nil
}

// CorrelationID returns the correlation ID from the context.
Expand Down
28 changes: 18 additions & 10 deletions managed/node-agent/util/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,36 @@ func (l *AppLogger) getEntry(ctx context.Context) *log.Entry {
return entry
}

func (l *AppLogger) Errorf(ctx context.Context, msg string, v ...interface{}) {
l.getEntry(ctx).Errorf(msg, v...)
func (l *AppLogger) Error(ctx context.Context, msg string) {
l.getEntry(ctx).Error(msg)
}

func (l *AppLogger) Infof(ctx context.Context, msg string, v ...interface{}) {
l.getEntry(ctx).Infof(msg, v...)
func (l *AppLogger) Errorf(ctx context.Context, msg string, v ...interface{}) {
l.getEntry(ctx).Errorf(msg, v...)
}

func (l *AppLogger) Error(ctx context.Context, msg string) {
l.getEntry(ctx).Error(msg)
func (l *AppLogger) Info(ctx context.Context, msg string) {
if l.IsInfoEnabled() {
l.getEntry(ctx).Infof(msg)
}
}

func (l *AppLogger) Info(ctx context.Context, msg string) {
l.getEntry(ctx).Infof(msg)
func (l *AppLogger) Infof(ctx context.Context, msg string, v ...interface{}) {
if l.IsInfoEnabled() {
l.getEntry(ctx).Infof(msg, v...)
}
}

func (l *AppLogger) Debug(ctx context.Context, msg string) {
l.getEntry(ctx).Debug(msg)
if l.IsDebugEnabled() {
l.getEntry(ctx).Debug(msg)
}
}

func (l *AppLogger) Debugf(ctx context.Context, msg string, v ...interface{}) {
l.getEntry(ctx).Debugf(msg, v...)
if l.IsDebugEnabled() {
l.getEntry(ctx).Debugf(msg, v...)
}
}

func (l *AppLogger) Warn(ctx context.Context, msg string) {
Expand Down
1 change: 0 additions & 1 deletion managed/node-agent/ybops/node_agent/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import logging
import shlex
import time
import traceback
import uuid

from ansible.module_utils._text import to_native
Expand Down

0 comments on commit 55e1214

Please sign in to comment.