Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature writer optimizations #1436

Merged
merged 4 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,34 +355,31 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace, int t
if (isRemoteCopy(sourceSpace,targetSpace))
contentQuery = buildCopyQueryRemoteSpace(dbReader(), contentQuery );

return new SQLQuery(
/**/ //TODO: rm workaround after clarifying with feature_writer <-> where (idata.jsondata#>>'{properties,@ns:com:here:xyz,deleted}') is null
"""
WITH ins_data as
(
select
write_features(
jsonb_build_array(
jsonb_build_object('updateStrategy','{"onExists":null,"onNotExists":null,"onVersionConflict":null,"onMergeConflict":null}'::jsonb,
'partialUpdates',false,
'featureData', jsonb_build_object( 'type', 'FeatureCollection', 'features', jsonb_agg( iidata.feature ) )))::text
,iidata.author,false,${{versionToBeUsed}}
) as wfresult
from
//TODO: Do not use slow JSONB functions in the following query!
//TODO: rm workaround after clarifying with feature_writer <-> where (idata.jsondata#>>'{properties,@ns:com:here:xyz,deleted}') is null
return new SQLQuery("""
WITH ins_data as
(
select ((row_number() over ())-1)/${{maxblksize}} as rn, idata.jsondata#>>'{properties,@ns:com:here:xyz,author}' as author, idata.jsondata || jsonb_build_object('geometry', (idata.geo)::json) as feature
from
( ${{contentQuery}} ) idata
where (idata.jsondata#>>'{properties,@ns:com:here:xyz,deleted}') is null
) iidata
group by rn, author
)
select sum((wfresult::json->>'count')::bigint)::bigint into dummy_output from ins_data
"""
/**/
)
.withContext( queryContext )
.withQueryFragment("maxblksize",""+ maxBlkSize)
select
write_features(
jsonb_build_array(
jsonb_build_object('updateStrategy','{"onExists":null,"onNotExists":null,"onVersionConflict":null,"onMergeConflict":null}'::jsonb,
'partialUpdates',false,
'featureData', jsonb_build_object( 'type', 'FeatureCollection', 'features', jsonb_agg( iidata.feature ) )))::text,
'Modifications', iidata.author,false,${{versionToBeUsed}}
) as wfresult
from
(
select ((row_number() over ())-1)/${{maxblksize}} as rn, idata.jsondata#>>'{properties,@ns:com:here:xyz,author}' as author, idata.jsondata || jsonb_build_object('geometry', (idata.geo)::json) as feature
from
( ${{contentQuery}} ) idata
where (idata.jsondata#>>'{properties,@ns:com:here:xyz,deleted}') is null
) iidata
group by rn, author
)
select sum((wfresult::json->>'count')::bigint)::bigint into dummy_output from ins_data
""").withContext(queryContext)
.withQueryFragment("maxblksize", "" + maxBlkSize)
.withQueryFragment("versionToBeUsed", "" + getVersion())
.withQueryFragment("contentQuery", contentQuery);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@

import com.fasterxml.jackson.annotation.JsonView;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.here.xyz.XyzSerializable;
import com.here.xyz.events.UpdateStrategy;
import com.here.xyz.jobs.steps.S3DataFile;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
Expand Down Expand Up @@ -635,30 +634,29 @@ private Map<String, Object> getQueryContext() throws WebClientException {
}

