Skip to content

Commit

Permalink
Add multiqueue capabilities
Browse files — browse the repository at this point in the history
  • Loading branch information
gabriel-piles committed Sep 19, 2024
1 parent 4881c3f commit 6437dfe
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 150 deletions.
18 changes: 10 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
FROM python:3.9-slim-bullseye AS base

ENV VIRTUAL_ENV=/opt/venv
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

RUN apt-get update
RUN apt-get install -y git
RUN apt-get install -y ocrmypdf
RUN apt-get install -y tesseract-ocr-fra
RUN apt-get install -y tesseract-ocr-spa
Expand All @@ -19,6 +12,15 @@ RUN apt-get install -y tesseract-ocr-hin
RUN apt-get install -y tesseract-ocr-tam
RUN apt-get install -y tesseract-ocr-tha
RUN apt-get install -y tesseract-ocr-chi-sim

ENV VIRTUAL_ENV=/opt/venv
RUN python -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt

# Add more languages as needed

RUN mkdir /app
Expand Down
9 changes: 5 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
git+https://github.com/huridocs/queue-processor@bab1f4419b0768df518d06795afd5df2ba0e331c

aiofiles==0.6.0
fastapi~=0.67.0
graypy~=2.1.0
gunicorn==20.1.0
pydantic~=1.8.2
PyRSMQ==0.4.5
python-multipart==0.0.5
PyYAML~=5.4.1
redis~=3.5.3
requests~=2.26.0
uvicorn==0.20.0

uvicorn==0.20.0
flake8==6.0.0
pdfplumber==0.7.6
black==23.1.0
sentry-sdk==1.15.0

PyYAML==6.0.2
120 changes: 40 additions & 80 deletions src/QueueProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,106 +3,61 @@

import redis
from pydantic import ValidationError
from queue_processor.QueueProcessor import QueueProcessor
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ
from sentry_sdk.integrations.redis import RedisIntegration
import sentry_sdk

from ServiceConfig import ServiceConfig
from ServiceConfig import ServiceConfig, QUEUES_NAMES
from ExtractionMessage import ExtractionMessage

from Task import Task
from ocr_pdf import ocr_pdf


class QueueProcessor:
def __init__(self):
self.config = ServiceConfig()
self.logger = self.config.get_logger("redis_tasks")

self.results_queue = RedisSMQ(
host=self.config.redis_host,
port=self.config.redis_port,
qname=self.config.results_queue_name,
)

def process(self, id, message, rc, ts):
try:
task = Task(**message)
except ValidationError:
self.logger.error(f"Not a valid message: {message}")
return True

self.logger.info(f"Valid message: {message}")

try:
processed_pdf_filepath = ocr_pdf(
task.params.filename, task.tenant, task.params.language
)
def process(message):
service_config = ServiceConfig()
logger = service_config.get_logger("redis_tasks")
try:
task = Task(**message)
except ValidationError:
logger.error(f"Not a valid message: {message}")
return None

if not processed_pdf_filepath:
extraction_message = ExtractionMessage(
tenant=task.tenant,
task=task.task,
params=task.params,
success=False,
error_message="Error during pdf ocr",
)
logger.info(f"Valid message: {message}")

self.results_queue.sendMessage().message(
extraction_message.dict()
).execute()
self.logger.error(extraction_message.json())
return True
try:
processed_pdf_filepath = ocr_pdf(
task.params.filename, task.tenant, task.params.language
)

processed_pdf_url = f"{self.config.service_url}/processed_pdf/{task.tenant}/{task.params.filename}"
if not processed_pdf_filepath:
extraction_message = ExtractionMessage(
tenant=task.tenant,
task=task.task,
params=task.params,
success=True,
file_url=processed_pdf_url,
success=False,
error_message="Error during pdf ocr",
)

self.logger.info(extraction_message.json())
self.results_queue.sendMessage(delay=3).message(
extraction_message.dict()
).execute()
return True
except Exception:
self.logger.error("error", exc_info=1)
return True

def subscribe_to_extractions_tasks_queue(self):
while True:
try:
self.results_queue.createQueue().vt(120).exceptions(False).execute()
extractions_tasks_queue = RedisSMQ(
host=self.config.redis_host,
port=self.config.redis_port,
qname=self.config.tasks_queue_name,
)
logger.error(extraction_message.json())
return extraction_message.dict()

extractions_tasks_queue.createQueue().vt(120).exceptions(
False
).execute()

self.logger.info(
f"Connecting to redis: {self.config.redis_host}:{self.config.redis_port}"
)
processed_pdf_url = f"{service_config.service_url}/processed_pdf/{task.tenant}/{task.params.filename}"
extraction_message = ExtractionMessage(
tenant=task.tenant,
task=task.task,
params=task.params,
success=True,
file_url=processed_pdf_url,
)

