Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added ows_cache for WMS GetCaps #508

Merged
merged 1 commit into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions mas/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,28 @@ func handler(response http.ResponseWriter, request *http.Request) {
request.URL.Path,
).Scan(&payload)

} else if _, ok := query["put_ows_cache"]; ok {
err = db.QueryRow(
`select mas_put_ows_cache(
nullif($1,'')::text,
nullif($2,'')::text,
nullif($3,'')::jsonb
) as json`,
request.URL.Path,
request.FormValue("query"),
request.FormValue("value"),
).Scan(&payload)

} else if _, ok := query["get_ows_cache"]; ok {
err = db.QueryRow(
`select mas_get_ows_cache(
nullif($1,'')::text,
nullif($2,'')::text
) as json`,
request.URL.Path,
request.FormValue("query"),
).Scan(&payload)

} else {
httpJSONError(response, errors.New("unknown operation; currently supported: ?intersects, ?timestamps, ?extents"), 400)
return
Expand Down
75 changes: 69 additions & 6 deletions mas/api/mas.sql
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ create or replace function mas_timestamps(
returns jsonb language plpgsql as $$
declare
result jsonb;
query_hash text;
query_hash uuid;
shard text;
begin

Expand All @@ -605,16 +605,16 @@ create or replace function mas_timestamps(
end if;

query_hash := md5(concat(gpath, coalesce(time_a::text, 'null'),
coalesce(time_b::text, 'null'), array_to_string(namespace, ',', 'null')));
coalesce(time_b::text, 'null'), array_to_string(namespace, ',', 'null')))::uuid;

if token is not null and token = query_hash then
select jsonb_build_object('timestamps', '[]'::jsonb, 'token', query_hash) into result from timestamps_cache where query_id = query_hash;
if token is not null and token = query_hash::text then
select jsonb_build_object('timestamps', '[]'::jsonb, 'token', query_hash) into result from ows_cache where query_id = query_hash;
if result is not null then
return result;
end if;
end if;

select timestamps || jsonb_build_object('token', query_hash) into result from timestamps_cache where query_id = query_hash;
select value || jsonb_build_object('token', query_hash) into result from ows_cache where query_id = query_hash;
if result is not null then
return result;
end if;
Expand Down Expand Up @@ -657,7 +657,7 @@ create or replace function mas_timestamps(

), '[]'::jsonb), 'token', query_hash);

insert into timestamps_cache (query_id, timestamps) values (query_hash, result)
insert into ows_cache (query_id, value) values (query_hash, result)
on conflict (query_id) do nothing;

