Data Converter #2487
The logic looks good. See my comments for improvement, mainly in error handling and documentation.
DATA_TYPE_FLOAT_ARRAY = 258


class DamEncoder:
Please document the encoding format.
    def add_float_array(self, value: List[float]):
        self.entries.append((DATA_TYPE_FLOAT_ARRAY, value))

    def finish(self) -> bytes:
Should the method name be changed to "encode"?
        return result

    def read_string(self, length: int) -> str:
        result = self.buffer[self.pos : self.pos + length].decode("utf-8")
What if an exception occurs here?
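One possible shape for this, as a sketch only: the slice itself never raises on a short buffer, so the length has to be checked explicitly, and `decode` can raise `UnicodeDecodeError` on malformed bytes. `DecodeError` and the `Reader` stand-in class are hypothetical names used for illustration; only `buffer`, `pos`, and `read_string` come from the diff, and advancing `pos` after the read is an assumption about the surrounding code.

```python
class DecodeError(Exception):
    """Hypothetical error type for malformed buffers (not part of the PR)."""


class Reader:
    """Stand-in for the decoder class in the diff; only buffer/pos mirror it."""

    def __init__(self, buffer: bytes):
        self.buffer = buffer
        self.pos = 0

    def read_string(self, length: int) -> str:
        end = self.pos + length
        # Slicing past the end silently truncates, so check bounds explicitly.
        if length < 0 or end > len(self.buffer):
            raise DecodeError(
                f"string of length {length} at offset {self.pos} overruns the buffer"
            )
        try:
            result = self.buffer[self.pos:end].decode("utf-8")
        except UnicodeDecodeError as e:
            raise DecodeError(f"invalid UTF-8 at offset {self.pos}") from e
        # Assumption: the real method advances the read position as well.
        self.pos = end
        return result
```

Raising a single decoder-specific error keeps the caller's handling simple: it only needs to catch one exception type regardless of which read failed.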
        return result

    def read_int64(self) -> int:
        (result,) = struct.unpack_from("q", self.buffer, self.pos)
This needs exception handling.
        return result

    def read_float(self) -> float:
        (result,) = struct.unpack_from("d", self.buffer, self.pos)
This needs exception handling as well.
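For the two struct reads, one option is a shared helper that converts `struct.error` (raised by `struct.unpack_from` when the buffer is too short for the format) into a decoder-specific error. This is a sketch under assumptions: `DecodeError`, `Reader`, and `_unpack` are hypothetical names, and advancing `pos` by `struct.calcsize(fmt)` is a guess at what the real methods do after unpacking.

```python
import struct


class DecodeError(Exception):
    """Hypothetical error type for malformed buffers (not part of the PR)."""


class Reader:
    """Stand-in for the decoder class in the diff; only buffer/pos mirror it."""

    def __init__(self, buffer: bytes):
        self.buffer = buffer
        self.pos = 0

    def _unpack(self, fmt: str):
        # Hypothetical helper: one place to handle short or corrupt buffers.
        try:
            (result,) = struct.unpack_from(fmt, self.buffer, self.pos)
        except struct.error as e:
            raise DecodeError(
                f"cannot read {fmt!r} at offset {self.pos}: {e}"
            ) from e
        # Assumption: the position advances by the size of the value read.
        self.pos += struct.calcsize(fmt)
        return result

    def read_int64(self) -> int:
        return self._unpack("q")

    def read_float(self) -> float:
        return self._unpack("d")
```

With this layout, `read_int64` and `read_float` stay one-liners and any new fixed-size read gets the same error handling for free.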
        return result

    def decode_aggregation_context(self, buffer: bytes, fl_ctx: FLContext) -> AggregationContext:
Please document the encoding format of the aggregation context.
        slots = decoder.decode_int_array()
        for i in range(num):
            bin_assignment = []
            for row_id in range(self.num_samples):
What if num_samples is not set? That case needs to be handled.
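A minimal sketch of such a guard, assuming `num_samples` starts out as `None` (or 0) until some earlier step populates it. The `Converter` class and `_require_num_samples` helper are hypothetical stand-ins, and `RuntimeError` is only a placeholder for whatever error type the project prefers.

```python
from typing import Optional


class Converter:
    """Stand-in for the converter class in the diff; only num_samples mirrors it."""

    def __init__(self):
        # Assumption: an earlier decoding step is expected to fill this in.
        self.num_samples: Optional[int] = None

    def _require_num_samples(self) -> int:
        # Fail early with a clear message instead of a TypeError from range(None).
        if self.num_samples is None or self.num_samples <= 0:
            raise RuntimeError(
                "num_samples is not set; it must be known before bin assignments are built"
            )
        return self.num_samples
```

The loop would then iterate over `range(self._require_num_samples())`, so a missing value surfaces as a readable error at the point of use.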
        node_list = decoder.decode_int_array()
        sample_groups = {}
        for node in node_list:
            row_ids = decoder.decode_int_array()
What's row_id? Isn't it the same as sample_id? If so, let's use the same term.
            row_ids = decoder.decode_int_array()
            sample_groups[node] = row_ids

        return AggregationContext(self.features, sample_groups)
The whole method should be in a try/except block, since any decode method call could fail.
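Roughly what that could look like, shown on the sample-group part of the method only. This is a sketch under assumptions: `DecodeError` and `decode_sample_groups` are hypothetical, `read_int_array` stands in for `decoder.decode_int_array` from the diff, and the set of exception types caught is a guess at what the low-level reads can raise.

```python
import struct
from typing import Callable, Dict, List


class DecodeError(Exception):
    """Hypothetical error type for malformed buffers (not part of the PR)."""


def decode_sample_groups(
    read_int_array: Callable[[], List[int]],
) -> Dict[int, List[int]]:
    """Decode node -> row ids, wrapping every low-level failure in one error."""
    try:
        node_list = read_int_array()
        sample_groups: Dict[int, List[int]] = {}
        for node in node_list:
            sample_groups[node] = read_int_array()
        return sample_groups
    except (struct.error, UnicodeDecodeError, IndexError, ValueError) as e:
        # Chain the original exception so the root cause stays visible.
        raise DecodeError("failed to decode aggregation context") from e
```

A single try block around the whole body means the caller sees one well-defined failure mode, while `from e` preserves the underlying cause for debugging.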

    def encode_aggregation_result(
        self, aggr_results: Dict[int, List[FeatureAggregationResult]], fl_ctx: FLContext
    ) -> bytes:
Please document the encoding format.
Combined in #2512
This is the DataConverter implementation for the Processor plugin. It is the Python counterpart of
the C++ plugin.