feat: potentially add xmin replication? #219

Open
qbatten opened this issue Aug 25, 2023 · 4 comments

Comments

@qbatten
Contributor

qbatten commented Aug 25, 2023

Airbyte has had this since July 10 (see here). I haven't spent time scoping it out, but it doesn't work on the current version of this plugin. I suspect it'd be easy to add: I assume the catalog excludes hidden/system columns and could just include them(?)

Specifically, I tried setting xmin as the replication column, and it errors out, apparently because that column isn't in the catalog.

2023-08-25T17:06:34.041196Z [info     ] time=2023-08-25 17:06:31 name=tap_postgres level=INFO message=Stream public-mytable is using incremental replication with replication key xmin cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
2023-08-25T17:06:34.041319Z [info     ] time=2023-08-25 17:06:31 name=tap_postgres level=CRITICAL message='NoneType' object has no attribute 'get' cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
...
2023-08-25T17:06:34.043415Z [info     ]     replication_key_sql_datatype = md_map.get(('properties', replication_key)).get('sql-datatype') cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
2023-08-25T17:06:34.043529Z [info     ] AttributeError: 'NoneType' object has no attribute 'get' cmd_type=elb consumer=False name=pipeline-0 producer=True stdio=stderr string_id=pipeline-0
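
For reference, the AttributeError in the traceback is what you'd expect if xmin is simply missing from the stream's metadata map. Here's a minimal sketch, assuming md_map is a plain dict keyed by breadcrumb tuples. The dict contents and the sql_datatype helper are made up for illustration and are not the tap's actual code.

```python
# Hypothetical metadata map: discovery only recorded the regular columns,
# so there is no entry for the system column xmin.
md_map = {
    ("properties", "id"): {"sql-datatype": "integer"},
    ("properties", "my_col"): {"sql-datatype": "text"},
}


def sql_datatype(md_map, replication_key):
    """Look up the SQL datatype, failing with a clearer message if the key is absent."""
    entry = md_map.get(("properties", replication_key))
    if entry is None:
        raise KeyError(f"replication key {replication_key!r} is not in the catalog")
    return entry.get("sql-datatype")


# The chained form from the traceback: the first .get() returns None for xmin,
# so the second .get() raises AttributeError.
try:
    md_map.get(("properties", "xmin")).get("sql-datatype")
except AttributeError as exc:
    chained_error = str(exc)
```

A guard like the one in sql_datatype wouldn't make xmin work, but it would at least turn the crash into a message that names the missing column.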
@qbatten qbatten changed the title Potentially add xmin replication? feat: potentially add xmin replication? Aug 25, 2023
@visch
Member

visch commented Aug 25, 2023

Interesting! Someone else asked about this recently as well; it seems like we could definitely do this. Like you said, discovery would need to pull these additional columns. We could just add the two explicitly.

@qbatten if you overrode the schema for the table you're trying to pull (annoying, but worth a shot) and added xmin, I wonder if it'd magically work.

@qbatten
Contributor Author

qbatten commented Sep 5, 2023

Hm, okay, there is a lot of complexity here.

On your suggestion, I added an extra to the extractor for xmin (as in the "meltano.yml code block" below). This got us a step further: the tap did try to use xmin in the select statement. However, Postgres complains that there is no ordering operator for data type xid. That makes sense, because xmin's data type is xid and ordering on xids is not straightforward. We could cast it to text so that Postgres won't complain, but I believe the behavior of ORDER BY xmin::text is not going to be what we want.
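
To illustrate the problem with ordering by xmin::text: text comparison is lexicographic, so it disagrees with numeric order as soon as the values have different numbers of digits. Here's a quick sketch with made-up xmin values. Python string sorting stands in for Postgres text ordering here, which is an approximation (collation details differ), but the digit-count problem is the same.

```python
# Made-up transaction IDs, purely for illustration.
xmins = [999, 1000, 1001, 25]

# Sorting the text form compares character by character, so "1000" < "25" < "999".
as_text = sorted(str(x) for x in xmins)

# Sorting the numeric form gives the order we actually want for incremental sync.
as_numbers = sorted(xmins)

print(as_text)
print(as_numbers)
```

Even the numeric order only holds until the 32-bit xid counter wraps around, so a cast to a number would not be a complete answer by itself either.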

It looks like updates to xmin have fairly complex behavior as well (of course, I guess; link). Other useful links I ran into: some vaguely related discussion, txid docs, oid docs.

All this now has me wondering how Airbyte handles this.

meltano.yml code block
  - name: my_pipeline-0
    inherit_from: tap-postgres-my_pipeline
    schema:
      public-my_table:
        xmin:
          type: ["int", "null"]
Error thrown in meltano
2023-09-05T21:25:45.464197Z [info     ] time=2023-09-05 21:25:43 name=tap_postgres level=INFO message=Stream public-my_table is using incremental replication with replication key xmin cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464323Z [info     ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=Current Server Encoding: UTF8 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464447Z [info     ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=Current Client Encoding: UTF8 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464565Z [info     ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=hstore is UNavailable cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464685Z [info     ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=Beginning new incremental replication sync 1692982654464 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464821Z [info     ] time=2023-09-05 21:25:44 name=tap_postgres level=INFO message=select statement: SELECT  "id" , "my_col" , "user_id" cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.464975Z [info     ]                                     FROM "public"."my_table" cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465106Z [info     ]                                     ORDER BY  "xmin"  ASC with itersize 20000 cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465236Z [info     ] time=2023-09-05 21:25:44 name=singer level=INFO message=METRIC: {"type": "counter", "metric": "record_count", "value": 0, "tags": {}} cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465367Z [info     ] time=2023-09-05 21:25:44 name=tap_postgres level=CRITICAL message=could not identify an ordering operator for type xid cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465486Z [info     ] LINE 3:                                     ORDER BY  "xmin"  ASC cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.465604Z [info     ]                                                       ^ cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0

...

2023-09-05T21:25:45.468293Z [info     ]     return super().execute(query, vars) cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468423Z [info     ] psycopg2.errors.UndefinedFunction: could not identify an ordering operator for type xid cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468545Z [info     ] LINE 3:                                     ORDER BY  "xmin"  ASC cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468677Z [info     ]                                                       ^ cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468806Z [info     ] HINT:  Use an explicit ordering operator or modify the query. cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0
2023-09-05T21:25:45.468944Z [info     ]                                cmd_type=elb consumer=False name=my_pipeline-0 producer=True stdio=stderr string_id=my_pipeline-0

@qbatten
Contributor Author

qbatten commented Sep 8, 2023

Ah, here's the meat of Airbyte's xmin-handling code

@visch
Member

visch commented Sep 8, 2023

Nice work @qbatten, and good find for sure.

So we could do the conversion here: https://github.com/MeltanoLabs/tap-postgres/blob/main/tap_postgres/client.py#L231-L232 (just hardcode it: if the column name is xmin, then do the conversion 🤷), and the logic here.
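
Here's a rough sketch of what that hardcoded conversion might look like. The function names and the choice of xmin::text::bigint as the cast are assumptions for illustration, not the tap's actual code, and the sketch ignores xid wraparound entirely.

```python
def quote_ident(name: str) -> str:
    """Double-quote a SQL identifier, escaping any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def order_by_expression(column_name: str) -> str:
    """Return an ORDER BY expression, special-casing the xmin system column."""
    quoted = quote_ident(column_name)
    if column_name == "xmin":
        # xid has no ordering operator, so go through text to a sortable bigint.
        # (Assumed cast; it does not account for xid wraparound.)
        return f"{quoted}::text::bigint"
    return quoted


def build_select(schema: str, table: str, columns: list, replication_key: str) -> str:
    """Assemble a simple incremental-sync query ordered by the replication key."""
    select_list = ", ".join(quote_ident(c) for c in columns)
    return (
        f"SELECT {select_list} "
        f"FROM {quote_ident(schema)}.{quote_ident(table)} "
        f"ORDER BY {order_by_expression(replication_key)} ASC"
    )


query = build_select("public", "my_table", ["id", "my_col", "user_id"], "xmin")
print(query)
```

The same cast would presumably also be needed wherever the stored bookmark is compared against xmin, which this sketch leaves out.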

Those writeups you found are good too, and if we really want this feature we should add them in as warnings. It seems like xmin isn't the best incremental key, but I'd guess that for some folks with very large tables it's better than nothing.
