Skip to content

Commit

Permalink
Update proto generated files and fix test.
Browse files Browse the repository at this point in the history
  • Loading branch information
satrana42 committed Aug 26, 2024
1 parent 16c1efb commit 6e58bea
Show file tree
Hide file tree
Showing 22 changed files with 282 additions and 215 deletions.
330 changes: 186 additions & 144 deletions fennel/datasets/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,18 +693,19 @@ def pipeline1(cls, a: Dataset, b: Dataset):
)
operator_req = sync_request.operators[2]
o = {
"id": "12a2088d8d7a0d265a7bd3f694fc81aa",
"is_root": True,
"pipeline_name": "pipeline1",
"dataset_name": "ABCDataset",
"join": {
"lhs_operand_id": "A",
"rhs_dsref_operand_id": "B",
"on": {"a1": "b1"},
"how": 0,
},
"ds_version": 1,
}
"id":"3338ee30aac1dc899789da9fc78fa025",
"isRoot":True,
"pipelineName":"pipeline1",
"datasetName":"ABCDataset",
"join":{
"lhsOperandId":"A",
"rhsDsrefOperandId":"B",
"on":{
"a1":"b1"
}
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down Expand Up @@ -865,18 +866,19 @@ def pipeline1(cls, a: Dataset, b: Dataset):
)
operator_req = sync_request.operators[2]
o = {
"id": "12a2088d8d7a0d265a7bd3f694fc81aa",
"is_root": True,
"pipeline_name": "pipeline1",
"dataset_name": "ABCDatasetDefault",
"join": {
"lhs_operand_id": "A",
"rhs_dsref_operand_id": "B",
"on": {"a1": "b1"},
"how": 0,
},
"ds_version": 1,
}
"id":"3338ee30aac1dc899789da9fc78fa025",
"isRoot":True,
"pipelineName":"pipeline1",
"datasetName":"ABCDatasetDefault",
"join":{
"lhsOperandId":"A",
"rhsDsrefOperandId":"B",
"on":{
"a1":"b1"
}
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down Expand Up @@ -963,18 +965,19 @@ def pipeline1(cls, a: Dataset, b: Dataset):
)
operator_req = sync_request.operators[2]
o = {
"id": "12a2088d8d7a0d265a7bd3f694fc81aa",
"is_root": True,
"pipeline_name": "pipeline1",
"dataset_name": "ABCDatasetDefault",
"join": {
"lhs_operand_id": "A",
"rhs_dsref_operand_id": "B",
"on": {"a1": "b1"},
"how": 0,
},
"ds_version": 1,
}
"id":"3338ee30aac1dc899789da9fc78fa025",
"isRoot":True,
"pipelineName":"pipeline1",
"datasetName":"ABCDatasetDefault",
"join":{
"lhsOperandId":"A",
"rhsDsrefOperandId":"B",
"on":{
"a1":"b1"
}
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down Expand Up @@ -1061,19 +1064,20 @@ def pipeline1(cls, a: Dataset, b: Dataset):
)
operator_req = sync_request.operators[2]
o = {
"id": "12a2088d8d7a0d265a7bd3f694fc81aa",
"is_root": True,
"pipeline_name": "pipeline1",
"dataset_name": "ABDatasetLow",
"join": {
"lhs_operand_id": "A",
"rhs_dsref_operand_id": "B",
"on": {"a1": "b1"},
"within_low": "3600s",
"how": 0,
},
"ds_version": 1,
}
"id":"3338ee30aac1dc899789da9fc78fa025",
"isRoot":True,
"pipelineName":"pipeline1",
"datasetName":"ABDatasetLow",
"join":{
"lhsOperandId":"A",
"rhsDsrefOperandId":"B",
"on":{
"a1":"b1"
},
"withinLow":"3600s"
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down Expand Up @@ -1160,19 +1164,20 @@ def pipeline1(cls, a: Dataset, b: Dataset):
)
operator_req = sync_request.operators[2]
o = {
"id": "12a2088d8d7a0d265a7bd3f694fc81aa",
"is_root": True,
"pipeline_name": "pipeline1",
"dataset_name": "ABDatasetHigh",
"join": {
"lhs_operand_id": "A",
"rhs_dsref_operand_id": "B",
"on": {"a1": "b1"},
"how": 0,
"within_high": "86400s",
},
"ds_version": 1,
}
"id":"3338ee30aac1dc899789da9fc78fa025",
"isRoot":True,
"pipelineName":"pipeline1",
"datasetName":"ABDatasetHigh",
"join":{
"lhsOperandId":"A",
"rhsDsrefOperandId":"B",
"on":{
"a1":"b1"
},
"withinHigh":"86400s"
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down Expand Up @@ -1259,20 +1264,21 @@ def pipeline1(cls, a: Dataset, b: Dataset):
)
operator_req = sync_request.operators[2]
o = {
"id": "12a2088d8d7a0d265a7bd3f694fc81aa",
"is_root": True,
"pipeline_name": "pipeline1",
"dataset_name": "ABDataset",
"join": {
"lhs_operand_id": "A",
"rhs_dsref_operand_id": "B",
"on": {"a1": "b1"},
"how": 0,
"within_low": "259200s",
"within_high": "31536000s",
},
"ds_version": 1,
}
"id":"3338ee30aac1dc899789da9fc78fa025",
"isRoot":True,
"pipelineName":"pipeline1",
"datasetName":"ABDataset",
"join":{
"lhsOperandId":"A",
"rhsDsrefOperandId":"B",
"on":{
"a1":"b1"
},
"withinLow":"259200s",
"withinHigh":"31536000s"
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down Expand Up @@ -1476,95 +1482,131 @@ def extract_info(df: pd.DataFrame) -> pd.DataFrame:

operator_req = sync_request.operators[3]
o = {
"id": "4202e94cf2e47bf5bcc94fd57aee8d0f",
"pipelineName": "create_fraud_dataset",
"datasetName": "FraudReportAggregatedDataset",
"join": {
"lhsOperandId": "101097826c6986ddb25ce924985d9217",
"rhsDsrefOperandId": "UserInfoDataset",
"on": {"user_id": "user_id"},
"how": 0,
},
"ds_version": 1,
}
"id":"246863a3fc1191098d24b4034f704851",
"pipelineName":"create_fraud_dataset",
"datasetName":"FraudReportAggregatedDataset",
"join":{
"lhsOperandId":"101097826c6986ddb25ce924985d9217",
"rhsDsrefOperandId":"UserInfoDataset",
"on":{
"user_id":"user_id"
}
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
)

operator_req = erase_operator_pycode(sync_request.operators[4])
o = {
"id": "bfa10d216f843625785d24e6b9d890fb",
"pipelineName": "create_fraud_dataset",
"datasetName": "FraudReportAggregatedDataset",
"transform": {
"operandId": "4202e94cf2e47bf5bcc94fd57aee8d0f",
"schema": {
"user_id": {"intType": {}},
"merchant_id": {"intType": {}},
"timestamp": {"timestampType": {}},
"transaction_amount": {"doubleType": {}},
},
"pycode": {},
},
}
"id":"6158406804b946bda0c38a994229e995",
"pipelineName":"create_fraud_dataset",
"datasetName":"FraudReportAggregatedDataset",
"transform":{
"operandId":"246863a3fc1191098d24b4034f704851",
"schema":{
"user_id":{
"intType":{

}
},
"merchant_id":{
"intType":{

}
},
"timestamp":{
"timestampType":{

}
},
"transaction_amount":{
"doubleType":{

}
}
},
"pycode":{

}
}
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
)

