Skip to content

Commit

Permalink
added ows_cache for GetCaps (#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
edisonguo authored Apr 30, 2021
1 parent dd9de01 commit 0fe59c7
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 18 deletions.
22 changes: 22 additions & 0 deletions mas/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,28 @@ func handler(response http.ResponseWriter, request *http.Request) {
request.URL.Path,
).Scan(&payload)

} else if _, ok := query["put_ows_cache"]; ok {
err = db.QueryRow(
`select mas_put_ows_cache(
nullif($1,'')::text,
nullif($2,'')::text,
nullif($3,'')::jsonb
) as json`,
request.URL.Path,
request.FormValue("query"),
request.FormValue("value"),
).Scan(&payload)

} else if _, ok := query["get_ows_cache"]; ok {
err = db.QueryRow(
`select mas_get_ows_cache(
nullif($1,'')::text,
nullif($2,'')::text
) as json`,
request.URL.Path,
request.FormValue("query"),
).Scan(&payload)

} else {
httpJSONError(response, errors.New("unknown operation; currently supported: ?intersects, ?timestamps, ?extents"), 400)
return
Expand Down
75 changes: 69 additions & 6 deletions mas/api/mas.sql
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ create or replace function mas_timestamps(
returns jsonb language plpgsql as $$
declare
result jsonb;
query_hash text;
query_hash uuid;
shard text;
begin

Expand All @@ -605,16 +605,16 @@ create or replace function mas_timestamps(
end if;

query_hash := md5(concat(gpath, coalesce(time_a::text, 'null'),
coalesce(time_b::text, 'null'), array_to_string(namespace, ',', 'null')));
coalesce(time_b::text, 'null'), array_to_string(namespace, ',', 'null')))::uuid;

if token is not null and token = query_hash then
select jsonb_build_object('timestamps', '[]'::jsonb, 'token', query_hash) into result from timestamps_cache where query_id = query_hash;
if token is not null and token = query_hash::text then
select jsonb_build_object('timestamps', '[]'::jsonb, 'token', query_hash) into result from ows_cache where query_id = query_hash;
if result is not null then
return result;
end if;
end if;

select timestamps || jsonb_build_object('token', query_hash) into result from timestamps_cache where query_id = query_hash;
select value || jsonb_build_object('token', query_hash) into result from ows_cache where query_id = query_hash;
if result is not null then
return result;
end if;
Expand Down Expand Up @@ -657,7 +657,7 @@ create or replace function mas_timestamps(

), '[]'::jsonb), 'token', query_hash);

insert into timestamps_cache (query_id, timestamps) values (query_hash, result)
insert into ows_cache (query_id, value) values (query_hash, result)
on conflict (query_id) do nothing;

