From 34bc129b9c9da2583b24547663f4fc8dc6b1e08c Mon Sep 17 00:00:00 2001 From: Edison Guo Date: Thu, 29 Apr 2021 23:01:10 +1000 Subject: [PATCH] added ows_cache for GetCaps --- mas/api/api.go | 22 ++++++++ mas/api/mas.sql | 75 +++++++++++++++++++++++-- mas/db/shard.sql | 12 ++-- mas/db/shard_create.sh | 2 +- mas/db/shard_reset.sh | 2 +- ows.go | 39 ++++++++++++- utils/config.go | 1 + utils/ows_cache.go | 125 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 260 insertions(+), 18 deletions(-) create mode 100644 utils/ows_cache.go diff --git a/mas/api/api.go b/mas/api/api.go index 56475a6d..5cb9d029 100644 --- a/mas/api/api.go +++ b/mas/api/api.go @@ -137,6 +137,28 @@ func handler(response http.ResponseWriter, request *http.Request) { request.URL.Path, ).Scan(&payload) + } else if _, ok := query["put_ows_cache"]; ok { + err = db.QueryRow( + `select mas_put_ows_cache( + nullif($1,'')::text, + nullif($2,'')::text, + nullif($3,'')::jsonb + ) as json`, + request.URL.Path, + request.FormValue("query"), + request.FormValue("value"), + ).Scan(&payload) + + } else if _, ok := query["get_ows_cache"]; ok { + err = db.QueryRow( + `select mas_get_ows_cache( + nullif($1,'')::text, + nullif($2,'')::text + ) as json`, + request.URL.Path, + request.FormValue("query"), + ).Scan(&payload) + } else { httpJSONError(response, errors.New("unknown operation; currently supported: ?intersects, ?timestamps, ?extents"), 400) return diff --git a/mas/api/mas.sql b/mas/api/mas.sql index a5c2d707..748f91f3 100644 --- a/mas/api/mas.sql +++ b/mas/api/mas.sql @@ -589,7 +589,7 @@ create or replace function mas_timestamps( returns jsonb language plpgsql as $$ declare result jsonb; - query_hash text; + query_hash uuid; shard text; begin @@ -605,16 +605,16 @@ create or replace function mas_timestamps( end if; query_hash := md5(concat(gpath, coalesce(time_a::text, 'null'), - coalesce(time_b::text, 'null'), array_to_string(namespace, ',', 'null'))); + coalesce(time_b::text, 'null'), array_to_string(namespace, ',', 'null')))::uuid; - if token is not null and token = query_hash then - select jsonb_build_object('timestamps', '[]'::jsonb, 'token', query_hash) into result from timestamps_cache where query_id = query_hash; + if token is not null and token = query_hash::text then + select jsonb_build_object('timestamps', '[]'::jsonb, 'token', query_hash) into result from ows_cache where query_id = query_hash; if result is not null then return result; end if; end if; - select timestamps || jsonb_build_object('token', query_hash) into result from timestamps_cache where query_id = query_hash; + select value || jsonb_build_object('token', query_hash) into result from ows_cache where query_id = query_hash; if result is not null then return result; end if; @@ -657,7 +657,7 @@ create or replace function mas_timestamps( ), '[]'::jsonb), 'token', query_hash); - insert into timestamps_cache (query_id, timestamps) values (query_hash, result) + insert into ows_cache (query_id, value) values (query_hash, result) on conflict (query_id) do nothing; perform mas_reset(); @@ -666,6 +666,69 @@ create or replace function mas_timestamps( end $$; +create or replace function mas_put_ows_cache( + gpath text, + query text, + val jsonb +) + returns jsonb language plpgsql as $$ + declare + query_hash uuid; + shard text; + begin + + if gpath is null then + raise exception 'invalid search path'; + end if; + + perform mas_reset(); + shard := mas_view(gpath); + + if shard = '' then + raise exception 'invalid search path'; + end if; + + query_hash := md5(query)::uuid; + insert into ows_cache (query_id, value) values (query_hash, val) + on conflict (query_id) do update set value = val; + + perform mas_reset(); + return jsonb_build_object('error', ''); + end +$$; + +create or replace function mas_get_ows_cache( + gpath text, + query text +) + returns jsonb language plpgsql as $$ + declare + query_hash uuid; + shard text; + result jsonb; + begin + + if gpath is null then + raise exception 'invalid search path'; + end if; + + perform mas_reset(); + shard := mas_view(gpath); + + if shard = '' then + raise exception 'invalid search path'; + end if; + + query_hash := md5(query)::uuid; + + result := jsonb_build_object('value', + (select value from ows_cache where query_id = query_hash)); + + perform mas_reset(); + return result; + end +$$; + -- Find geospatial and temporal extents create or replace function mas_spatial_temporal_extents( diff --git a/mas/db/shard.sql b/mas/db/shard.sql index 030d5263..58e509b6 100644 --- a/mas/db/shard.sql +++ b/mas/db/shard.sql @@ -660,19 +660,17 @@ create or replace function refresh_polygons() end $$; -drop table if exists timestamps_cache cascade; - --- cache for timestamps -create unlogged table timestamps_cache ( - query_id text primary key, - timestamps jsonb not null +drop table if exists ows_cache cascade; +create table ows_cache ( + query_id uuid primary key, + value jsonb not null ); create or replace function refresh_caches() returns boolean language plpgsql as $$ begin raise notice 'refresh caches'; - truncate timestamps_cache; + truncate ows_cache; return true; end $$; diff --git a/mas/db/shard_create.sh b/mas/db/shard_create.sh index 7e3e0cb3..7411f740 100755 --- a/mas/db/shard_create.sh +++ b/mas/db/shard_create.sh @@ -31,7 +31,7 @@ insert into public.shards (sh_code, sh_path) \\i shard.sql -grant select,insert,update on ${shard}.timestamps_cache to api; +grant select,insert,update on ${shard}.ows_cache to api; EOD ) diff --git a/mas/db/shard_reset.sh b/mas/db/shard_reset.sh index 81e75eaa..a95d1009 100755 --- a/mas/db/shard_reset.sh +++ b/mas/db/shard_reset.sh @@ -15,7 +15,7 @@ alter default privileges for role mas in schema ${shard}_tmp grant select on tab \\i shard.sql -grant select,insert,update on ${shard}_tmp.timestamps_cache to api; +grant select,insert,update on ${shard}_tmp.ows_cache to api; EOD ) diff --git a/ows.go b/ows.go index b2bd1cc5..bbd9eb4b 100644 --- a/ows.go +++ b/ows.go @@ -186,15 +186,48 @@ func serveWMS(ctx context.Context, params utils.WMSParams, conf *utils.Config, r return } - newConf := conf.Copy(r) - utils.LoadConfigTimestamps(newConf, *verbose) + query := fmt.Sprintf("%swms_getcaps", r.URL.Path) + gpath := utils.FindConfigGPath(conf) + owsCache := utils.NewOWSCache(conf.ServiceConfig.MASAddress, gpath, *verbose) + newConf, err := owsCache.GetConfig(query) + cacheMiss := false + if err != nil { + if *verbose { + log.Printf("WMS GetCapabilities get cache error: %v", err) + } + cacheMiss = true + } else if newConf == nil { + cacheMiss = true + } else if len(newConf.Layers) == 0 { + cacheMiss = true + } - err := utils.ExecuteWriteTemplateFile(w, newConf, + if cacheMiss { + newConf = conf.Copy(r) + utils.LoadConfigTimestamps(newConf, *verbose) + } + + err = utils.ExecuteWriteTemplateFile(w, newConf, utils.DataDir+"/templates/WMS_GetCapabilities.tpl") if err != nil { metricsCollector.Info.HTTPStatus = 500 http.Error(w, err.Error(), 500) } + + if cacheMiss { + jsonBytes, mErr := json.Marshal(newConf) + if mErr == nil { + err = owsCache.Put(query, string(jsonBytes)) + if err != nil { + if *verbose { + log.Printf("WMS GetCapabilities put cache error: %v", err) + } + } + } else { + log.Printf("json.Marshal failed for WMS GetCapabilities") + } + } + case "GetFeatureInfo": x, y, err := utils.GetCoordinates(params) if err != nil { diff --git a/utils/config.go b/utils/config.go index 8d8a9abf..e0c7e47b 100644 --- a/utils/config.go +++ b/utils/config.go @@ -80,6 +80,7 @@ type ServiceConfig struct { TempDir string `json:"temp_dir"` MaxGrpcBufferSize int `json:"max_grpc_buffer_size"` EnableAutoLayers bool `json:"enable_auto_layers"` + OWSCacheGPath string `json:"ows_cache_gpath"` } type Mask struct { diff --git a/utils/ows_cache.go b/utils/ows_cache.go new file mode 100644 index 00000000..2c6e34f4 --- /dev/null +++ b/utils/ows_cache.go @@ -0,0 +1,125 @@ +package utils + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "sort" + "strings" +) + +type OWSCache struct { + MASAddress string + GPath string + verbose bool +} + +func NewOWSCache(masAddress, gpath string, verbose bool) *OWSCache { + return &OWSCache{ + MASAddress: masAddress, + GPath: gpath, + verbose: verbose, + } +} + +func (o *OWSCache) Put(query string, value string) error { + reqURL := fmt.Sprintf("http://%s%s?put_ows_cache&query=%s", o.MASAddress, o.GPath, query) + postBody := url.Values{"value": {value}} + if o.verbose { + log.Printf("querying MAS for OWSCache Put: %v", reqURL) + } + resp, err := http.PostForm(reqURL, postBody) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + type PutStatus struct { + Error string `json:"error"` + } + + var status PutStatus + err = json.Unmarshal(body, &status) + if err != nil { + return err + } + + if len(status.Error) > 0 { + return fmt.Errorf("%s", status.Error) + } + return nil +} + +func (o *OWSCache) Get(query string) ([]byte, error) { + var result []byte + url := fmt.Sprintf("http://%s%s?get_ows_cache&query=%s", o.MASAddress, o.GPath, query) + if o.verbose { + log.Printf("querying MAS for OWSCache Get: %v", url) + } + + resp, err := http.Get(url) + if err != nil { + return result, err + } + defer resp.Body.Close() + + result, err = ioutil.ReadAll(resp.Body) + if err != nil { + return result, err + } + return result, nil +} + +func (o *OWSCache) GetConfig(query string) (*Config, error) { + value, err := o.Get(query) + if err != nil { + return nil, err + } + type cacheResult struct { + Error string `json:"error"` + Config *Config `json:"value"` + } + + var result cacheResult + err = json.Unmarshal(value, &result) + if err != nil { + return nil, err + } + + if len(result.Error) > 0 { + return nil, fmt.Errorf("%s", result.Error) + } + return result.Config, nil +} + +func FindConfigGPath(config *Config) string { + if len(strings.TrimSpace(config.ServiceConfig.OWSCacheGPath)) > 0 { + return config.ServiceConfig.OWSCacheGPath + } + + if len(config.Layers) == 0 { + return "" + } + var layerList []*Layer + for iLayer := range config.Layers { + layer := &config.Layers[iLayer] + if hasBlendedService(layer) { + continue + } + if len(strings.TrimSpace(layer.DataSource)) == 0 { + continue + } + layerList = append(layerList, layer) + } + + sort.Slice(layerList, func(i, j int) bool { return layerList[i].Name <= layerList[j].Name }) + return layerList[0].DataSource +}