-
Notifications
You must be signed in to change notification settings - Fork 50
Description
Hi, I am facing a bit of an issue and I am not sure what I am missing:
-- Lookup table backed by the REST endpoint configured in 'url'.
-- Each lookup is keyed by driverId (see the PRIMARY KEY below).
CREATE OR REPLACE TABLE api_driver_safety_records (
    driverId BIGINT,
    crashCount INT,
    harshAccelCount INT,
    harshBrakingCount INT,
    harshTurningCount INT,
    harshEvents ARRAY<
        ROW<
            harshEventType STRING,
            timestampMs BIGINT,
            vehicleId BIGINT
        >
    >,
    safetyScore DOUBLE,
    safetyScoreRank STRING,
    timeOverSpeedLimitMs BIGINT,
    totalDistanceDrivenMeters DOUBLE,
    totalHarshEventCount INT,
    totalTimeDrivenMs BIGINT,
    -- Computed processing-time column (a computed column, not a metadata column).
    processing_time AS PROCTIME(),
    PRIMARY KEY (driverId) NOT ENFORCED
) WITH (
    'connector' = 'rest-lookup',
    -- {driverId} is a path placeholder. Without a URL mapping the lookup key is
    -- appended as '?driverId=...' and the braces stay literal, which makes
    -- java.net.URI reject the address ("Illegal character in path").
    'url' = 'https://flinkhttptest.requestcatcher.com/v1/fleet/drivers/{driverId}/safety/score',
    'format' = 'json',
    'asyncPolling' = 'false',
    'lookup.max-retries' = '3',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '200',
    -- Per the connector README, the 'http-generic-json-url' query creator is the
    -- one that substitutes column values into URL path segments, driven by
    -- 'url-map' (format key1:value1,key2:value2). Here the driverId column feeds
    -- the {driverId} placeholder.
    -- NOTE(review): requires a connector version that ships this query creator - confirm.
    'gid.connector.http.source.lookup.query-creator' = 'http-generic-json-url',
    'gid.connector.http.request.url-map' = 'driverId:driverId',
    -- NOTE(review): the connector README describes this option as the OIDC token
    -- request body, not a static Authorization header value - confirm that
    -- 'Bearer test' is really intended here.
    'gid.connector.http.security.oidc.token.request' = 'Bearer test',
    'gid.connector.http.source.lookup.request.thread-pool.size' = '1'
);
-- Enrich every driver row with its safety record from the REST lookup table
-- and write the result to the Kafka sink. Columns are matched to the sink by
-- position, so the SELECT order must follow the sink's column order.
INSERT INTO kafka_driver_safety_records
SELECT
    drv.driverId,
    safety.crashCount,
    safety.harshAccelCount,
    safety.harshBrakingCount,
    safety.harshTurningCount,
    safety.harshEvents,
    safety.safetyScore,
    safety.safetyScoreRank,
    safety.timeOverSpeedLimitMs,
    safety.totalDistanceDrivenMeters,
    safety.totalHarshEventCount,
    safety.totalTimeDrivenMs,
    CURRENT_TIMESTAMP AS processing_time
FROM drivers_view AS drv
-- Lookup join: the safety record is fetched as of the driver row's processing time.
INNER JOIN api_driver_safety_records FOR SYSTEM_TIME AS OF drv.proc_time AS safety
    ON drv.driverId = safety.driverId;
But as far as I can see, the driverId gets appended as a query parameter rather than substituted into the {driverId} placeholder in the URL path:
Caused by: java.net.URISyntaxException: Illegal character in path at index 60: https://flinkhttptest.requestcatcher.com/v1/fleet/drivers/{driverId}/safety/score?driverId=4356456
at java.net.URI$Parser.fail(URI.java:2913) ~[?:?]
at java.net.URI$Parser.checkChars(URI.java:3084) ~[?:?]
at java.net.URI$Parser.parseHierarchical(URI.java:3166) ~[?:?]
at java.net.URI$Parser.parse(URI.java:3114) ~[?:?]
at java.net.URI.(URI.java:600) ~[?:?]
at com.getindata.connectors.http.internal.utils.uri.URIBuilder.(URIBuilder.java:73) ~[blob_p-2b38d7220f14718bc605e0994b819fe148bb3b80-c1d558420f77f39328360a4ae3905558:?]
at com.getindata.connectors.http.internal.table.lookup.GetRequestFactory.constructGetUri(GetRequestFactory.java:65) ~[blob_p-2b38d7220f14718bc605e0994b819fe148bb3b80-c1d558420f77f39328360a4ae3905558:?]
I have tried changing query factories, but it didn't help, so I am a bit stuck.