perform mas_reset();
Expand All @@ -666,6 +666,69 @@ create or replace function mas_timestamps(
end
$$;

create or replace function mas_put_ows_cache(
gpath text,
query text,
val jsonb
)
returns jsonb language plpgsql as $$
declare
query_hash uuid;
shard text;
begin

if gpath is null then
raise exception 'invalid search path';
end if;

perform mas_reset();
shard := mas_view(gpath);

if shard = '' then
raise exception 'invalid search path';
end if;

query_hash := md5(query)::uuid;
insert into ows_cache (query_id, value) values (query_hash, val)
on conflict (query_id) do update set value = val;

perform mas_reset();
return jsonb_build_object('error', '');
end
$$;

create or replace function mas_get_ows_cache(
gpath text,
query text
)
returns jsonb language plpgsql as $$
declare
query_hash uuid;
shard text;
result jsonb;
begin

if gpath is null then
raise exception 'invalid search path';
end if;

perform mas_reset();
shard := mas_view(gpath);

if shard = '' then
raise exception 'invalid search path';
end if;

query_hash := md5(query)::uuid;

result := jsonb_build_object('value',
(select value from ows_cache where query_id = query_hash));

perform mas_reset();
return result;
end
$$;

-- Find geospatial and temporal extents

create or replace function mas_spatial_temporal_extents(
Expand Down
12 changes: 5 additions & 7 deletions mas/db/shard.sql
Original file line number Diff line number Diff line change
Expand Up @@ -660,19 +660,17 @@ create or replace function refresh_polygons()
end
$$;

drop table if exists timestamps_cache cascade;

-- cache for timestamps
create unlogged table timestamps_cache (
query_id text primary key,
timestamps jsonb not null
drop table if exists ows_cache cascade;
create table ows_cache (
query_id uuid primary key,
value jsonb not null
);

create or replace function refresh_caches()
returns boolean language plpgsql as $$
begin
raise notice 'refresh caches';
truncate timestamps_cache;
truncate ows_cache;
return true;
end
$$;
Expand Down
2 changes: 1 addition & 1 deletion mas/db/shard_create.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ insert into public.shards (sh_code, sh_path)
\\i shard.sql
grant select,insert,update on ${shard}.timestamps_cache to api;
grant select,insert,update on ${shard}.ows_cache to api;
EOD
)
2 changes: 1 addition & 1 deletion mas/db/shard_reset.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ alter default privileges for role mas in schema ${shard}_tmp grant select on tab
\\i shard.sql
grant select,insert,update on ${shard}_tmp.timestamps_cache to api;
grant select,insert,update on ${shard}_tmp.ows_cache to api;
EOD
)
39 changes: 36 additions & 3 deletions ows.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,48 @@ func serveWMS(ctx context.Context, params utils.WMSParams, conf *utils.Config, r
return
}

newConf := conf.Copy(r)
utils.LoadConfigTimestamps(newConf, *verbose)
query := fmt.Sprintf("%swms_getcaps", r.URL.Path)
gpath := utils.FindConfigGPath(conf)
owsCache := utils.NewOWSCache(conf.ServiceConfig.MASAddress, gpath, *verbose)
newConf, err := owsCache.GetConfig(query)
cacheMiss := false
if err != nil {
if *verbose {
log.Printf("WMS GetCapabilities get cache error: %v", err)
}
cacheMiss = true
} else if newConf == nil {
cacheMiss = true
} else if len(newConf.Layers) == 0 {
cacheMiss = true
}

err := utils.ExecuteWriteTemplateFile(w, newConf,
if cacheMiss {
newConf = conf.Copy(r)
utils.LoadConfigTimestamps(newConf, *verbose)
}

err = utils.ExecuteWriteTemplateFile(w, newConf,
utils.DataDir+"/templates/WMS_GetCapabilities.tpl")
if err != nil {
metricsCollector.Info.HTTPStatus = 500
http.Error(w, err.Error(), 500)
}

if cacheMiss {
jsonBytes, mErr := json.Marshal(newConf)
if mErr == nil {
err = owsCache.Put(query, string(jsonBytes))
if err != nil {
if *verbose {
log.Printf("WMS GetCapabilities put cache error: %v", err)
}
}
} else {
log.Printf("json.Marshal failed for WMS GetCapabilities")
}
}

case "GetFeatureInfo":
x, y, err := utils.GetCoordinates(params)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions utils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type ServiceConfig struct {
TempDir string `json:"temp_dir"`
MaxGrpcBufferSize int `json:"max_grpc_buffer_size"`
EnableAutoLayers bool `json:"enable_auto_layers"`
OWSCacheGPath string `json:"ows_cache_gpath"`
}

type Mask struct {
Expand Down
125 changes: 125 additions & 0 deletions utils/ows_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package utils

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"net/url"
"sort"
"strings"
)

type OWSCache struct {
MASAddress string
GPath string
verbose bool
}

func NewOWSCache(masAddress, gpath string, verbose bool) *OWSCache {
return &OWSCache{
MASAddress: masAddress,
GPath: gpath,
verbose: verbose,
}
}

func (o *OWSCache) Put(query string, value string) error {
reqURL := fmt.Sprintf("http://%s%s?put_ows_cache&query=%s", o.MASAddress, o.GPath, query)
postBody := url.Values{"value": {value}}
if o.verbose {
log.Printf("querying MAS for OWSCache Put: %v", reqURL)
}
resp, err := http.PostForm(reqURL, postBody)
if err != nil {
return err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}

type PutStatus struct {
Error string `json:"error"`
}

var status PutStatus
err = json.Unmarshal(body, &status)
if err != nil {
return err
}

if len(status.Error) > 0 {
return fmt.Errorf("%s", status.Error)
}
return nil
}

func (o *OWSCache) Get(query string) ([]byte, error) {
var result []byte
url := fmt.Sprintf("http://%s%s?get_ows_cache&query=%s", o.MASAddress, o.GPath, query)
if o.verbose {
log.Printf("querying MAS for OWSCache Get: %v", url)
}

resp, err := http.Get(url)
if err != nil {
return result, err
}
defer resp.Body.Close()

result, err = ioutil.ReadAll(resp.Body)
if err != nil {
return result, err
}
return result, nil
}

func (o *OWSCache) GetConfig(query string) (*Config, error) {
value, err := o.Get(query)
if err != nil {
return nil, err
}
type cacheResult struct {
Error string `json:"error"`
Config *Config `json:"value"`
}

var result cacheResult
err = json.Unmarshal(value, &result)
if err != nil {
return nil, err
}

if len(result.Error) > 0 {
return nil, fmt.Errorf("%s", result.Error)
}
return result.Config, nil
}

func FindConfigGPath(config *Config) string {
if len(strings.TrimSpace(config.ServiceConfig.OWSCacheGPath)) > 0 {
return config.ServiceConfig.OWSCacheGPath
}

if len(config.Layers) == 0 {
return ""
}
var layerList []*Layer
for iLayer := range config.Layers {
layer := &config.Layers[iLayer]
if hasBlendedService(layer) {
continue
}
if len(strings.TrimSpace(layer.DataSource)) == 0 {
continue
}
layerList = append(layerList, layer)
}

sort.Slice(layerList, func(i, j int) bool { return layerList[i].Name <= layerList[j].Name })
return layerList[0].DataSource
}

0 comments on commit 0fe59c7

Please sign in to comment.