redis_smq_consumer = RedisSMQConsumer(
qname=self.config.tasks_queue_name,
processor=self.process,
host=self.config.redis_host,
port=self.config.redis_port,
)
redis_smq_consumer.run()
except redis.exceptions.ConnectionError:
self.logger.error(
f"Error connecting to redis: {self.config.redis_host}:{self.config.redis_port}"
)
sleep(20)
logger.info(extraction_message.json())
return extraction_message.dict()
except Exception:
logger.error("error", exc_info=1)
return None


if __name__ == "__main__":
Expand All @@ -116,5 +71,10 @@ def subscribe_to_extractions_tasks_queue(self):
except Exception:
pass

redis_tasks_processor = QueueProcessor()
redis_tasks_processor.subscribe_to_extractions_tasks_queue()
service_config = ServiceConfig()
logger = service_config.get_logger("redis_tasks")
queues_names = QUEUES_NAMES.split(" ")
queue_processor = QueueProcessor(
service_config.redis_host, service_config.redis_port, queues_names, logger
)
queue_processor.start(process)
60 changes: 2 additions & 58 deletions src/ServiceConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@


OPTIONS = ["redis_host", "redis_port", "service_host", "service_port"]
SERVICE_NAME = "ocr"


APP_PATH = Path(__file__).parent.absolute()
Expand All @@ -17,11 +16,11 @@
PDF_PROCESSED_PATH = f"{DATA_PATH}/processed_pdfs"
PDF_FAILED = f"{DATA_PATH}/failed_pdfs"

QUEUES_NAMES = os.environ.get("QUEUES_NAMES", "ocr")


class ServiceConfig:
def __init__(self):
self.tasks_queue_name = SERVICE_NAME + "_tasks"
self.results_queue_name = SERVICE_NAME + "_results"
self.paths: Dict[str, str] = dict(
{
"app": APP_PATH,
Expand Down Expand Up @@ -82,58 +81,3 @@ def get_logger(self, logger_name):
)
logger.addHandler(handler)
return logger

# Collect configuration values and write them to "config.yml" as "key: value" lines.
# config_dict maps an option name to its new value; a falsy value (e.g. "") means no new
# value was supplied for that option.
# NOTE(review): leading indentation is missing in this view, so the nesting described in
# the comments below is presumed from statement order — confirm against the real file.
def write_configuration(self, config_dict: Dict[str, str]):
config_to_write = dict()
for config_key, config_value in config_dict.items():
# Falsy value and no entry for the key in self.config_from_yml: skip the key.
if not config_value and config_key not in self.config_from_yml:
continue

# Falsy value but self.config_from_yml has the key: reuse that existing entry.
if not config_value and config_key in self.config_from_yml:
config_to_write[config_key] = self.config_from_yml[config_key]
continue

# Otherwise keep the supplied value.
config_to_write[config_key] = config_value

# Carry "graylog_ip" over from self.config_from_yml when it is present there.
if "graylog_ip" in self.config_from_yml:
config_to_write["graylog_ip"] = self.config_from_yml["graylog_ip"]

# Nothing was collected: return without opening the file.
if len(config_to_write) == 0:
return

# "config.yml" is a relative path opened in "w" mode, so an existing file at that
# path is overwritten; one "key: value" line is written per collected entry.
with open("config.yml", "w") as config_file:
config_file.write(
"\n".join([f"{k}: {v}" for k, v in config_to_write.items()])
)
)

# Interactive prompt: prints the redis/service host and port held on this instance, asks
# whether to change them, then reads one answer per entry in OPTIONS and passes the
# answers to write_configuration.
# NOTE(review): leading indentation is missing in this view, so the nesting described in
# the comments below is presumed from statement order — confirm against the real file.
def create_configuration(self):
config_dict = dict()

# Seed the dict from the instance attributes.
config_dict["redis_host"] = self.redis_host
config_dict["redis_port"] = self.redis_port
config_dict["service_host"] = self.service_host
config_dict["service_port"] = self.service_port

print(":::::::::: Actual configuration :::::::::::\n")
for config_key in config_dict:
print(f"{config_key}: {config_dict[config_key]}")

user_input = None

# Keep asking until the answer is one of the accepted strings; the empty string
# (just pressing Enter) is accepted too.
while user_input not in ("yes", "n", "y", "no", "N", "Y", ""):
user_input = input("\nDo you want to change the configuration? [Y/n]\n")

# A non-empty answer whose first character is "n"/"N" returns without changing anything.
if user_input != "" and user_input[0].lower() == "n":
return

print("[Enter to DO NOT modify it]")
for option in OPTIONS:
# The raw answer replaces the current value; pressing Enter stores "", which
# write_configuration handles as a falsy (not supplied) value.
configuration_input = input(f"{option}: [{config_dict[option]}] ")
config_dict[option] = configuration_input

self.write_configuration(config_dict)


# Script entry point: when the module is run directly, build a ServiceConfig and start
# the interactive configuration prompt.
if __name__ == "__main__":
config = ServiceConfig()
config.create_configuration()

0 comments on commit 6437dfe

Please sign in to comment.