Skip to content

Commit

Permalink
[Fixes #10537] Improve rules creation using GeoFence batch - more imp…
Browse files Browse the repository at this point in the history
…rovements
  • Loading branch information
etj committed Jan 26, 2023
1 parent 63016b0 commit 83f00ff
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 383 deletions.
27 changes: 16 additions & 11 deletions geonode/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from guardian.shortcuts import get_anonymous_user

from geonode import geoserver
from geonode.geoserver.manager import GeoServerResourceManager
from geonode.maps.models import Map
from geonode.layers.models import Dataset
from geonode.documents.models import Document
Expand Down Expand Up @@ -166,26 +167,31 @@ def test_dataset_get_detail_unauth_dataset_not_public(self):
'api_name': 'api',
'resource_name': 'datasets'})

resp = self.client.get(list_url)
self.assertEqual(len(self.deserialize(resp)['objects']), 8)

layer = Dataset.objects.first()
layer.set_permissions(self.perm_spec)
layer.clear_dirty_state()
self.assertHttpNotFound(self.api_client.get(
f"{list_url + str(layer.id)}/"))

self.api_client.client.login(username=self.user, password=self.passwd)
resp = self.api_client.get(f"{list_url + str(layer.id)}/")
self.assertValidJSONResponse(resp)
resp = self.client.get(list_url)
self.assertEqual(len(self.deserialize(resp)['objects']), 7)
self.assertHttpNotFound(self.client.get(f"{list_url + str(layer.id)}/"))

self.client.login(username=self.user, password=self.passwd)
self.assertValidJSONResponse(self.client.get(f"{list_url + str(layer.id)}/"))

# with delayed security
with self.settings(DELAYED_SECURITY_SIGNALS=True):
with self.settings(DELAYED_SECURITY_SIGNALS=True, GEOFENCE_SECURITY_ENABLED=True):
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
from geonode.geoserver.security import sync_geofence_with_guardian
sync_geofence_with_guardian(layer, self.perm_spec)

gm = GeoServerResourceManager()
gm.set_permissions(layer.uuid, instance=layer, permissions=self.perm_spec)
self.assertTrue(layer.dirty_state)

self.client.login(username=self.user, password=self.passwd)
resp = self.client.get(list_url)
self.assertEqual(len(self.deserialize(resp)['objects']), 7)
self.assertEqual(len(self.deserialize(resp)['objects']), 7) # admin can't see resources in dirty_state

self.client.logout()
resp = self.client.get(list_url)
Expand All @@ -194,8 +200,7 @@ def test_dataset_get_detail_unauth_dataset_not_public(self):
from django.contrib.auth import get_user_model
get_user_model().objects.create(
username='imnew',
password='pbkdf2_sha256$12000$UE4gAxckVj4Z$N\
6NbOXIQWWblfInIoq/Ta34FdRiPhawCIZ+sOO3YQs=')
password='pbkdf2_sha256$12000$UE4gAxckVj4Z$N6NbOXIQWWblfInIoq/Ta34FdRiPhawCIZ+sOO3YQs=')
self.client.login(username='imnew', password='thepwd')
resp = self.client.get(list_url)
self.assertEqual(len(self.deserialize(resp)['objects']), 7)
Expand Down
182 changes: 127 additions & 55 deletions geonode/geoserver/geofence.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,24 @@
#
#########################################################################

import itertools
import logging
import urllib
import requests

from django.conf import settings
from requests.auth import HTTPBasicAuth
import traceback
import urllib

logger = logging.getLogger(__name__)

ogc_server_settings = settings.OGC_SERVER['default']


class GeofenceException(Exception):
pass


class Rule:
"""_summary_
JSON representation of a GeoFence Rule
A GeoFence Rule.
Provides the object to be rendered as JSON.
e.g.:
{"Rule":
Expand All @@ -56,9 +55,11 @@ class Rule:
ALLOW = "ALLOW"
DENY = "DENY"
LIMIT = "LIMIT"

CM_MIXED = "MIXED"

def __init__(self, priority, workspace, layer, access: (str, bool),
def __init__(self, access: (str, bool),
priority=None, workspace=None, layer=None,
user=None, group=None,
service=None, request=None, subfield=None,
geo_limit=None, catalog_mode=None) -> None:
Expand Down Expand Up @@ -99,14 +100,18 @@ def __init__(self, priority, workspace, layer, access: (str, bool),
if limits:
self.fields['limits'] = limits

def get_object(self):
def set_priority(self, pri: int):
self.fields['priority'] = pri

def get_object(self) -> dict:
logger.debug(f"Creating Rule object: {self.fields}")
return {'Rule': self.fields}


class Batch:
"""_summary_
Returns a list of Operations that GeoFence can execute in a batch
A GeoFence Batch.
It's a list of operations that will be executed transactionally inside GeoFence.
e.g.:
{
Expand Down Expand Up @@ -201,44 +206,64 @@ def add_insert_rule(self, rule: Rule):
operation.update(rule.get_object())
self.operations.append(operation)

def get_batch_length(self):
def get_batch_length(self) -> int:
return len(self.operations)

def get_object(self):
logger.debug(f"Creating Batch object {self.log_name} with {len(self.operations)} operations")
def get_object(self) -> dict:
return {
'Batch': {
'operations': self.operations
}
}


class GeofenceClient:
class AutoPriorityBatch(Batch):
"""_summary_
Instance of a simple GeoFence REST client allowing to interact with the GeoServer APIs.
Exposes few utility methods to insert or purge the rules and run batches of operations.
A Batch that handles the priority of the inserted rules.
The first rule will have the declared `start_rule_pri`, next Rules will have the priority incremented.
"""

Returns:
_type_: Rule
def __init__(self, start_rule_pri: int, log_name=None) -> None:
super().__init__(log_name)
self.pri = itertools.count(start_rule_pri)

def add_insert_rule(self, rule: Rule):
rule.set_priority(self.pri.__next__())
super().add_insert_rule(rule)


class GeoFenceClient:
"""_summary_
A GeoFence REST client allowing to interact with the embedded GeoFence API (which is slightly incompatible
with the original standalone GeoFence API.)
The class methods map on GeoFence operations.
Functionalities needing more than one call are implemented within GeoFenceUtils.
"""

def __init__(self, baseurl: str, username: str, pw: str) -> None:
if not baseurl.endswith('/'):
baseurl += '/'

self.baseurl = baseurl
self.username = username
self.pw = pw
self.timeout = 60

def set_timeout(self, timeout: int):
self.timeout = timeout

def invalidate_cache(self):
r = requests.put(
f'{self.baseurl.rstrip("/")}/geofence/ruleCache/invalidate',
f'{self.baseurl}ruleCache/invalidate',
auth=HTTPBasicAuth(self.username, self.pw))

if r.status_code != 200:
logger.debug("Could not invalidate cache")
raise GeofenceException("Could not invalidate cache")

def get_rules(self, page=None, entries=None,
workspace=None, workspace_any=None,
layer=None, layer_any=None):
def get_rules(self, page: int = None, entries: int = None,
workspace: str = None, workspace_any: bool = None,
layer: str = None, layer_any: bool = None):
if (page is None and entries is not None) or (page is not None and entries is None):
raise GeofenceException(f"Bad page/entries combination {page}/{entries}")

Expand All @@ -261,13 +286,13 @@ def get_rules(self, page=None, entries=None,
if value is not None:
params[param] = value

url = f'{self.baseurl.rstrip("/")}/geofence/rules.json?{urllib.parse.urlencode(params)}'
url = f'{self.baseurl}rules.json?{urllib.parse.urlencode(params)}'

r = requests.get(
url,
headers={'Content-type': 'application/json'},
auth=HTTPBasicAuth(self.username, self.pw),
timeout=ogc_server_settings.get('TIMEOUT', 10),
timeout=self.timeout,
verify=False)

if r.status_code != 200:
Expand All @@ -287,10 +312,10 @@ def get_rules_count(self):
http://<host>:<port>/geoserver/rest/geofence/rules/count.json
"""
r = requests.get(
f'{self.baseurl.rstrip("/")}/geofence/rules/count.json',
f'{self.baseurl}rules/count.json',
headers={'Content-type': 'application/json'},
auth=HTTPBasicAuth(self.username, self.pw),
timeout=ogc_server_settings.get('TIMEOUT', 10),
timeout=self.timeout,
verify=False)

if r.status_code != 200:
Expand All @@ -312,11 +337,10 @@ def insert_rule(self, rule: Rule):
http://<host>:<port>/geoserver/rest/geofence/rules
"""
r = requests.post(
f'{self.baseurl.rstrip("/")}/geofence/rules',
# headers={'Content-type': 'application/json'},
f'{self.baseurl}rules',
json=rule.get_object(),
auth=HTTPBasicAuth(self.username, self.pw),
timeout=ogc_server_settings.get('TIMEOUT', 60),
timeout=self.timeout,
verify=False)

if r.status_code not in (200, 201):
Expand All @@ -327,58 +351,106 @@ def insert_rule(self, rule: Rule):
logger.debug("Error while inserting rule", exc_info=e)
raise GeofenceException(f"Error while inserting rule: {e}")

def run_batch(self, batch: Batch):
def run_batch(self, batch: Batch, timeout: int = None) -> bool:
if batch.get_batch_length() == 0:
logger.debug(f'Skipping batch execution {batch.log_name}')
return
return False

logger.debug(f"Running batch {batch.log_name} with {batch.get_batch_length()} operations")
try:
"""
curl -X GET -u admin:geoserver \
http://<host>:<port>/geoserver/rest/geofence/rules/count.json
"""
r = requests.post(
f'{self.baseurl.rstrip("/")}/geofence/batch/exec',
f'{self.baseurl}batch/exec',
json=batch.get_object(),
auth=HTTPBasicAuth(self.username, self.pw),
timeout=ogc_server_settings.get('TIMEOUT', 60),
timeout=timeout or self.timeout,
verify=False)

if r.status_code != 200:
logger.debug(f"Error while running batch {batch.log_name}: [{r.status_code}] - {r.content}")
logger.debug(f"Error while running batch {batch.log_name}: [{r.status_code}] - {r.content}"
f"\n {batch.get_object()}")
raise GeofenceException(f"Error while running batch {batch.log_name}: [{r.status_code}]")

return
return True

except Exception as e:
logger.debug(f"Error while requesting batch execution {batch.log_name}", exc_info=e)
logger.info(f"Error while requesting batch exec {batch.log_name}")
logger.debug(f"Error while requesting batch exec {batch.log_name} --> {batch.get_object()}", exc_info=e)
raise GeofenceException(f"Error while requesting batch execution {batch.log_name}: {e}")

def purge_all_rules(self):

class GeoFenceUtils:
def __init__(self, client: GeoFenceClient):
self.geofence = client

def delete_all_rules(self):
"""purge all existing GeoFence Cache Rules"""
rules_objs = self.get_rules()
rules_objs = self.geofence.get_rules()
rules = rules_objs['rules']

batch = Batch('Purge All')
for rule in rules:
batch.add_delete_rule(rule['id'])

logger.debug(f"Going to remove all {len(rules)} rules in geofence")
self.run_batch(batch)

def purge_layer_rules(self, layer_name: str, workspace: str = None):
"""purge existing GeoFence Cache Rules related to a specific Layer"""
gs_rules = self.get_rules(
workspace=workspace, workspace_any=False,
layer=layer_name, layer_any=False)

batch = Batch(f'Purge {workspace}:{layer_name}')

if gs_rules and gs_rules['rules']:
logger.debug(f"Going to remove {len(gs_rules['rules'])} rules for layer '{layer_name}'")
for r in gs_rules['rules']:
if r['layer'] and r['layer'] == layer_name:
batch.add_delete_rule(r['id'])
else:
logger.debug(f"Bad rule retrieved for dataset '{layer_name}': {r}")
self.run_batch(batch)
self.geofence.run_batch(batch)

def collect_delete_layer_rules(self, workspace_name: str, layer_name: str, batch: Batch = None) -> Batch:
"""Collect delete operations in a Batch for all rules related to a layer"""

try:
# Scan GeoFence Rules associated to the Dataset
gs_rules = self.geofence.get_rules(
workspace=workspace_name, workspace_any=False,
layer=layer_name, layer_any=False)

if not batch:
batch = Batch(f'Delete {workspace_name}:{layer_name}')

cnt = 0
if gs_rules and gs_rules['rules']:
logger.debug(f"Going to collect {len(gs_rules['rules'])} rules for layer '{workspace_name}:{layer_name}'")
for r in gs_rules['rules']:
if r['layer'] and r['layer'] == layer_name:
batch.add_delete_rule(r['id'])
cnt += 1
else:
logger.warning(f"Bad rule retrieved for dataset '{workspace_name or ''}:{layer_name}': {r}")

logger.debug(f"Adding {cnt} rule deletion operations for '{workspace_name or ''}:{layer_name}")
return batch

except Exception as e:
logger.error(f"Error collecting rules for {workspace_name}:{layer_name}", exc_info=e)
tb = traceback.format_exc()
logger.debug(tb)

def delete_layer_rules(self, workspace_name: str, layer_name: str) -> bool:
"""Delete all Rules related to a specific Layer"""
try:
batch = self.collect_delete_layer_rules(workspace_name, layer_name)
return self.geofence.run_batch(batch)

except Exception as e:
logger.error(f"Error removing rules for {workspace_name}:{layer_name}", exc_info=e)
tb = traceback.format_exc()
logger.debug(tb)
return False

def get_first_available_priority(self):
"""Get the highest Rules priority"""
try:
rules_count = self.geofence.get_rules_count()
rules_objs = self.geofence.get_rules(page=rules_count - 1, entries=1)
if len(rules_objs['rules']) > 0:
highest_priority = rules_objs['rules'][0]['priority']
else:
highest_priority = 0
return int(highest_priority) + 1
except Exception:
tb = traceback.format_exc()
logger.debug(tb)
return -1
Loading

0 comments on commit 83f00ff

Please sign in to comment.