Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pgq): Add first version of queue validator #7

Merged
merged 7 commits into from
Jan 26, 2024

Conversation

syordanov94
Copy link
Contributor

Hello all

I met @kedlas at Golab 2023 and, after some talking, I proposed that maybe having some queue validations would be useful to assure that the user does not use "invalid" queue schemas.

In this PR you will find a first version of this queue validator that includes the following:

  • A new optional attribute that enables consumer queue validation
  • The validator.go that is in charge of the queue validation. For now this validation only check if the queue whose name was provided, contains all the mandatory columns the pgq project needs to correctly function.
  • Some tests agains correct and incorrect schemas in the validator_test.go

You will see that I have left some TODOs for future additions like index validations, more precise error msgs when validating, etc.. feel free to implement any of these (maybe if I have more free time I might do some of them as well 😄)

I would appreciate any comment and suggestion you might have. I hope you find my proposal interesting ❤️

Thanks and great job on this project, it's very cool!

@prochac
Copy link
Contributor

prochac commented Dec 14, 2023

Hi @syordanov94, in a first place, thank you for your contribution.

Your idea is compatible with us for sure, because we already do some limited table check here:

pgq/consumer.go

Lines 292 to 345 in 25e98a5

func (c *Consumer) verifyTable(ctx context.Context) error {
rows, err := c.db.QueryContext(ctx, `SELECT column_name
FROM information_schema.columns
WHERE table_catalog = CURRENT_CATALOG
AND table_schema = CURRENT_SCHEMA
AND table_name = $1
ORDER BY ordinal_position;
`, c.queueName)
if err != nil {
return errors.Wrap(err, "querying schema of queue table")
}
defer rows.Close()
columns := make(map[string]struct{})
for rows.Next() {
var s string
if err := rows.Scan(&s); err != nil {
return errors.Wrap(err, "reading schema row of queue table")
}
columns[s] = struct{}{}
}
if err := rows.Err(); err != nil {
return errors.Wrap(err, "reading schema of queue table")
}
if err := rows.Close(); err != nil {
return errors.Wrap(err, "closing schema query of queue table")
}
mandatoryFields := []string{
"id",
"locked_until",
"processed_at",
"consumed_count",
"started_at",
"payload",
"metadata",
}
var missingColumns []string
for _, mandatoryField := range mandatoryFields {
if _, ok := columns[mandatoryField]; !ok {
missingColumns = append(missingColumns, mandatoryField)
}
delete(columns, mandatoryField)
}
if len(missingColumns) > 1 {
return errors.Errorf("some PGQ columns are missing: %v", missingColumns)
}
// TODO log extra columns in queue table or ignore them?
// extraColumns := make([]string, 0, len(columns))
// for k := range columns {
// extraColumns = append(extraColumns, k)
// }
// _ = extraColumns
return nil
}

If you could modify the function and add the type check, please.

And about the x/schema package, it's a wish of @kedlas, but I'm a bit skeptical about it. The reason why is that we plan to introduce Terraform provider that will handle the queues' management for us. Until that, though, we use SQL migration tool. I think, no-one should use the x/schema package in production.
The x package is eXtra eXperimantal and eXpendable 😄 and also compromise between me and my CTO 😄

@syordanov94
Copy link
Contributor Author

Hey @prochac

Oh, I didn't see this function 😅

It basically does the same as the Validate function I wrote in the validator.go so I can just add some more logic and, for example, validate the indexes and so on. I can also test this verifyTable (I see there are no specific tests for it).

Do you want validation to be mandatory? This is because I added an optional parameter in the config to enable validation. But given that it already validates it in prod, I guess it doesn't make sense to add optional validation now 😅

Also regarding the x/schema you mention, I saw it in the Readme and that's why I used it to be honest haha. Given that it's for internal usage maybe we can use it for testing like creating "temporal" queues and destroying them once the test is completed (like I did in the validator_test.go). What do you think?