private SQLQuery buildFeatureWriterQuery(String featureList, long targetVersion) throws WebClientException, JsonProcessingException {
String writeFeatureModifications = """
[{
"updateStrategy": #updateStrategy#,
"featureData": {"type": "FeatureCollection", "features": #input_features#},
"partialUpdates": "#is_partial#"
}]
""".replaceAll("#updateStrategy#" , XyzSerializable.serialize(
new UpdateStrategy(updateStrategy.onExists(), updateStrategy.onNotExists(), updateStrategy.onVersionConflict(),
updateStrategy.onMergeConflict())))
.replaceAll("#input_features#" , featureList)
.replaceAll("#is_partial#" , "false");

return new SQLQuery("""
SELECT (write_features::JSONB->>'count')::INT as count from write_features(
#{featureModifications},
return new SQLQuery("""
SELECT (write_features::JSONB->>'count')::INT AS count FROM write_features(
#{featureList},
'Features',
#{author},
#{returnResult},
#{version}
#{version},
#{onExists},
#{onNotExists},
#{onVersionConflict},
#{onMergeConflict},
#{isPartial}
);""")
.withNamedParameter("featureModifications", writeFeatureModifications)
.withNamedParameter("author", space().getOwner())
.withNamedParameter("returnResult", false)
.withNamedParameter("version", targetVersion)
.withContext(getQueryContext());
.withNamedParameter("featureList", featureList)
.withNamedParameter("author", space().getOwner())
.withNamedParameter("returnResult", false)
.withNamedParameter("version", targetVersion)
.withNamedParameter("onExists", updateStrategy.onExists())
.withNamedParameter("onNotExists", updateStrategy.onNotExists())
.withNamedParameter("onVersionConflict", updateStrategy.onVersionConflict())
.withNamedParameter("onMergeConflict", updateStrategy.onMergeConflict())
.withNamedParameter("isPartial", false)
.withContext(getQueryContext());
}

private SQLQuery buildProgressQuery(String schema, ImportFilesToSpace step) {
Expand Down
98 changes: 48 additions & 50 deletions xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,8 @@ $BODY$
* Import Trigger for non-empty-layers. (Entity Feature)
*/
--TODO: Remove code-duplication of the following trigger functions!!
CREATE OR REPLACE FUNCTION import_from_s3_trigger_for_non_empty_layer()
RETURNS trigger
AS $BODY$
CREATE OR REPLACE FUNCTION import_from_s3_trigger_for_non_empty_layer() RETURNS trigger AS
$BODY$
DECLARE
author TEXT := TG_ARGV[0];
currentVersion BIGINT := TG_ARGV[1];
Expand All @@ -445,63 +444,62 @@ DECLARE
extendedTable TEXT := TG_ARGV[9];
format TEXT := TG_ARGV[10];
entityPerLine TEXT := TG_ARGV[11];
target_table TEXT := TG_ARGV[12];
targetTable TEXT := TG_ARGV[12];
featureCount INT := 0;
updated_rows INT;
updateStrategy JSONB;
input TEXT;
inputType TEXT;
BEGIN
updateStrategy = jsonb_build_object('onExists', CASE WHEN onExists = 'null' THEN null ELSE onExists END,
'onNotExists', CASE WHEN onNotExists = 'null' THEN null ELSE onNotExists END,
'onVersionConflict', CASE WHEN onVersionConflict = 'null' THEN null ELSE onVersionConflict END,
'onMergeConflict', CASE WHEN onMergeConflict = 'null' THEN null ELSE onMergeConflict END
);
--TODO: Remove the following workaround once the caller-side was fixed
onExists = CASE WHEN onExists = 'null' THEN NULL ELSE onExists END;
onNotExists = CASE WHEN onNotExists = 'null' THEN NULL ELSE onNotExists END;
onVersionConflict = CASE WHEN onVersionConflict = 'null' THEN NULL ELSE onVersionConflict END;
onMergeConflict = CASE WHEN onMergeConflict = 'null' THEN NULL ELSE onMergeConflict END;


--TODO: check how to use asyncify instead
PERFORM context(
jsonb_build_object(
'stepId', get_stepid_from_work_table(TG_TABLE_NAME::REGCLASS) ,
'schema', TG_TABLE_SCHEMA,
'table', target_table,
'historyEnabled', historyEnabled,
'context', CASE WHEN context = 'null' THEN null ELSE context END,
'extendedTable', CASE WHEN extendedTable = 'null' THEN null ELSE extendedTable END
)
jsonb_build_object(
'stepId', get_stepid_from_work_table(TG_TABLE_NAME::REGCLASS) ,
'schema', TG_TABLE_SCHEMA,
'table', targetTable,
'historyEnabled', historyEnabled,
'context', CASE WHEN context = 'null' THEN null ELSE context END,
'extendedTable', CASE WHEN extendedTable = 'null' THEN null ELSE extendedTable END
)
);

IF format = 'CSV_JSON_WKB' AND NEW.geo IS NOT NULL THEN
--TODO: Extend feature_writer with possibility to provide geometry
--TODO: Extend feature_writer with possibility to provide geometry (as JSONB manipulations are quite slow)
--TODO: Remove unnecessary xyz_reduce_precision call, because the FeatureWriter will do it anyways
NEW.jsondata := jsonb_set(NEW.jsondata::JSONB, '{geometry}', xyz_reduce_precision(ST_ASGeojson(ST_Force3D(NEW.geo)), false)::JSONB);
SELECT write_features( $fd$[{"updateStrategy":$fd$ || updateStrategy::TEXT || $fd$,
"featureData":{"type":"FeatureCollection","features":[$fd$ || NEW.jsondata || $fd$]},
"partialUpdates": $fd$ || isPartial || $fd$}]$fd$,
author
)::JSONB->'count' INTO featureCount;
input = NEW.jsondata::TEXT;
inputType = 'Feature';
END IF;

IF format = 'GEOJSON' OR format = 'CSV_GEOJSON' THEN
IF entityPerLine = 'Feature' THEN
SELECT write_features( $fd$[{"updateStrategy":$fd$ || updateStrategy::TEXT || $fd$,
"featureData":{"type":"FeatureCollection","features":[$fd$ || NEW.jsondata || $fd$]},
"partialUpdates": $fd$ || isPartial || $fd$}]$fd$,
author
)::JSONB->'count' INTO featureCount;
input = NEW.jsondata::TEXT;
inputType = 'Feature';
ELSE
--TODO: Extend feature_writer with possibility to provide featureCollection
SELECT write_features( $fd$[{"updateStrategy":$fd$ || updateStrategy::TEXT || $fd$,
"featureData":{"type":"FeatureCollection","features": $fd$ || (NEW.jsondata::JSONB->'features')::TEXT || $fd$},
"partialUpdates": $fd$ || isPartial || $fd$}]$fd$,
author
)::JSONB->'count' INTO featureCount;
--TODO: Shouldn't the input be a FeatureCollection here? Seems to be a list of Features
input = (NEW.jsondata::JSONB->'features')::TEXT;
inputType = 'Features';
END IF;
END IF;

SELECT write_features(
input, inputType, author, false, NULL,
onExists, onNotExists, onVersionConflict, onMergeConflict, isPartial
)::JSONB->'count' INTO featureCount;

NEW.jsondata = NULL;
NEW.geo = NULL;
NEW.count = featureCount;

RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;
LANGUAGE plpgsql VOLATILE;

/**
* Perform single import from S3
Expand Down Expand Up @@ -620,18 +618,18 @@ $BODY$
* Report export progress
*/
CREATE OR REPLACE FUNCTION report_export_progress(
lambda_function_arn TEXT,
lambda_function_arn TEXT,
lambda_region TEXT,
step_payload JSON,
thread_id INT,
bytes_uploaded BIGINT,
rows_uploaded BIGINT,
thread_id INT,
bytes_uploaded BIGINT,
rows_uploaded BIGINT,
files_uploaded INT
)
RETURNS VOID
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
BEGIN
PERFORM report_progress(
lambda_function_arn,
lambda_region,
Expand Down Expand Up @@ -678,8 +676,8 @@ BEGIN
s3_bucket TEXT := '$wrappedouter$||s3_bucket||$wrappedouter$'::TEXT;
s3_path TEXT := '$wrappedouter$||s3_path||$wrappedouter$'::TEXT;
s3_region TEXT := '$wrappedouter$||s3_region||$wrappedouter$'::TEXT;
step_payload JSON := '$wrappedouter$||(step_payload::TEXT)||$wrappedouter$'::JSON;
lambda_function_arn TEXT := '$wrappedouter$||lambda_function_arn||$wrappedouter$'::TEXT;
step_payload JSON := '$wrappedouter$||(step_payload::TEXT)||$wrappedouter$'::JSON;
lambda_function_arn TEXT := '$wrappedouter$||lambda_function_arn||$wrappedouter$'::TEXT;
lambda_region TEXT := '$wrappedouter$||lambda_region||$wrappedouter$'::TEXT;
BEGIN
SELECT * FROM s3_plugin_config('GEOJSON') INTO config;
Expand All @@ -688,24 +686,24 @@ BEGIN
||' ''%1$s'', '
||' aws_commons.create_s3_uri(%2$L,%3$L,%4$L),'
||' %5$L )',
format('select jsondata || jsonb_build_object(''''geometry'''', ST_AsGeoJSON(geo, 8)::jsonb) from (%1$s) X',
format('select jsondata || jsonb_build_object(''''geometry'''', ST_AsGeoJSON(geo, 8)::jsonb) from (%1$s) X',
REPLACE(content_query, $x$'$x$, $x$''$x$)),
s3_bucket,
s3_path,
s3_region,
REGEXP_REPLACE(config.plugin_options, '[\(\)]', '', 'g')
)INTO export_statistics;

PERFORM report_export_progress(
lambda_function_arn,
lambda_region,
step_payload,
thread_id,
export_statistics.bytes_uploaded,
export_statistics.rows_uploaded,
thread_id,
export_statistics.bytes_uploaded,
export_statistics.rows_uploaded,
export_statistics.files_uploaded::int
);

EXCEPTION
WHEN OTHERS THEN
-- Export has failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected SQLQuery buildQuery(WriteFeaturesEvent event) throws ErrorResponseExce
queryContext.put("context", event.getContext().toString());
}

return new SQLQuery("SELECT write_features(#{modifications}, #{author}, #{responseDataExpected});")
return new SQLQuery("SELECT write_features(#{modifications}, 'Modifications', #{author}, #{responseDataExpected});")
.withLoggingEnabled(false)
.withContext(queryContext)
.withNamedParameter("modifications", XyzSerializable.serialize(event.getModifications()))
Expand Down
71 changes: 22 additions & 49 deletions xyz-util/src/main/resources/sql/feature_writer.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,18 @@ CREATE EXTENSION IF NOT EXISTS plv8;

/**
* @public
* @param {string} input_type The type of the JSON input. Possible values: "FeatureCollection", "Features", "Feature", "Modifications"
* @throws VersionConflictError, MergeConflictError, FeatureExistsError
*/
--TODO: Get rid of this function by updating the import step to use the new write_features() directly
CREATE OR REPLACE FUNCTION write_features_old(input_features TEXT, author TEXT, on_exists TEXT,
on_not_exists TEXT, on_version_conflict TEXT, on_merge_conflict TEXT, is_partial BOOLEAN, version BIGINT = NULL, return_result BOOLEAN = false)
RETURNS TEXT AS
$BODY$
const writeFeatures = plv8.find_function("write_features");

if (input_features == null)
throw new Error("Parameter input_features must not be null.");

let modification = `{
"updateStrategy": {
"onExists": ${JSON.stringify(on_exists)},
"onNotExists": ${JSON.stringify(on_not_exists)},
"onVersionConflict": ${JSON.stringify(on_version_conflict)},
"onMergeConflict": ${JSON.stringify(on_merge_conflict)}
},
"featureData": {"type": "FeatureCollection", "features": ${input_features}},
"partialUpdates": ${is_partial}
}`;

return writeFeatures(`[${modification}]`, author, return_result, version == null ? undefined : version);
$BODY$ LANGUAGE plv8 IMMUTABLE;

/**
* @public
* @throws VersionConflictError, MergeConflictError, FeatureExistsError
*/
CREATE OR REPLACE FUNCTION write_features(feature_modifications TEXT, author TEXT, return_result BOOLEAN = false, version BIGINT = NULL)
CREATE OR REPLACE FUNCTION write_features(json_input TEXT, input_type TEXT, author TEXT, return_result BOOLEAN = false, version BIGINT = NULL,
--The following parameters are not necessary for input_type = "Modifications"
on_exists TEXT = NULL, on_not_exists TEXT = NULL, on_version_conflict TEXT = NULL, on_merge_conflict TEXT = NULL, is_partial BOOLEAN = false)
RETURNS TEXT AS
$BODY$
try {
//Actual executions
if (feature_modifications == null)
throw new Error("Parameter feature_modifications must not be null.");
if (json_input == null)
throw new Error("Parameter json_input must not be null.");

//Import other functions
let _queryContext;
Expand All @@ -76,7 +51,22 @@ $BODY$
${{FeatureWriter.js}}
//Init completed

let result = FeatureWriter.writeFeatureModifications(JSON.parse(feature_modifications), author, version == null ? undefined : version);
let input = JSON.parse(json_input);

if (input_type == "FeatureCollection") {
input = input.features;
input_type = "Features";
}

let result;
if (input_type == "Modifications")
result = FeatureWriter.writeFeatureModifications(input, author, version == null ? undefined : version);
else if (input_type == "Features")
result = FeatureWriter.writeFeatures(input, author, on_exists, on_not_exists, on_version_conflict, on_merge_conflict, is_partial, null, version == null ? undefined : version);
else if (input_type == "Feature")
result = FeatureWriter.writeFeature(input, author, on_exists, on_not_exists, on_version_conflict, on_merge_conflict, is_partial, null, version == null ? undefined : version);
else
throw new Error("Invalid input_type: " + input_type);

return JSON.stringify(return_result ? result : {"count": result.features.length});
}
Expand All @@ -86,21 +76,4 @@ $BODY$
else
throw error;
}
$BODY$ LANGUAGE plv8 VOLATILE;

/**
* @public
* @throws VersionConflictError, MergeConflictError, FeatureExistsError
*/
CREATE OR REPLACE FUNCTION write_feature(input_feature TEXT, author TEXT, on_exists TEXT,
on_not_exists TEXT, on_version_conflict TEXT, on_merge_conflict TEXT, is_partial BOOLEAN, version BIGINT = NULL, return_result BOOLEAN = false)
RETURNS TEXT AS $BODY$

//Import other functions
const writeFeatures = plv8.find_function("write_features_old");

if (input_feature == null)
throw new Error("Parameter input_feature must not be null.");

return writeFeatures(`[${input_feature}]`, author, on_exists, on_not_exists, on_version_conflict, on_merge_conflict, is_partial, version == null ? undefined : version, return_result);
$BODY$ LANGUAGE plv8 VOLATILE;
Loading