-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Storage] Internal avro parser. #10764
[Storage] Internal avro parser. #10764
Conversation
value = value.encode('utf-8') | ||
self._meta[key] = value | ||
|
||
def _GetInputFileLength(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if this will work with Quick Query, since QQ streams don't have a defined length.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I managed to get rid of that. Looks like stream-like object starts to return empty when end is reached.
Thanks @kasobol-msft! Adding @rakshith91 for review. |
@@ -0,0 +1,5 @@ | |||
# ------------------------------------------------------------------------- |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please note that the contents of the _shared
directory must be copied across all storage SDKs (queues/fileshare/datalake/etc). This includes the _shared
directory in the test suite. If you feel this should not be in the other SDKs (i.e. that avro will not be a part of queue messages, or datalake etc), then please move this directory up a layer :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@annatisch this code is going to be used in azure-storage-blob
and azure-storage-blob-changefeed
(that one is work in progress) modules. So shared seems to be better fit. If so do we need to copy avro to all storage modules or only two that take dependency on it?
For the context. It's imperative that we hide avro parser and not expose it. So having it packaged separately and adding as dependency isn't an option unfortunately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can keep it in shared - but it will be distributed in every storage SDK (queues, datalake, etc).
At some point we will add a CI step to ensure the _shared
module is kept consistent between all the SDKs.
If you do not wish this to be included in all the storage SDKs, then you could move it up a level to azure.storage.blob._avro
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Anna! @annatisch
We created a package azure-storage-blob-changefeed package which depends on azure-storage-blob. Previously I planned to add _shared folder to azure-storage-blob-changefeed to use avro(that is what I told kamil), but now we plan to directly import _shared.avro from azure-storage-blob so that we don't need to put _shared folder in blob-changefeed pkg.
In the future we will have changefeed feature in fileshare and datalake, so probably we can also import the _shared folder from azure-storage-file-share and azure-storage-file-datalake so we don't need to add _shared folder in azure-storage-fileshare-changefeed and azure-storage-datalake-changefeed(these changefeed packages haven't been created yet)
So I guess probably it makes more sense to put leave the avro in shared folder?
What do you think ^_^?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it stays in _shared
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @xiafu-msft! Yes if the plan is to support this in files and datalake as well, then shared is a good place for it. Could you please duplicate it across the other packages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Anna! @annatisch
Thanks for your feedback! We still need to tweak the DafaFileReader a bit to fit ChangeFeed feature. After that we will duplicate it! So that will be in another pr.
@annatisch All internal. No intent of exposing that. |
return self.read_data(self.writer_schema, self.reader_schema, decoder) | ||
|
||
def read_data(self, writer_schema, reader_schema, decoder): | ||
# schema matching |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need both writer and reader schema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In most general case we can have two schemas - schema in the file that was used to write it. Reader schema can be supplied by someone trying to parse avro file to narrow down things they care about. It's kind of equivalent of "casting" in strongly type language.
However, I think we are not going to use it in foreseeable future, so I'm removing that and just leaving writer_schema
.
Please also notice how writer_schema
is defined:
As defined in the Avro specification, we call the schema encoded in the data the "writer's schema".
# read the header: magic, meta, sync | ||
self._read_header() | ||
|
||
# ensure codec is valid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't really need codec for QQ or ChangeFeed, are we sure it works?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should be validating that so that we don't attempt parsing something we can't support. For example server side can at some point add codec support before SDK does that Or server side adds it and customer is using SDK that didn't catch up. In such case we want clear message about what's going on.
I believe we have that validation in .Net as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thing I'm debating whether to keep or drop "deflate" support from sync version. It's working (and covered by tests). Though we won't use it in nearest future.
class DataFileReader(object): | ||
"""Read files written by DataFileWriter.""" | ||
|
||
# TODO: allow user to specify expected schema? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to delete these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed TODO's.
avro_name = self.GetName(name=name, namespace=namespace) | ||
return self._names.get(avro_name.fullname, None) | ||
|
||
def GetSchema(self, name, namespace=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to follow the same naming format? to make all method names in the format like get_schema()?
Ctrl+Shift+r will help you replace the name across files, just be careful not to replace the name in other packages lol
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
del prunable['namespace'] | ||
return prunable | ||
|
||
def Register(self, schema): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also let's make this method lower case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
# ------------------------------------------------------------------------------ | ||
|
||
|
||
def _SchemaFromJSONString(json_string, names): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make all methods below in lower case ^_^ eg. _schema_from_json_string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…tten to file ('writer_schema').
sdk/storage/azure-storage-blob/azure/storage/blob/_shared/avro/avro_io.py
Outdated
Show resolved
Hide resolved
sdk/storage/azure-storage-blob/azure/storage/blob/_shared/avro/avro_io.py
Show resolved
Hide resolved
sdk/storage/azure-storage-blob/azure/storage/blob/_shared/avro/avro_io.py
Outdated
Show resolved
Hide resolved
sdk/storage/azure-storage-blob/azure/storage/blob/_shared/avro/avro_io.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me on the Avro side of things. Could you work with Emily and eventually maybe get a QQ file so we can add a test for that, just for completion sake. Doesnt have to be part of this PR
from azure.storage.blob._shared.avro.datafile import DataFileReader | ||
from azure.storage.blob._shared.avro.avro_io import DatumReader | ||
|
||
SCHEMAS_TO_VALIDATE = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any specific reason to place the tests in the _shared folder? fyi, Just like the _shared folder in the code; we need to follow the same norms in the tests too. _shared folder will be common across packages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rakshith91 It's testing _shared
code so I was trying to match paths. But I see your point, so I'll move it.
However I wonder what would be a good place for such tests when _shared
is actually automagically cloned across packages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would put it an avro folder - looks like you already did - thanks :)
* initial avro parser * try fixing test... * falling in love with python compatibility... * make linter happy. * raise StopIteration when there is no more bytes instead of tracking file length. * async avro parser * fix syntax for Python 3.5 * get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema'). * pr feedback * trim unused code. * pr feedback. * simplify skip sync in next. * move avro tests from _shared.
* initial avro parser * try fixing test... * falling in love with python compatibility... * make linter happy. * raise StopIteration when there is no more bytes instead of tracking file length. * async avro parser * fix syntax for Python 3.5 * get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema'). * pr feedback * trim unused code. * pr feedback. * simplify skip sync in next. * move avro tests from _shared.
* initial avro parser * try fixing test... * falling in love with python compatibility... * make linter happy. * raise StopIteration when there is no more bytes instead of tracking file length. * async avro parser * fix syntax for Python 3.5 * get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema'). * pr feedback * trim unused code. * pr feedback. * simplify skip sync in next. * move avro tests from _shared.
* initial avro parser * try fixing test... * falling in love with python compatibility... * make linter happy. * raise StopIteration when there is no more bytes instead of tracking file length. * async avro parser * fix syntax for Python 3.5 * get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema'). * pr feedback * trim unused code. * pr feedback. * simplify skip sync in next. * move avro tests from _shared.
* [Storage]STG73 * [Blob][Swagger]Update Swagger (#10943) * [Blob][Swagger]Regenerate Swagger Code * fix container test failure caused by list_containers include type change * [Storage] Internal avro parser. (#10764) * initial avro parser * try fixing test... * falling in love with python compatibility... * make linter happy. * raise StopIteration when there is no more bytes instead of tracking file length. * async avro parser * fix syntax for Python 3.5 * get rid of 'readers_schema' as we only honor schema that has been written to file ('writer_schema'). * pr feedback * trim unused code. * pr feedback. * simplify skip sync in next. * move avro tests from _shared. * Jumbo blob support (#11176) * wip * initial test coverage. * wip. * wip * single upload. * add async tests. * disable 50k block tests. * datalake append. * async datalake * disable tests that send large payload over network. * pr feedback. * Undelete share (#11394) * Undelete container (#11339) * [Storage][Blob] Added support for Object Replication (#11525) * Blob versioning (#11154) * [Blob][QuickQuery]Add Quick Query Support (#10946) * [Blob][STG73]Blob Tags (#11418) * regenerate code (#11964) * fix the bug which caused only showing fatal error (#11997) * [Storage][STG73]Address API Review Comments (#12111) * [Storage][STG73]Address API Review Comments * [Storage][STG73]dict<policy, rules> -> list(ObjectReplicationPolicy) * fix blob tag_value test * expose ObjectReplicationPolicy and ObjectReplicationRule, fix test * fix test * Changefeed (#10755) * [ChangeFeed]Add ChangeFeed Package * test_avro failure * update dev_requirement.txt * change namespace to azure.storage.blob.changefeed * address comments * optimize memory when reading changefeed events * namespace change * set up package change * fix failed tests * readme and kwargs * Update sdk/storage/azure-storage-blob-changefeed/azure/storage/blob/changefeed/_change_feed_client.py Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com> * address comments * 'azure-storage-blob>=12.3.0' which does not match the frozen requirement 'azure-storage-blob~=1.3' Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com> * [Storage-Blob] Quick Query API (#11991) * Renamed query error * Renamed query reader * Updated config models * Updated format request params * Updated iterator * fix the bug which caused only showing fatal error * Updated Error message * Fixed query helper * Started test conversion * small fix * Fixed tests * Updated error handling + json model * Updated recordings * Removed old recording * Added iter tests * Iter test recordings * Fix test * Remove extra recording * Fix pylint * Some docs cleanup * Renamed iter_records -> iter_stream * Review feedback * Updated tests * Missing commas * Fix syntax * Fix pylint Co-authored-by: xiafu <xiafu@microsoft.com> * tag sas (#12258) * tag sas * disable undelete_container * pylint * skip undelete_container tests * [Blob][Versioning]Disable Versioning Live Test (#12281) * [Blob][QQ]Default output_format to input_format (#12283) * [Storage][Jumbo]Remove super (#12314) * [Storage][JumboBlob]remove empty super() * pypy3 * change sas version to latest * set tags account location to central canada * re-recording queue * changefeed paths generator * mark tests for vid as playback only * fix changefeed * fix pylint make the test account location for tags to central canada * add a delay before calling find_blobs_by_tags * remove tags header * mark a large file test playback only * revert "mark a large file test playback only" skip upload large file test address comment * move tag permission and filter_by_tags permission to kwargs Co-authored-by: Kamil Sobol <61715331+kasobol-msft@users.noreply.github.com> Co-authored-by: Ze Qian Zhang <zezha@microsoft.com> Co-authored-by: Rakshith Bhyravabhotla <sabhyrav@microsoft.com> Co-authored-by: annatisch <antisch@microsoft.com>
No description provided.