operator_req = sync_request.operators[5]
o = {
"id": "acd519ba5789e767383099d0561e07c8",
"pipelineName": "create_fraud_dataset",
"datasetName": "FraudReportAggregatedDataset",
"dedup": {
"operandId": "bfa10d216f843625785d24e6b9d890fb",
"columns": ["user_id", "merchant_id"],
},
"ds_version": 1,
}
"id":"c898276d34d964b833155b0e36a4ba2b",
"pipelineName":"create_fraud_dataset",
"datasetName":"FraudReportAggregatedDataset",
"dedup":{
"operandId":"6158406804b946bda0c38a994229e995",
"columns":[
"user_id",
"merchant_id"
]
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
)

operator_req = sync_request.operators[6]
o = {
"id": "0b381d6b2444c390000402aaa4485a26",
"isRoot": True,
"pipelineName": "create_fraud_dataset",
"datasetName": "FraudReportAggregatedDataset",
"aggregate": {
"operandId": "acd519ba5789e767383099d0561e07c8",
"keys": ["merchant_id"],
"specs": [
{
"count": {
"name": "num_merchant_fraudulent_transactions",
"window": {"forever": {}},
}
},
{
"count": {
"name": "num_merchant_fraudulent_transactions_7d",
"window": {"sliding": {"duration": "604800s"}},
}
},
{
"quantile": {
"name": "median_transaction_amount",
"window": {"sliding": {"duration": "604800s"}},
"quantile": 0.5,
"default": 0.0,
"of": "transaction_amount",
"approx": True,
}
},
],
},
"ds_version": 1,
}
"id":"45877eefa2fe6d8dbd5aba2fb07e5cb5",
"isRoot":True,
"pipelineName":"create_fraud_dataset",
"datasetName":"FraudReportAggregatedDataset",
"aggregate":{
"operandId":"c898276d34d964b833155b0e36a4ba2b",
"keys":[
"merchant_id"
],
"specs":[
{
"count":{
"name":"num_merchant_fraudulent_transactions",
"window":{
"forever":{

}
}
}
},
{
"count":{
"name":"num_merchant_fraudulent_transactions_7d",
"window":{
"sliding":{
"duration":"604800s"
}
}
}
},
{
"quantile":{
"of":"transaction_amount",
"name":"median_transaction_amount",
"window":{
"sliding":{
"duration":"604800s"
}
},
"default":0.0,
"quantile":0.5,
"approx":True
}
}
]
},
"dsVersion":1
}
expected_operator_request = ParseDict(o, ds_proto.Operator())
assert operator_req == expected_operator_request, error_message(
operator_req, expected_operator_request
Expand Down
Loading

0 comments on commit 6e58bea

Please sign in to comment.