Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

10 concurrent requests for OData feed results in database connection leak #482

Closed
matthew-white opened this issue May 6, 2022 · 6 comments · Fixed by #599
Closed

10 concurrent requests for OData feed results in database connection leak #482

matthew-white opened this issue May 6, 2022 · 6 comments · Fixed by #599
Labels

Comments

@matthew-white
Copy link
Member

matthew-white commented May 6, 2022

To reproduce:

  • Run select count(*) from pg_stat_activity where state <> 'idle'. The count should be 1 (corresponding to this query itself).
  • Send 10 concurrent requests for a large OData feed, specifying the $skip query parameter. I used the Survey form on the QA server. (See below for a simple Bash script to send these requests.)
  • 9 responses will be 200s. One will be a 500. Specifically, the one returns the following Problem response:
{"message":"Completely unhandled exception: timeout exceeded when trying to connect","details":{"stack":["ConnectionError: timeout exceeded when trying to connect","at Object.createConnection (/usr/odk/node_modules/slonik/dist/src/factories/createConnection.js:54:23)","at runMicrotasks (<anonymous>)","at runNextTicks (internal/process/task_queues.js:60:5)","at listOnTimeout (internal/timers.js:526:9)","at processTimers (internal/timers.js:500:7)","at async Promise.all (index 0)"]}}
  • Run the count query again. The count is now 2.
  • Each time you repeat the above, the count will increase by 1. Once the count is 11, the database connection pool will be full, and all requests will result in a 500 response.

When I added more logging to the endpoint for the OData feed, I observed that in the request that failed, the first two queries in the following Promise.all() returned resolved Promises. However, the third query was unable to acquire a database connection (see #477) and returned a rejected promise:

return Promise.all([
Forms.getFields(form.def.id),
Submissions.streamForExport(form.id, draft, undefined, options),
((params.table === 'Submissions') && options.hasPaging())
? Submissions.countByFormId(form.id, draft, options) : resolve(null)
])
.then(([ fields, stream, count ]) =>
json(rowStreamToOData(fields, params.table, env.domain, originalUrl, query, stream, count)));

Because the third query returned a rejected promise, the Promise.all() was rejected, which means that the then() callback was never called. That means that a stream is successfully created, but the then() callback never processes it. As a result, I'm thinking the stream is never cleaned up and won't let go of its connection. If that's the case, I think we would need to find a way to end the stream even if the Promise.all() is rejected.


Simple Bash script. Before running it, set $ODK_CENTRAL_TOKEN to a session token.

function request-odata() {
	echo "sending request for \$skip=$1..."
	local url="https://test.getodk.cloud/v1/projects/149/forms/Survey.svc/Submissions?\$skip=$1"
	local response
	response=$(curl -H "Authorization: Bearer $ODK_CENTRAL_TOKEN" --include --silent --show-error "$url") || return $?
	echo "response for \$skip=$1:"
	local head=$(head -1 <<< "$response")
	if grep -q '200 OK' <<< "$head"; then
		echo "$head"
	else
		echo "$response"
		echo
	fi
}

for i in {1..10}; do
	request-odata $i &
done
@matthew-white
Copy link
Member Author

I discussed this issue with @issa-tseng and wanted to add some notes from that conversation:

  • If one promise in a Promise.all() resolves to a stream, but another promise is rejected, the stream will need to be manually cleaned up.
  • To immediately close the stream, call its destroy() method.
    • Only call end() to signal that you're done manually writing rows. In this case, call destroy().
    • Streams can be in flowing mode or stop mode, which affects their behavior. In this case, the stream is probably in flowing mode: some records may have been buffered, but nothing has been read from the stream.
    • The discussion in stream.end() does not return connection to pool - race condition knex/knex#2324 is relevant here (even though it's about Knex, not Slonik).
  • A solution would be to write a utility function that works like Promise.all() except it looks out for results that are streams.
    • If a promise is rejected, then if any other promise has resolved or will resolve to a stream, that stream should be destroyed. (You need to watch for streams both before and after the rejection.)
    • To determine whether something is a stream, check the typeof its pipe property, which should be 'function'.
    • We also need to account for PartialPipes. Probably the best way would be to iterate over the streams property of the PartialPipe, destroying each stream.
  • Implementing something along the lines of Use single database connection for GET request #477 wouldn't cause the stream to automatically be destroyed. A transaction is automatically cleaned up at the end of the response if there is an error, but the way that happens is through a rollback.

Here's the utility function I have so far (might need more work):

const allWithStreams = (values) => {
  const streams = [];
  let anyError = false;
  const handleStream = (stream) => {
    if (anyError)
      stream.destroy();
    else
      streams.push(stream);
  };
  for (const value of values) {
    resolve(value)
      .then((result) => {
        if (result instanceof PartialPipe) {
          for (const stream of result.streams) handleStream(stream);
        } else if (typeof result === 'object' && result != null &&
          typeof result.pipe === 'function') {
          handleStream(result);
        }
      })
      .catch((e) => {
        for (const stream of streams) stream.destroy();
        streams.splice(0, streams.length);
        anyError = true;
      });
  }
  return Promise.all(values);
};

However, while trying out this utility function, I ran into what appears to be a separate Slonik issue. I've filed an issue with Slonik. Once that's resolved, I'll circle back to this utility function.

@lognaturel
Copy link
Member

gajus/slonik#347 and also related to #485

@matthew-white
Copy link
Member Author

I think even with #599, we'll still want to use a utility function like the one above. @alxndrsn, does that sound right? #599 will allow us to call .destroy(), and then I think we'll need to use that ability in the case where a stream is used in a Promise.all() but a different promise in the Promise.all() is rejected. In that case, I think the stream won't be destroyed unless we call .destroy() ourselves.

Promise.all() is used with streams in just a few places. Instead of using a utility function like the one above, we might be able to rework things so that the streaming happens only after the Promise.all(). #564 does that for the .csv.zip endpoints, moving three streams in a Promise.all() to after the Promise.all().

@alxndrsn
Copy link
Contributor

@matthew-white yes, that sounds correct: either a utility function, or stop using Promise.all() with streams.

If the work's already been done in #564, can that work be pulled out into a separate PR?

@matthew-white
Copy link
Member Author

matthew-white commented Sep 11, 2022

Pulling it into a separate PR sounds reasonable to me. We'd also have to do something similar in the other places where Promise.all() is used with one or more streams (maybe just the submissions CSV and OData). I'm not sure whether we'd see a decrease in performance if we did that, but it does seem simpler than a utility function. (And maybe any decrease in performance wouldn't be noticeable?)