perform mas_reset();
Expand All @@ -666,6 +666,69 @@ create or replace function mas_timestamps(
end
$$;

create or replace function mas_put_ows_cache(
gpath text,
query text,
val jsonb
)
returns jsonb language plpgsql as $$
declare
query_hash uuid;
shard text;
begin

if gpath is null then
raise exception 'invalid search path';
end if;

perform mas_reset();
shard := mas_view(gpath);

if shard = '' then
raise exception 'invalid search path';
end if;

query_hash := md5(query)::uuid;
insert into ows_cache (query_id, value) values (query_hash, val)
on conflict (query_id) do update set value = val;

perform mas_reset();
return jsonb_build_object('error', '');
end
$$;

create or replace function mas_get_ows_cache(
gpath text,
query text
)
returns jsonb language plpgsql as $$
declare
query_hash uuid;
shard text;
result jsonb;
begin

if gpath is null then
raise exception 'invalid search path';
end if;

perform mas_reset();
shard := mas_view(gpath);

if shard = '' then
raise exception 'invalid search path';
end if;

query_hash := md5(query)::uuid;

result := jsonb_build_object('value',
(select value from ows_cache where query_id = query_hash));

perform mas_reset();
return result;
end
$$;

-- Find geospatial and temporal extents

create or replace function mas_spatial_temporal_extents(
Expand Down
12 changes: 5 additions & 7 deletions mas/db/shard.sql
Original file line number Diff line number Diff line change
Expand Up @@ -660,19 +660,17 @@ create or replace function refresh_polygons()
end
$$;

drop table if exists timestamps_cache cascade;

-- cache for timestamps
create unlogged table timestamps_cache (
query_id text primary key,
timestamps jsonb not null
drop table if exists ows_cache cascade;
create table ows_cache (
query_id uuid primary key,
value jsonb not null
);

create or replace function refresh_caches()
returns boolean language plpgsql as $$
begin
raise notice 'refresh caches';
truncate timestamps_cache;
truncate ows_cache;
return true;
end
$$;
Expand Down
2 changes: 1 addition & 1 deletion mas/db/shard_create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ insert into public.shards (sh_code, sh_path)

\\i shard.sql

grant select,insert,update on ${shard}.timestamps_cache to api;
grant select,insert,update on ${shard}.ows_cache to api;

EOD
)
2 changes: 1 addition & 1 deletion mas/db/shard_reset.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ alter default privileges for role mas in schema ${shard}_tmp grant select on tab

\\i shard.sql

grant select,insert,update on ${shard}_tmp.timestamps_cache to api;
grant select,insert,update on ${shard}_tmp.ows_cache to api;

EOD
)
39 changes: 36 additions & 3 deletions ows.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,48 @@ func serveWMS(ctx context.Context, params utils.WMSParams, conf *utils.Config, r
return
}

newConf := conf.Copy(r)
utils.LoadConfigTimestamps(newConf, *verbose)
query := fmt.Sprintf("%swms_getcaps", r.URL.Path)
gpath := utils.FindConfigGPath(conf)
owsCache := utils.NewOWSCache(conf.ServiceConfig.MASAddress, gpath, *verbose)
newConf, err := owsCache.GetConfig(query)
cacheMiss := false
if err != nil {
if *verbose {
log.Printf("WMS GetCapabilities get cache error: %v", err)
}
cacheMiss = true
} else if newConf == nil {
cacheMiss = true
} else if len(newConf.Layers) == 0 {
cacheMiss = true
}

err := utils.ExecuteWriteTemplateFile(w, newConf,
if cacheMiss {
newConf = conf.Copy(r)
utils.LoadConfigTimestamps(newConf, *verbose)
}

err = utils.ExecuteWriteTemplateFile(w, newConf,
utils.DataDir+"/templates/WMS_GetCapabilities.tpl")
if err != nil {
metricsCollector.Info.HTTPStatus = 500
http.Error(w, err.Error(), 500)
}

if cacheMiss {
jsonBytes, mErr := json.Marshal(newConf)
if mErr == nil {
err = owsCache.Put(query, string(jsonBytes))
if err != nil {
if *verbose {
log.Printf("WMS GetCapabilities put cache error: %v", err)
}
}
} else {
log.Printf("json.Marshal failed for WMS GetCapabilities")
}
}

case "GetFeatureInfo":
x, y, err := utils.GetCoordinates(params)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type ServiceConfig struct {
TempDir string `json:"temp_dir"`
MaxGrpcBufferSize int `json:"max_grpc_buffer_size"`
EnableAutoLayers bool `json:"enable_auto_layers"`
OWSCacheGPath string `json:"ows_cache_gpath"`
}

type Mask struct {
Expand Down
125 changes: 125 additions & 0 deletions utils/ows_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package utils

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sort"
"strings"
)

type OWSCache struct {
MASAddress string
GPath string
verbose bool
}

func NewOWSCache(masAddress, gpath string, verbose bool) *OWSCache {
return &OWSCache{
MASAddress: masAddress,
GPath: gpath,
verbose: verbose,
}
}

func (o *OWSCache) Put(query string, value string) error {
reqURL := fmt.Sprintf("http://%s%s?put_ows_cache&query=%s", o.MASAddress, o.GPath, query)
postBody := url.Values{"value": {value}}
if o.verbose {
log.Printf("querying MAS for OWSCache Put: %v", reqURL)
}
resp, err := http.PostForm(reqURL, postBody)
if err != nil {
return err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

type PutStatus struct {
Error string `json:"error"`
}

var status PutStatus
err = json.Unmarshal(body, &status)
if err != nil {
return err
}

if len(status.Error) > 0 {
return fmt.Errorf("%s", status.Error)
}
return nil
}

func (o *OWSCache) Get(query string) ([]byte, error) {
var result []byte
url := fmt.Sprintf("http://%s%s?get_ows_cache&query=%s", o.MASAddress, o.GPath, query)
if o.verbose {
log.Printf("querying MAS for OWSCache Get: %v", url)
}

resp, err := http.Get(url)
if err != nil {
return result, err
}
defer resp.Body.Close()

result, err = ioutil.ReadAll(resp.Body)
if err != nil {
return result, err
}
return result, nil
}

func (o *OWSCache) GetConfig(query string) (*Config, error) {
value, err := o.Get(query)
if err != nil {
return nil, err
}
type cacheResult struct {
Error string `json:"error"`
Config *Config `json:"value"`
}

var result cacheResult
err = json.Unmarshal(value, &result)
if err != nil {
return nil, err
}

if len(result.Error) > 0 {
return nil, fmt.Errorf("%s", result.Error)
}
return result.Config, nil
}

func FindConfigGPath(config *Config) string {
if len(strings.TrimSpace(config.ServiceConfig.OWSCacheGPath)) > 0 {
return config.ServiceConfig.OWSCacheGPath
}

if len(config.Layers) == 0 {
return ""
}
var layerList []*Layer
for iLayer := range config.Layers {
layer := &config.Layers[iLayer]
if hasBlendedService(layer) {
continue
}
if len(strings.TrimSpace(layer.DataSource)) == 0 {
continue
}
layerList = append(layerList, layer)
}

sort.Slice(layerList, func(i, j int) bool { return layerList[i].Name <= layerList[j].Name })
return layerList[0].DataSource
}