rows.Close() getting all result rows #2153

Open
vladimirschuka opened this issue Oct 21, 2024 · 20 comments

@vladimirschuka

I have a piece of code like this:

rows, err := dbconn.Query(ctx, sql)
defer rows.Close()

for rows.Next() {
	msg, err := pgx.RowToAddrOfStructByName[Sometype](rows)
	if err != nil {
		return fmt.Errorf("error scanning row: %w", err)
	}
	_ = msg // process msg here
}

but when I get an error, the app tries to close the rows and gets stuck in a loop reading all the remaining result rows, which I don't need anymore. Specifically here (pgconn.go, line 1593):

func (rr *ResultReader) Close() (CommandTag, error) {
	if rr.closed {
		return rr.commandTag, rr.err
	}
	rr.closed = true
	//// HERE ->
	for !rr.commandConcluded {
		_, err := rr.receiveMessage()
		if err != nil {
			return CommandTag{}, rr.err
		}
	}
	// ...

and if your query returns a huge amount of rows, you just wait until all of them are read.
Am I doing something wrong?
Shouldn't I close the rows? Or...?

I expected that rows.Close() would just stop the query in the database and clean up the memory.

Version:
v5.7.1

@codercms
Contributor

When you encounter a scanning error inside the rows.Next() loop, the query cannot be cancelled because it has already completed and the server (PostgreSQL) is streaming results to the client (your application).

According to the PostgreSQL wire protocol, all remaining messages must be consumed before a new query can be processed (actually PostgreSQL can accept a new query, but to consume the new query's results you still need to consume all previous messages).

If you really want to stop receiving data, probably your best option is to terminate the connection to the server.

@vladimirschuka
Author

> When you encounter a scanning error inside the rows.Next() loop, the query cannot be cancelled because it has already completed and the server (PostgreSQL) is streaming results to the client (your application).
>
> According to the PostgreSQL wire protocol, all remaining messages must be consumed before a new query can be processed (actually PostgreSQL can accept a new query, but to consume the new query's results you still need to consume all previous messages).
>
> If you really want to stop receiving data, probably your best option is to terminate the connection to the server.

That is not true.
I expect this behavior:
https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-CANCELING-REQUESTS
I don't want to close the connection, I need to request cancellation.
How can I cancel the query then?

In the C driver you have PGCancel.
If you run a query in psql, Ctrl+C is really good at cancelling queries, and it certainly doesn't close the connection.

@codercms
Contributor

As mentioned in the PostgreSQL documentation (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-CANCELING-REQUESTS):

> The cancellation signal might or might not have any effect — for example, if it arrives after the backend has finished processing the query, then it will have no effect. If the cancellation is effective, it results in the current command being terminated early with an error message.

Once you're receiving query results, the query is already done, so there's nothing left to cancel.
If you want to stop receiving query results, the only option I see is to terminate the connection.

@codercms
Contributor

> In the C driver you have PGCancel

You can still use the same API in pgx: https://pkg.go.dev/github.com/jackc/pgx/v5/pgconn#PgConn.CancelRequest
See also https://pkg.go.dev/github.com/jackc/pgx/v5/pgconn#hdr-Context_Support

However, as mentioned earlier, cancellation only affects queries that are still in progress.

@vladimirschuka
Author

I understand what you are talking about.
Just tell me one thing, please.

When I work with the Query -> rows.Next() API, should I close the rows with rows.Close() or not?

@codercms
Contributor

It's generally better to explicitly use defer rows.Close() to ensure rows are properly closed in cases of panics or early returns, etc.

But you can let pgx manage it for you, see the CollectRows API - https://pkg.go.dev/github.com/jackc/pgx/v5#CollectRows

pgx/rows.go

Lines 421 to 447 in 2ec9004

// AppendRows iterates through rows, calling fn for each row, and appending the results into a slice of T.
//
// This function closes the rows automatically on return.
func AppendRows[T any, S ~[]T](slice S, rows Rows, fn RowToFunc[T]) (S, error) {
	defer rows.Close()

	for rows.Next() {
		value, err := fn(rows)
		if err != nil {
			return nil, err
		}
		slice = append(slice, value)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}

	return slice, nil
}

// CollectRows iterates through rows, calling fn for each row, and collecting the results into a slice of T.
//
// This function closes the rows automatically on return.
func CollectRows[T any](rows Rows, fn RowToFunc[T]) ([]T, error) {
	return AppendRows([]T{}, rows, fn)
}

Here's how you use this API:

myRows, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[Sometype])

@codercms
Contributor

codercms commented Oct 22, 2024

Here's how I wrap this API usage in my personal app:

type Queryer interface {
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)

	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
}

func PgxFetchRowsPtrCtx[T any](ctx context.Context, pool Queryer, sql string, values ...any) ([]*T, error) {
	// The error from Query is deliberately ignored: in pgx v5, Query never returns
	// a nil Rows, and any error is surfaced again by CollectRows via rows.Err().
	rows, _ := pool.Query(ctx, sql, values...)
	res, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByNameLax[T])
	if err != nil {
		return nil, err
	}

	return res, nil
}

// Then in your app you write something like this:

rows, err := utils.PgxFetchRowsPtrCtx[yourRowType](ctx, r.pgxPool, sql /*, query bindings go here */)

Such a wrapper can make your life even easier.

@vladimirschuka
Author

Then I have to admit that, if you have a case like mine, the API requires additional clarification.

I have a query which returns about 40,000,000 rows (10 GB) that I need to process. I don't want to load it all into memory (I don't have much), and my question is not about deserializing data into a structure. Let's imagine I have some kind of processing, and in the middle of it I hit whatever error, decide to cancel the processing, and close the rows. I then have to wait until rows.Close() has read all the rows for nothing, several GB of data over the network, just because the connection needs to be in a particular state, instead of cancelling the query and waiting until the DB has really stopped the stream. That is how we all work in any DB IDE: we run a query, oops, we need to cancel it, we do, and then we run another query on the same connection.

The solution for the case with huge queries is:

  1. run a query
  2. process the result in a rows.Next() loop
  3. if there is an error in the processing, close the connection instead of closing the rows (which doesn't mean the query will be cancelled; it will be eventually, when the DB realizes the connection is lost, which might also not be immediate)
  4. if this connection is needed anywhere else in the app, just create a new one

Is that right?

With all of that said, I don't like the answer "you can close the connection".

This is how I think it should work: in rows.Close() we should work as the C driver does, send the server a message to cancel the query and then wait for rr.commandConcluded, which also takes time for a huge query. Of course I understand that to send a cancellation message we have to open another connection with the same key and then send the message; it is not a trivial task, but it is the right way to do it.

What do you think?

@codercms
Contributor

It seems there's some misunderstanding about query processing. Once you start receiving rows, the query has already finished on the PostgreSQL server. You can't cancel it because nothing is running on the PostgreSQL server at that point - it's simply sending results to your application.
There's nothing pgx can do about this; as I mentioned earlier, you can't cancel a query that has already finished.

P.S. Querying 40,000,000 rows at once might not be ideal. PostgreSQL may not fit such a large result set in the connection's RAM (work_mem), potentially using temporary files instead. This topic might be better suited for platforms like Stack Overflow or DBA Stack Exchange. From my experience, it's better to split your data into smaller chunks. For example, query 10,000 or 100,000 rows at a time, process them, and then move on to the next chunk. Use the keyset pagination technique or anything else that fits your case. Depending on your goals, you can also use temporary tables/cursors.
And you probably want to parallelize your data processing; chunking is great for that.
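
For illustration, a rough sketch of such a keyset-pagination loop (the table, columns, and chunk size are made-up placeholders, and conn is assumed to be a *pgx.Conn):

func processInChunks(ctx context.Context, conn *pgx.Conn) error {
	lastID := int64(0)
	for {
		// Fetch the next chunk, resuming after the last id seen in the previous one.
		rows, err := conn.Query(ctx,
			"SELECT id, payload FROM some_tbl WHERE id > $1 ORDER BY id LIMIT 100000", lastID)
		if err != nil {
			return err
		}

		n := 0
		for rows.Next() {
			var id int64
			var payload string
			if err := rows.Scan(&id, &payload); err != nil {
				rows.Close()
				return err
			}
			// ... process the row ...
			lastID = id
			n++
		}
		rows.Close()
		if err := rows.Err(); err != nil {
			return err
		}
		if n == 0 {
			return nil // no more rows
		}
	}
}

The nice property of this shape is that even if something fails mid-chunk, rows.Close() only has to drain at most the current chunk, never the whole table.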

@sean-
Contributor

sean- commented Oct 22, 2024

(Nit: that's only true if you're not using a CURSOR[1]. For query processing like this, you may want to consider setting up pagination[2,3] in your app so you can iterate through pages of data. Assuming the connection or rows will be returned in a single atomic query/transaction, despite the environmental entropy that is likely present and unnoticed when performing simple one-row queries but does show up when performing queries that return 10 GB of data, is going to lead to an expectation mismatch that will result in sadness.)

[1] https://www.postgresql.org/docs/current/sql-declare.html
[2] https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/
[3] https://chrisdone.com/posts/postgresql-pagination/

@vladimirschuka
Author

What are you talking about?

> the query has already finished on the PostgreSQL server

Are you for real? Who returns the rows then?

> The response to a SELECT or FETCH query normally consists of CursorResponse, RowDescription, zero or more AsciiRow or BinaryRow messages, and finally CompletedResponse

Until we get CompletedResponse, the query is on the server.

And there is no difference between a cursor and a select internally in the DB.
And what is the problem with getting 10 GB from the DB? I don't see one: select * from huge_table and get the result, easy as that.

Pagination is not related to the problem I described.

The only meaningful thing in our discussion so far is:
https://pkg.go.dev/github.com/jackc/pgx/v5/pgconn#PgConn.CancelRequest

To resolve this bug (yes, I believe it is a bug), this is a solution (pgconn.go, line 1593):

func (rr *ResultReader) Close() (CommandTag, error) {
	if rr.closed {
		return rr.commandTag, rr.err
	}
	rr.closed = true

	// solution
	err := rr.pgConn.CancelRequest(context.Background())
	if err != nil {
		fmt.Println("can't deliver a cancellation")
	}
	//
	for !rr.commandConcluded {
		_, err := rr.receiveMessage()
		if err != nil {
			return CommandTag{}, rr.err
		}
	}

As I said, the request is not finished; we send a cancel signal, wait for rr.commandConcluded, and the driver even returns the right message:
rr.err: ERROR: canceling statement due to user request, duration:1983.071 (SQLSTATE 57014)
and closes the rows.

By the way, the connection stays open and is ready to run another query.

If your query finished successfully, I believe the rr.closed check will handle it.
And as I understood from the documentation, if there is no query running at the moment the cancellation signal arrives, the server just ignores it.

We just have to repeat the CancelRequest logic here. I'm not really involved in the development of this driver, so I'm not sure whether such a solution is suitable or not.

What do you think?

@codercms
Contributor

I see your point about the cancellation request. I did a simple test with PostgreSQL 15:

postgres=# create table some_tbl (id bigint PRIMARY KEY);
CREATE TABLE
postgres=# INSERT INTO some_tbl SELECT s FROM generate_series(1, 40000000) s;
INSERT 0 40000000

Then I wrote a Go app to test cancelling a request during result set retrieval:

Go Code
package main

import (
	"context"
	"github.com/jackc/pgx/v5"
	"log"
)

func main() {
	conn, err := pgx.Connect(context.Background(), "postgresql://postgres:pass@127.0.0.1:5432/postgres?sslmode=disable")
	if err != nil {
		panic(err)
	}

	log.Println("Sending query")

	rows, err := conn.Query(context.Background(), "select * from some_tbl ORDER BY id")
	if err != nil {
		panic(err)
	}

	rows.Next()
	log.Println("First row has been arrived")

	var num int64
	if err := rows.Scan(&num); err != nil {
		log.Fatal("Failed to scan row", err)
	}

	// Now we send the cancel request

	if err := conn.PgConn().CancelRequest(context.Background()); err != nil {
		log.Println("Failed to cancel request", err)
	}

	log.Println("After cancel request has been sent")

	var n int

	// We are still receiving rows even after the cancellation request was sent
	for rows.Next() {
		var num int64
		if err := rows.Scan(&num); err != nil {
			log.Println("Failed to scan row", err)
			break
		}

		n++
	}

	log.Printf("Rows loop ended, rows received=%d err=%v", n, rows.Err())
}

After sending a cancel request, rows continued to be received, but eventually the cancellation took effect with this output:

2024/10/23 02:18:21 Sending query
2024/10/23 02:18:21 First row has been arrived
2024/10/23 02:18:21 After cancel request has been sent
2024/10/23 02:18:21 Rows loop ended, rows received=24143 err=ERROR: canceling statement due to user request (SQLSTATE 57014)

So, you're right - the query is still considered in progress while streaming results. It’s fine to send a CancelRequest during this stage to stop reading all 40 million rows.

@codercms
Contributor

codercms commented Oct 23, 2024

> We just have to repeat the CancelRequest logic here. I'm not really involved in the development of this driver, so I'm not sure whether such a solution is suitable or not.

While your approach of placing CancelRequest inside the rows.Close logic seems reasonable for your use case, it could introduce a breaking change. It could also result in exceeding the PostgreSQL connection limit.
For example, if you have a server with a 100-connection limit and 60 concurrent threads (each with its own database connection) all sending CancelRequest (each cancel request opens a new connection), you could briefly end up with up to 120 open connections, well over the limit of 100, at which point additional connections cannot be opened.

So I think it's better to keep the CancelRequest in application logic rather than in the driver.
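
For what it's worth, a minimal sketch of keeping it in application code (conn is a *pgx.Conn, process is a hypothetical per-row function, imports omitted):

func consume(ctx context.Context, conn *pgx.Conn) error {
	rows, err := conn.Query(ctx, "SELECT id FROM some_tbl ORDER BY id")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return err
		}
		if err := process(id); err != nil {
			// Ask the server to stop the still-running statement; the deferred
			// rows.Close() should then conclude early with SQLSTATE 57014
			// instead of draining the whole result set.
			if cancelErr := conn.PgConn().CancelRequest(context.Background()); cancelErr != nil {
				log.Printf("cancel request failed: %v", cancelErr)
			}
			return err
		}
	}
	return rows.Err()
}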

@vladimirschuka
Author

Thank you.
The last statement is true. I was really surprised by the fact that, to cancel a query, you need to create another connection. You see, they say it doesn't happen often; that is why they decided not to check for cancellation inline, because it would affect performance.

Maybe we could consider adding it to the driver and just ignore the error. I mean, if there are no free connections and we can't send the cancellation signal, it doesn't matter, we just wait for all the rows. The cancellation is just one signal, and the sending connection should be closed right after; it is quite a fast operation.

I think the situation where we are cancelling queries at the same time is less probable than the problem caused by waiting until you get all the rows.

Maybe a discussion with the other contributors?

@jackc
Owner

jackc commented Oct 23, 2024

As mentioned by others above, CancelRequest in your application logic is the correct approach if you must use a single connection.

Unfortunately, it is somewhere between very difficult to impossible for the driver to handle query cancellation properly in every case.

Here is a subset of the issues:

How to ensure the proper query is cancelled?

Query execution and the cancellation signal are parallel operations. It is possible for a race condition to occur.

  1. Query 1 is sent
  2. Cancellation signal is sent
  3. Query 1 finishes
  4. Query 2 is sent
  5. Cancellation signal is delivered.

Batch queries and pipeline mode are even more difficult.

How long to wait for cancellation?

We have no way of knowing if or when the cancellation signal will take effect. How long should the blocked Go code wait?

pgx faced this same quandary when deciding what to do about context cancellation. By default pgx chooses to unblock the Go code as soon as possible at the expense of closing the connection. pgx sends the cancellation signal to avoid wasting server resources, and also immediately closes the underlying net.Conn.

In a previous version of pgx, we tried to automatically recover to avoid closing connections unnecessarily, but the logic was too tricky to handle reliably and there were certain cases where the connection was guaranteed to be broken (interrupted TLS connections) so application code still needed to handle context cancellation killing the connection. In current pgx, you can reconfigure context cancellation logic if you are really ambitious. I doubt that is the right approach for you though.


All that said, unless you specifically need to use a single, stateful connection then what I would recommend is to use a pgxpool and use context cancellation. Everything will just work.

If for some reason you really need to use a single connection then you would need to manually call CancelRequest yourself.
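
To illustrate the pgxpool route, here is a minimal sketch (connection string, table, and the process function are placeholders):

func run(ctx context.Context) error {
	pool, err := pgxpool.New(ctx, "postgresql://postgres:pass@127.0.0.1:5432/postgres")
	if err != nil {
		return err
	}
	defer pool.Close()

	queryCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	rows, err := pool.Query(queryCtx, "SELECT id FROM some_tbl ORDER BY id")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		if err := rows.Scan(&id); err != nil {
			return err
		}
		if err := process(id); err != nil {
			// Cancelling the context makes pgx send a cancel signal and close
			// this one connection; the pool replaces it transparently.
			cancel()
			return err
		}
	}
	return rows.Err()
}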

@vladimirschuka
Author

@jackc
Could you help me please?
Another finding: I have a huge query, I run it and get pgx.Rows. Everything is good, I start the for rows.Next() { loop, inside the loop I have rows.Scan, and in the process of scanning I get an error, but I don't see the error, because func (rows *baseRows) Scan(dest ...any) error calls rows.fatal(err), and I have to wait until the rows get closed, which means I have to wait for 10 GB of data to be read over the network before it shows me that there is an error.

What do I do in this situation? The solution for my previous situation was to terminate the connection. OK, understandable; not an ordinary situation, a huge query, so consider that if the query is huge then the connection belongs to the query. But in this situation?

It is really another good question why we close the Rows if we can't scan them, but it is not related.

@vladimirschuka
Author

OK, my personal problem is solved by a server-side CURSOR (thx @sean-). I also discussed this with Java developers; they say their driver uses a plain select but loads everything into memory before you get the iterator (not like pgx), and ResultSet Close() (or however they call it) is not the same as what we have in the pgx driver. But we still fetch everything and throw it away, just without the memory usage. The Java driver uses a cursor when the fetch size property is set.
Here is a small example of a server-side cursor (not without questions, but...):

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

type Cursor struct {
	Name      string
	SQL       string
	FetchSize int
	Tx        pgx.Tx

	count  int
	open   bool
	closed bool
}

const (
	CURSORDECLARATIONSQL = "DECLARE %s CURSOR FOR %s"
	FETCHSQL             = "FETCH %d FROM %s"
	CLOSESQL             = "CLOSE %s"
)

func NewCursor(name, sql string, fetchSize int, tx pgx.Tx) *Cursor {
	return &Cursor{
		Name:      name,
		SQL:       sql,
		FetchSize: fetchSize,
		Tx:        tx,
	}
}

func (c *Cursor) Open() error {
	if c.open {
		return nil
	}
	if c.closed {
		return fmt.Errorf("Cursor already closed")
	}

	_, err := c.Tx.Exec(context.Background(), c.DeclareSQL())
	if err != nil {
		return err
	}

	c.open = true
	return nil
}

func (c *Cursor) Fetch() (pgx.Rows, error) {
	if !c.open {
		return nil, fmt.Errorf("cursor is not open")
	}
	if c.closed {
		return nil, fmt.Errorf("cursor is already closed")
	}

	return c.Tx.Query(context.Background(), c.FetchSQL())
}

func (c *Cursor) Close() error {
	if !c.open {
		return fmt.Errorf("cursor is not open")
	}
	if c.closed {
		return nil
	}

	c.open = false
	c.closed = true
	_, err := c.Tx.Exec(context.Background(), c.CloseSQL())
	return err
}

func (c *Cursor) DeclareSQL() string {
	return fmt.Sprintf(CURSORDECLARATIONSQL, c.Name, c.SQL)
}

func (c *Cursor) FetchSQL() string {
	c.count++
	return fmt.Sprintf(FETCHSQL, c.FetchSize, c.Name)
}

func (c *Cursor) CloseSQL() string {
	return fmt.Sprintf(CLOSESQL, c.Name)
}

func (c *Cursor) Count() int {
	return c.count
}

You FETCH until the returned pgx.Rows yields 0 rows, for example:
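
Roughly like this (a sketch using the Cursor type above; tx is an open pgx.Tx, and my_cursor, huge_table, and SomeType are placeholders assumed to match the query's columns):

cur := NewCursor("my_cursor", "SELECT * FROM huge_table", 10000, tx)
if err := cur.Open(); err != nil {
	return err
}
defer cur.Close()

for {
	rows, err := cur.Fetch()
	if err != nil {
		return err
	}
	items, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[SomeType])
	if err != nil {
		return err
	}
	if len(items) == 0 {
		break // FETCH returned 0 rows, the cursor is exhausted
	}
	// ... process items ...
}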
Thank you everyone for the help.
pgx driver is the best anyway :)

The question of why we close rows inside Rows.Scan(...) stays, because that is the only thing that stopped me from using the standard API and just closing the connection.

@jackc
Owner

jackc commented Oct 25, 2024

> The question of why we close rows inside Rows.Scan(...) stays.

IMO it's what the developer would expect in most cases and it's convenient.

A Scan error is almost always a bug in the application, pgx, or a custom data type handler (outside of something crazy like the server sending corrupted data). When Scan is called the entire row is already in memory, so in a correctly written program an error should never occur. Closing the Rows seems a reasonable choice in this case.

It is a convenience because it means that in many cases it is not necessary to check the error from Scan. A Scan error means the next call to Next will return false, and the error will be handled by checking Rows.Err().

The only case I can think of where a Scan error might be expected to be recoverable or handleable would be something like strictly unmarshalling JSON where the shape of the data in the database doesn't always fit.

Finally, if you really want to scan without risk of the rows being closed by an error you can use https://pkg.go.dev/github.com/jackc/pgx/v5#ScanRow.
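
For example, a rough sketch using ScanRow (the arguments follow the v5 signature; conn is a *pgx.Conn and the table and columns are made up):

func scanTolerantly(ctx context.Context, conn *pgx.Conn) error {
	rows, err := conn.Query(ctx, "SELECT id, payload FROM some_tbl")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		var payload string
		// ScanRow only looks at the raw values passed to it, so a failure here
		// does not mark the Rows as fatally errored and does not trigger the
		// early Close that drains the rest of the result set.
		if err := pgx.ScanRow(conn.TypeMap(), rows.FieldDescriptions(), rows.RawValues(), &id, &payload); err != nil {
			continue // skip (or log) the bad row; iteration can carry on
		}
		// ... process id and payload ...
	}
	return rows.Err()
}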

@vladimirschuka
Author

In this case I completely disagree. I don't expect Scan to close the Rows; for that purpose we have the method Rows.Close(). Why do we have such a method then? I think the answer is "separation of concerns": each function does only one thing. If you have to deal with Rows.Close(), you deal with it, nobody else.
The Scan function returns an error, which means I have to handle that error anyway and decide what to do next.
For example:

func ImpossibleToDoWithoutGettingAllRows1() error {
	// ...
	rows, err := conn.Query(ctx, `SELECT ...`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		err := rows.Scan( /* ... */ )
		// I have to wait here and get all the rows, even if it is several gigabytes,
		// instead it could be done immediately by cancelling.
		// And yes, I remember what you said before, but it is my problem as a developer
		// to be concerned about race conditions.
		if err != nil {
			slog.Error("error scanning row", "err", err)
			_ = conn.PgConn().CancelRequest(context.Background())
			return err
			// could also just be `return err`, which closes the rows implicitly
		}
		// ------ some processing -------
	}

	if err := rows.Err(); err != nil {
		return err
	}
	return nil
}

or

func ImpossibleToDoWithoutGettingAllRows2() error {
	// ...
	rows, err := conn.Query(ctx, `SELECT ...`)
	if err != nil {
		return err
	}

	// I want to close the connection and thereby terminate the query,
	// as the Postgres documentation says.
	defer conn.Close(ctx)

	for rows.Next() {
		err := rows.Scan( /* ... */ )
		// I'm waiting and waiting here ...
		if err != nil {
			slog.Error("error scanning row", "err", err)
			return err
		}
		// ------ some processing -------
	}

	if err := rows.Err(); err != nil {
		return err
	}
	return nil
}

What I really like about the pgx driver is that pgx.Rows is actually a stream from the server, not like ResultSet in Java which loads everything into memory. That gives you a really powerful tool, because if you don't need the stream anymore, you just terminate it (and the DB, after its next cancellation check, terminates the query on its side).
When you use a cursor, on the DB side you don't see the query, you see FETCH ... FROM ..., which is not really convenient.

In the end I would say that closing the rows inside the Scan function is wrong, because it doesn't bring you any benefit, and I mean it, any. You still have code like this in your function:

rows, err := conn.Query(ctx, `SELECT ...`)
if err != nil {
	return err
}
defer rows.Close()

which does exactly that.

And when you write an application with DB interaction, the DB doesn't always belong to you: one day a field is NOT NULL and you scan it into an int, the next day, oops, it is NULL. But your app has faithfully passed all ... GB or TB of data over the network for no reason.

Of course I realize it would be a huge change, because some developers might rely on this behavior by not closing the rows at the end of the function.
But still, you just have to write defer rows.Close().

I understand that if you write an app where a query returns 100 or even 1000 rows you don't care about this, but I work with 40,000,000 rows (that is one day, by the way) and about 10 GB of compressed data; maybe that is why I brought up this topic. I don't remember exactly, but version 4 worked in a different way, didn't it?

@jackc
Owner

jackc commented Oct 26, 2024

This behavior goes all the way back to v2 when the Rows interface was introduced. Even if I did agree that it was something that should be changed, it's not something that should be done outside of a major version.

But a good thing about pgx's design is that pgx doesn't have to change for you to get the behavior you want. pgx is designed in a layered fashion where you can usually drop down a layer if you need something different than pgx provides out of the box. As I mentioned above you can use https://pkg.go.dev/github.com/jackc/pgx/v5#ScanRow to do what you want. And if that didn't work you could go down to the pgconn layer, a lower level library similar to the C libpq library. If that didn't work you could go down to the pgproto3 layer and directly read and write PostgreSQL protocol messages. And finally, if even that wasn't sufficient, you can get access to the underlying net.Conn.
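
For instance, a rough sketch of issuing a query directly through the pgconn layer (the table and column are placeholders, and the values are left as raw bytes):

func rawQuery(ctx context.Context, conn *pgx.Conn) error {
	// ExecParams streams results through a pgconn.ResultReader; each row's values
	// arrive as raw []byte per column and nothing is closed for you on a scan error.
	rr := conn.PgConn().ExecParams(ctx, "SELECT id FROM some_tbl", nil, nil, nil, nil)
	for rr.NextRow() {
		rawID := rr.Values()[0] // raw bytes of the id column (text format by default)
		_ = rawID               // decode or process as needed
	}
	_, err := rr.Close()
	return err
}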