matthew-white added a commit that referenced this issue Sep 15, 2022
Related to #482. This prevents the following sequence:

- A stream is created within a Promise.all().
- The stream is to be processed after the Promise.all() is fulfilled.
- However, another promise in the Promise.all() is rejected.
- As a result, the Promise.all() as a whole is rejected.
- As a result, the stream is never processed and isn't destroyed.
matthew-white added a commit that referenced this issue Sep 16, 2022
Closes #482. This commit prevents the following sequence:

- A stream is created within a Promise.all().
- The stream is to be processed after the Promise.all() is fulfilled.
- However, another promise in the Promise.all() is rejected.
- As a result, the Promise.all() as a whole is rejected.
- As a result, the stream is never processed and isn't destroyed.
matthew-white added a commit that referenced this issue Sep 16, 2022
Closes #482. This commit prevents the following sequence:

- A stream is created within a Promise.all().
- The stream is to be processed after the Promise.all() is fulfilled.
- However, another promise in the Promise.all() is rejected.
- As a result, the Promise.all() as a whole is rejected.
- As a result, the stream is never processed and isn't destroyed.
@matthew-white matthew-white linked a pull request Sep 16, 2022 that will close this issue
2 tasks
matthew-white added a commit that referenced this issue Sep 16, 2022
Closes #482. This commit prevents the following sequence:

- A stream is created within a Promise.all().
- The stream is to be processed after the Promise.all() is fulfilled.
- However, another promise in the Promise.all() is rejected.
- As a result, the Promise.all() as a whole is rejected.
- As a result, the stream is never processed and isn't destroyed.

(cherry picked from commit 3ba66bb)
@matthew-white
Copy link
Member Author

Closed by #604.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment