Skip to content

Commit

Permalink
#828 refactor and split functions
Browse files Browse the repository at this point in the history
  • Loading branch information
D-GopalKrishna committed Dec 11, 2023
1 parent 644911d commit 7aa30ec
Showing 1 changed file with 76 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,36 +52,63 @@ def check(self):

def search_qs(self, filter=None, q=None, tags=None, user_id=None, show_all=False, *args, **kwargs):
q_base = self.model.query
q_base = self.filter_by_user_and_fieldkey(
filter, user_id, show_all, q_base)

if filter is None:
if tags:
q_base = self.filter_by_tags(tags, q_base)
else:
q_base = self.filter_by_name_description(filter, q_base)
q_base = self.filter_by_search_tags(filter, q_base)

if tags:
q_base = q_base.intersect(self.filter_by_tags(tags, q_base))

q_base = q_base.filter(
*[self._create_filter(*f) for f in filter if f[0].key != "name"])

return q_base.order_by(desc(WorkspaceEntity.timestamp_updated))

def filter_by_user_and_fieldkey(self, filter, user_id, show_all, q_base):
if filter and any(field for field, condition, value in filter if field.key == "publicable" and value):
pass
elif user_id is not None:
# Admins see all workspaces, non admin users can see only their own workspaces or shared with them
if not show_all:
q_base = q_base.filter_by(user_id=user_id)
q_base = q_base.union(q_base.filter(
q_base = q_base.union(self.model.query.filter(
WorkspaceEntity.collaborators.any(user_id=user_id)))
else:
q_base = q_base
else:
# No logged in user, show only public (in case was not specified)
q_base = q_base.filter(WorkspaceEntity.publicable == True)
return q_base

def filter_by_tags(self, tags, q_base):
q_base = q_base.join(self.model.tags).filter(
func.lower(Tag.tag).in_(func.lower(t) for t in tags.split("+")))
return q_base

if filter is not None:
if tags:
q_base = q_base.filter(
*[self._create_filter(*f) for f in filter if f[0].key == "name"] )
q_base = q_base.intersect(self.model.query.join(self.model.tags).filter(
func.lower(Tag.tag).in_(func.lower(t) for t in tags.split("+"))))

q_base = q_base.filter(
*[self._create_filter(*f) for f in filter if f[0].key != "name"])
elif tags:
q_base = q_base.join(self.model.tags).filter(
func.lower(Tag.tag).in_(func.lower(t) for t in tags.split("+")))
def filter_by_search_tags(self, filter, q_base):
search_tags = self.tags_from_search(filter)
q_base = q_base.union(q_base.join(self.model.tags).filter(
func.lower(Tag.tag).in_(func.lower(t) for t in search_tags.split("+"))))

return q_base.order_by(desc(WorkspaceEntity.timestamp_updated))
return q_base

def tags_from_search(self, filter):
tags = ''
for field, condition, value in filter:
if field.key == "name":
tags = value.replace("+", "").replace("%", "")
break
return tags

def filter_by_name_description(self, filter, q_base):
return q_base.filter(
or_(*[self._create_filter(*f) for f in filter if (f[0].key == "name" or f[0].key == "description")]))

def delete(self, id):
super().delete(id)
Expand All @@ -100,21 +127,48 @@ def get(self, id):

def search_qs(self, filter=None, q=None, tags=None, types=None, user_id=None):
q_base = self.model.query
if tags:
q_base = q_base.join(self.model.tags).filter(
Tag.tag.in_(tags.split("+")))
if filter:
q_base = q_base.filter(
or_(*[self._create_filter(*f) for f in filter]))
if user_id is not None:
q_base = q_base.filter_by(user_id=user_id)

if filter is None:
if tags:
q_base = self.filter_by_tags(tags, q_base)
else:
q_base = self.filter_by_name_summary(filter, q_base)
q_base = self.filter_by_search_tags(filter, q_base)
if tags:
q_base = q_base.intersect(self.filter_by_tags(tags, q_base))

if types is not None:
q_base = q_base.filter(
or_(self.model.content_types.ilike(f"%{t}%") for t in types.split("+")))

if user_id is not None:
q_base = q_base.filter_by(user_id=user_id)
return q_base.order_by(desc(OSBRepositoryEntity.timestamp_updated))

def filter_by_name_summary(self, filter, q_base):
q_base = q_base.filter(
or_(*[self._create_filter(*f) for f in filter]))
return q_base

def filter_by_tags(self, tags, q_base):
q_base = q_base.join(self.model.tags).filter(
func.lower(Tag.tag).in_(func.lower(t) for t in tags.split("+")))
return q_base

def filter_by_search_tags(self, filter, q_base):
search_tags = self.tags_from_search(filter)
q_base = q_base.union(self.model.query.join(self.model.tags).filter(
func.lower(Tag.tag).in_(func.lower(t) for t in search_tags.split("+"))))
return q_base

def tags_from_search(self, filter):
tags = ''
for field, condition, value in filter:
if field.key == "name":
tags = value.replace("+", "").replace("%", "")
break
return tags


class VolumeStorageRepository(BaseModelRepository):
model = VolumeStorage
Expand Down

0 comments on commit 7aa30ec

Please sign in to comment.