Having a STREAMID 1 CANCEL when trying to read a file from ADLS #3948

Closed
invasionofsmallcubes opened this issue Jan 28, 2019 · 11 comments
Labels: customer-reported (Issues that are reported by GitHub users external to the Azure organization), Data Lake Store

Comments


invasionofsmallcubes commented Jan 28, 2019

Hello,

I'm having the following issue: I wrote a Go program that is scheduled on k8s to read files from ADLS, one at a time. The file structure follows a typical offset pattern, so I just consume the files following this offset.

I can also reproduce the problem on my laptop, so it's not a k8s issue.

If I run file -I filename on these files I get application/octet-stream; charset=binary

The issue is that, for one specific file, I always get back the following error: stream error: stream ID 1; CANCEL.

Here is the way I'm consuming the file:

func (adslReader *ADSLFileReader) GetReader(offset *HeaderOffset) (io.ReadCloser, error) {

   uri := fmt.Sprintf(adslReader.AdlsPath, adslReader.AccountName, offset.LastDate, offset.LastFile)

   client := adls.NewClient()

   authorizer, err := auth.NewClientCredentialsConfig(adslReader.ClientID, adslReader.ClientSecret, adslReader.TenantID).Authorizer()
   if err != nil {
      return nil, err
   }
   client.Authorizer = authorizer

   u, err := url.Parse(uri)
   if err != nil {
      return nil, errors.Wrapf(err, "invalid URI (uri=%s)", uri)
   }

   v4 := uuid.NewV4()
   res, err := client.Open(context.Background(), adslReader.AccountName, u.Path, nil, nil, &v4)
   if err != nil {
      return nil, errors.Errorf("error reading file from ADLS (accountName=%s, path=%s)", adslReader.AccountName, u.Path)
   }

   return *res.Value, nil
}

Then I use the io.ReadCloser to read one line after another and do something with it:

func (p *StreamProcessor) StoreData(reader io.ReadCloser) error {
   log.SetFormatter(&log.JSONFormatter{})
   csvReader := csv.NewReader(reader)
   csvReader.Comma = '\a'

   counter := 0

   for {
      record, err := csvReader.Read()
      if err == io.EOF {
         log.Info("Process finished")
         break
      }
      if err != nil {
         // THIS IS THE TEMPORARY CHECK, IT'S a HTTP2STREAMERROR
         if strings.Index(err.Error(), "CANCEL") > 0 {
            log.WithField("lines-count", counter).Info("Final count of processed lines")
            log.WithField("record", record).
               Error("Shutting down because of Azure")
            return err
         }
         log.
            WithFields(log.Fields{"error": err}).
            Error("Problem reading one line")
         break
      }

      if err == nil {
         err = p.LineProcessor.Process(&record)
         if err != nil {
            log.WithField("lines-count", counter).Error("End with error.")
            return err
         }
         counter++
      }

      if counter%1000 == 0 {
         log.WithField("lines-count", counter).Info("Line processed so far")
      }

   }
   log.WithField("lines-count", counter).Info("Final count of processed lines")
   return nil
}

When I try to consume this specific file I get a different number of rows consumed each time (always starting from the beginning).

first run:

{"level":"info","lines-count":33823,"msg":"Final count of processed lines","time":"2019-01-27T23:46:10+01:00"}
{"err":"stream error: stream ID 1; CANCEL","level":"error","msg":"Shutting down because of Azure","record":["xxx","C","xxx","\\N","0","DL","\\N","2019-01-23 00:00:00.0","2019-01-24 00:00:00.0","Completed","\\N","\\N","1","2019-01-24 0"],"time":"2019-01-27T23:46:10+01:00"}

second run:

{"level":"info","lines-count":34513,"msg":"Final count of processed lines","time":"2019-01-27T23:47:23+01:00"}
{"err":"stream error: stream ID 1; CANCEL","level":"error","msg":"Shutting down because of Azure","record":["yyy","S","yyy","\\N","0","DL","\\N","2019-01-03 00:00:00"],"time":"2019-01-27T23:47:23+01:00"}

third run:

{"level":"info","lines-count":31386,"msg":"Final count of processed lines","time":"2019-01-28T00:05:55+01:00"}
{"err":"stream error: stream ID 1; CANCEL","level":"error","msg":"Shutting down because of Azure","record":["204053753","S","MNBU3133166","\\N","0","DL","\\N","2019-01-23 00:"],"time":"2019-01-28T00:05:55+01:00"}

As you can see, the line count is always different, and the third run consumed fewer lines than the second.

I have no issue at all reading this file with pyspark on top of Azure Databricks.

Looking at the error in the debugger, it appears to be an http2 StreamError with code ErrCodeCancel and StreamID 1.

According to the documentation ( https://http2.github.io/http2-spec/ ) it seems it could be the client doing this, but I'm not really sure how to verify it.

From the doc:

CANCEL (0x8):
Used by the endpoint to indicate that the stream is no longer needed.
If the client determines, for any reason, that it does not wish to receive the pushed response from the server or if the server takes too long to begin sending the promised response, the client can send a RST_STREAM frame, using either the CANCEL or REFUSED_STREAM code and referencing the pushed stream's identifier.

It also says that:

Stream ID 1 is used for streams that started as http/1.1 and were upgraded to http/2. We are seeing the cancellation of Stream ID 1, which is left “half-closed”
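
As a side note on the temporary string check in StoreData above: if the golang.org/x/net/http2 transport were configured explicitly, the error could be detected by type instead. With the http2 implementation bundled inside net/http the error type is unexported, which is why I fall back to matching the message. A rough sketch of the typed check (isStreamCancel is just an illustrative name):

import (
	"golang.org/x/net/http2"
)

// isStreamCancel reports whether err is an HTTP/2 RST_STREAM with the CANCEL
// error code. This only matches when the golang.org/x/net/http2 transport is
// in use; the copy bundled in net/http returns an unexported error type.
func isStreamCancel(err error) bool {
	if se, ok := err.(http2.StreamError); ok {
		return se.Code == http2.ErrCodeCancel
	}
	return false
}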

I forgot to add that I'm using

"github.com/Azure/azure-sdk-for-go/services/datalake/store/2016-11-01/filesystem"

from

[[constraint]]
  name = "github.com/Azure/azure-sdk-for-go"
  version = "24.0.0"

Again, reading this file with pyspark on Azure Databricks doesn't give any issue at all. I don't know what specifically I'm doing wrong.

Can you help me?

@jhendrixMSFT (Member)

Hello @invasionofsmallcubes, I'm routing this to some folks on the ADLS team to help investigate.

invasionofsmallcubes (Author) commented Feb 3, 2019

Thanks Joel.
What I found out was this, but please correct me if I'm wrong. The "problem" is here:

         err = p.LineProcessor.Process(&record)

Inside that method I'm doing some data transformation and then sending the record to a gRPC server, which, of course, takes some network time.

The timeout seems to scale roughly linearly with the file size; a bigger file keeps the connection open longer.

So I changed my method to just read the whole file into memory and then process it, and I don't have any problem.
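
Roughly, the change is something like this (just a sketch; readAllThenProcess and the process callback are illustrative names, not real code from my program):

import (
	"bytes"
	"io"
	"io/ioutil"
)

// readAllThenProcess drains the ADLS stream into memory first, so the HTTP/2
// stream is closed quickly, and only then runs the (slow) line processing
// against the in-memory copy.
func readAllThenProcess(r io.ReadCloser, process func(io.ReadCloser) error) error {
	data, err := ioutil.ReadAll(r) // read the whole file while the connection is open
	r.Close()
	if err != nil {
		return err
	}
	// No remote connection is held open while processing.
	return process(ioutil.NopCloser(bytes.NewReader(data)))
}

In my case the process callback is just the StoreData method from the first post.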

I think there is something like a watchdog that just cuts the connection after a while, either to not consume bandwidth or to not keep the connection open too long.

Is this assumption correct? Is there some official documentation about this behavior? I'm just guessing here.

@jhendrixMSFT (Member)

Still trying to get somebody from the service team to comment on this; the extra data point will help. How much extra time is added by the processing?

invasionofsmallcubes (Author) commented Feb 14, 2019

For a 50MB file it's about 5 minutes; for a 200MB file it's about 1h 30m, etc.

@jhendrixMSFT (Member)

So just to make sure I understand: you read a chunk of the file from ADLS, process it, then read the next chunk and repeat, and the processing makes the total read of the 50MB file take 5 minutes? So as a result the connection gets closed, presumably because the read is taking too long?

@jhendrixMSFT (Member)

OK, I discussed this with the service team; their recommendation is to repeatedly call Open() on 4MB chunks of the file, and each call should be able to handle transient network failures. Can you give this a try?
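
Something along these lines (an untested sketch; readInChunks, chunkSize, and the process callback are illustrative, the uuid usage follows your snippet, and you'd want to double-check the length/offset parameter order against the generated Open signature):

import (
	"context"
	"io/ioutil"

	"github.com/Azure/azure-sdk-for-go/services/datalake/store/2016-11-01/filesystem"
	uuid "github.com/satori/go.uuid"
)

const chunkSize int64 = 4 << 20 // 4MB per Open() call

// readInChunks reads the file in 4MB windows; each Open() is a separate,
// short-lived request, so a slow process callback no longer keeps a single
// HTTP/2 stream open for the whole file.
func readInChunks(ctx context.Context, client filesystem.Client, account, path string, process func([]byte) error) error {
	var offset int64
	for {
		length := chunkSize
		reqID := uuid.NewV4()
		res, err := client.Open(ctx, account, path, &length, &offset, &reqID)
		if err != nil {
			return err // retry transient failures here in a real implementation
		}
		data, err := ioutil.ReadAll(*res.Value)
		(*res.Value).Close()
		if err != nil {
			return err
		}
		if len(data) > 0 {
			if err := process(data); err != nil {
				return err
			}
			offset += int64(len(data))
		}
		if int64(len(data)) < chunkSize {
			return nil // short read: reached end of file
		}
	}
}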

@invasionofsmallcubes (Author)

Honestly, this should be handled at the Go SDK level. I mean, yes, one of my options was to keep track of the offset and just recover, but it doesn't really seem like something that should leak out of the SDK API.

At the moment I'm fine with my solution.

Can you confirm whether this is some kind of safeguard to avoid keeping the connection open too long?

@jhendrixMSFT (Member)

Agreed it belongs in the SDK; it would be a hand-written "convenience" layer on top of our swagger-generated code. We've had a similar request, see also #3231.
The service team has confirmed it does not close the connection. Are you using the default transport in your HTTP client? I wonder if it needs to be reconfigured to allow for longer delays (just a guess, I don't know much about the inner workings here). See https://golang.org/src/net/http/transport.go#L42 for more info.

@invasionofsmallcubes (Author)

@jhendrixMSFT can you be more specific on what I should do given the code I pasted in the first post?

@jhendrixMSFT (Member)

If you want to keep the code as-is (i.e. don't read in 4MB chunks), can you try setting the idle connection timeout to zero (no timeout), or to some reasonable upper bound based on your use case? You'd create a custom HTTP client and assign it to the Sender field on the filesystem.Client object, something like this:

import (
	"net"
	"net/http"
	"net/http/cookiejar"
	"time"

	"github.com/Azure/azure-sdk-for-go/services/datalake/store/2016-11-01/filesystem"
)

j, err := cookiejar.New(nil)
if err != nil {
	// deal with error...
}
client := filesystem.NewClient()
client.Sender = &http.Client{
	Transport: &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
			DualStack: true,
		}).DialContext,
		MaxIdleConns:          100,
		IdleConnTimeout:       0, // no limit on how long an idle connection stays open
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	},
	Jar: j,
}

@invasionofsmallcubes (Author)

Ok, I'll try. Thank you.

RickWinter added the customer-reported label on Jul 12, 2021
github-actions bot locked and limited conversation to collaborators on Apr 11, 2023