Thanks for answering. Once I get some time I will try and "re-write" my proposal 👍

@syordanov94
Copy link
Contributor Author

Hey @prochac @kedlas

First of all, merry christmas! I hope you guys are enjoying your holidays.

I have made some updates to your verifyTable function, combining it with the functions I implemented in the validator package.

Feel free to check it out and let me know if you find it useful.

Best Regards
Simeon

@prochac
Copy link
Contributor

prochac commented Dec 27, 2023

@syordanov94 I do have some pending review, but it's hard to find some concentration in these pesky times 😄

Copy link
Contributor

@prochac prochac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally, I reserved some time. Sorry it took so long. Holidays + tasks stacked during holidays 😬

Here are some cheap change requests. Now I will dive deeper into the other things.

Please, rebase your code. I tried and there is just a small conflict at consumerConfig struct in comsumer.go

validator/validator.go Outdated Show resolved Hide resolved
validator/validator.go Outdated Show resolved Hide resolved
validator/validator.go Outdated Show resolved Hide resolved
Copy link
Contributor

@prochac prochac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of array_to_string has some limits:

DROP TABLE IF EXISTS "stupid_table";
CREATE TABLE "stupid_table"
(
    "foo,bar" INT NOT NULL,
    "baz"     INT NOT NULL

);
CREATE INDEX "stupid_table_foo,bar_baz" ON "stupid_table" ("foo,bar", "baz");

SELECT
    i.relname as index_name,
    array_to_string(array_agg(a.attname), ', ') as column_names
FROM
    pg_class t,
    pg_class i,
    pg_index ix,
    pg_attribute a
WHERE
    t.oid = ix.indrelid
  and i.oid = ix.indexrelid
  and a.attrelid = t.oid
  and a.attnum = ANY(ix.indkey)
  and t.relkind = 'r'
  and t.relname = 'stupid_table'
GROUP BY
    t.relname,
    i.relname
ORDER BY
    t.relname,
    i.relname;

validator/validator.go Outdated Show resolved Hide resolved
validator/validator.go Outdated Show resolved Hide resolved
@syordanov94
Copy link
Contributor Author

Hey @prochac!

Thanks for the detailed review! I have made all the mentioned changes you suggested. Please feel free to re-review!

I ran the tests locally and everything seems ok

@syordanov94 syordanov94 requested a review from prochac January 19, 2024 15:35
@prochac
Copy link
Contributor

prochac commented Jan 25, 2024

The tests are not passing.

Is necessary to have validator in separate package and having it exported?
Imo it's ok to have it unexported in pgq root pkg. It also solves the duplicate func openDB(t *testing.T) *sql.DB test helper

@syordanov94
Copy link
Contributor Author

Hey @prochac

Sorry, I was missing an import in the validator_test.go file. Just ran them and they are passing.

Regarding the package where the validator should be located, yes it can reside in the pgq package. I added it to the validator package just because I like to split these types of features per package (maybe something more "me" 😆) because, if you have multiple validation methods (maybe some manual validation or use some external library or something) it's easier to integrate it. But pgq package works fine, I have added it!

Can you re-review? And re run the tests?

btw, the duplication of the func openDB(t *testing.T) *sql.DB won't be solved by this since this functionality is not in the pgq package but in the integtest package

@prochac prochac merged commit 2607344 into dataddo:main Jan 26, 2024
6 checks passed
@prochac
Copy link
Contributor

prochac commented Jan 26, 2024

@syordanov94
Thank you for your contribution and your patience.
It was a very useful experience for me. The public reviews do have a bit different dynamic than internal.

@syordanov94 syordanov94 deleted the add_queue_validator branch January 26, 2024 12:33
@kedlas
Copy link
Contributor

kedlas commented Jan 26, 2024

Thank you @syordanov94 for contribution. If you use pgq, feel free to come with any new ideas. They're very welcome.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants