Skip to content

Commit

Permalink
Refactor rest of state keeping into the Lock redis cacher
Browse files Browse the repository at this point in the history
  • Loading branch information
Jinna Kiisuo committed Sep 3, 2023
1 parent e8c79dd commit 06f88e6
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 64 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
This is highly WIP. The design intent is to have a tiny service running on the node with borgbackup repos to handle locking them. This allows dependent upstream services, such as offsite sync jobs, to perform their work without having write access to the repositories.

## Known issues to get this out of early-WIP-devstate
- While creating, querying and releasing a lock works even with multiworker, not so much for a queued second lock.
- Stale redis state is left behind if locks are released by process control.
- The API response language is super unstable and subject to change.
- Metrics exporting needs to be added to aid in monitoring stale locks.
- Process control in unconfirmed with docker + multiworker

## Runtime requirements
- A Redis service, the address provided with the `--redis_host` flag.
- A Bearer Token, provided with the `--token` flag. Providing a matching token is required for all API calls.
Expand Down
1 change: 1 addition & 0 deletions borg_lockservice/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PREFIX = "BORG_LOCKSERVICE"
67 changes: 67 additions & 0 deletions borg_lockservice/lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import Optional

from aiocache import Cache
import psutil
import signal
import os

import borg_lockservice as service


class Lock:
__repo: str
cache: Cache

@classmethod
async def create(
cls, repo: str, pid: int, redis_host: str, redis_port: int
) -> "Lock":
if not psutil.pid_exists(pid):
raise ValueError(f"Not a valid pid: {pid}")

self = Lock()
self.__repo = repo
self.cache = Cache(
Cache.REDIS,
endpoint=redis_host,
port=redis_port,
namespace=f"{service.PREFIX}:{repo}",
)
await self.cache.set("pid", pid)
return self

@classmethod
async def find(
cls, repo: str, redis_host: str, redis_port: int, pid: Optional[int] = None
) -> Optional["Lock"]:
if pid and not psutil.pid_exists(pid):
return None

self = Lock()
self.cache = Cache(
Cache.REDIS,
endpoint=redis_host,
port=redis_port,
namespace=f"{service.PREFIX}:{repo}",
)

value = await self.cache.get("pid")
if (pid and value and pid == value) or (not pid and value):
self.__repo = repo
return self
return None

@property
def repo(self) -> str:
return self.__repo

@property
async def pid(self) -> int:
return await self.cache.get("pid")

async def kill(self) -> None:
pid = await self.pid
os.kill(pid, signal.SIGTERM)

async def terminate(self) -> None:
await self.cache.delete("pid")
90 changes: 26 additions & 64 deletions borg_lockservice/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,56 @@
from fastapi.security.http import HTTPAuthorizationCredentials
from fastapi.logger import logger
from contextlib import asynccontextmanager
from aiocache import Cache
from pathlib import Path

import subprocess
import signal
import psutil
import socket
import tempfile

import borg_lockservice as service
from .lock import Lock

FLAGS = flags.FLAGS
PREFIX = "BORG_LOCKSERVICE"
auth = HTTPBearer()

flags.DEFINE_string(
"token",
os.getenv(f"{PREFIX}_TOKEN", None),
os.getenv(f"{service.PREFIX}_TOKEN", None),
"Bearer token required to access the API.",
)

flags.DEFINE_string(
"repodir",
os.getenv(f"{PREFIX}_REPODIR", None),
os.getenv(f"{service.PREFIX}_REPODIR", None),
"Directory containing repos.",
)

flags.DEFINE_string(
"host",
os.getenv(f"{PREFIX}_HOST", "0.0.0.0"),
os.getenv(f"{service.PREFIX}_HOST", "0.0.0.0"),
"Listen address for the host.",
)

flags.DEFINE_integer(
"port",
os.getenv(f"{PREFIX}_PORT", 8000),
os.getenv(f"{service.PREFIX}_PORT", 8000),
"Listen port for the service. Defaults to 8000",
)

flags.DEFINE_boolean(
"dev",
os.getenv(f"{PREFIX}_DEV", False),
os.getenv(f"{service.PREFIX}_DEV", False),
"Enable development mode. Defaults to False, should not be enabled in production.",
)

flags.DEFINE_string(
"redis_host",
os.getenv(f"{PREFIX}_REDIS_HOST", None),
os.getenv(f"{service.PREFIX}_REDIS_HOST", None),
"Host portion of a redis server used for keeping state.",
)
flags.DEFINE_integer(
"redis_port",
os.getenv(f"{PREFIX}_REDIS_PORT", 6379),
os.getenv(f"{service.PREFIX}_REDIS_PORT", 6379),
"Port of the redis server.",
)

Expand All @@ -72,61 +70,25 @@
flags.mark_flag_as_required("redis_host")


# The lifespan takes care of static state that is not modified by workers
# Any state writes by workers will cause desync! Don't do it!
@asynccontextmanager
async def lifespan(app: FastAPI):
# Manually init flags so they're available to workers
FLAGS(sys.argv)
app.state.repos = get_available_repos(FLAGS.repodir)
app.state.locks = {}
logger.setLevel(logging.DEBUG if FLAGS.dev else logging.INFO)
app.state.log = logging.getLogger("uvicorn.error")
yield


# FLAGS(sys.argv)
app = FastAPI(lifespan=lifespan)


class Lock:
__repo: str
cache: Cache

@classmethod
async def create(cls, repo: str, pid: int) -> "Lock":
if not psutil.pid_exists(pid):
raise ValueError(f"Not a valid pid: {pid}")

self = Lock()
self.__repo = repo
self.cache = Cache(
Cache.REDIS,
endpoint=FLAGS.redis_host,
port=FLAGS.redis_port,
namespace=f"{PREFIX}:{repo}",
)
await self.cache.set("pid", pid)
return self

@property
def repo(self) -> str:
return self.__repo

@property
async def pid(self) -> int:
return await self.cache.get("pid")

async def kill(self) -> None:
pid = await self.pid
os.kill(pid, signal.SIGTERM)

async def terminate(self) -> None:
await self.cache.delete("pid")


@app.get("/")
async def root():
return {
"message": PREFIX,
"state": "OK",
}


Expand All @@ -147,7 +109,7 @@ async def lock(
with tempfile.TemporaryDirectory() as socket_dir:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(timeout_seconds)
sock_path = socket_dir / Path(f"{PREFIX}_envoy.sock")
sock_path = socket_dir / Path(f"{service.PREFIX}_envoy.sock")
start_envoy(repo_path, sock_path, timeout_seconds)
app.state.log.info(f"Started envoy, waiting on {sock_path}")
sock.bind(bytes(sock_path))
Expand All @@ -168,13 +130,12 @@ async def lock(
break
else:
app.state.log.info("Envoy confirmed lock")
lock = await Lock.create(repo, pid)
app.state.locks[repo] = lock
await Lock.create(repo, pid, FLAGS.redis_host, FLAGS.redis_port)
break

finally:
connection.close()
return {"message": f"Locked", "pid": pid}
return {"state": "locked", "pid": pid}


@app.get("/unlock/{repo}")
Expand All @@ -185,20 +146,19 @@ async def unlock(
):
if token.credentials != FLAGS.token:
raise HTTPException(status_code=403)
if repo not in app.state.locks:
lock = await Lock.find(
repo, pid=pid, redis_host=FLAGS.redis_host, redis_port=FLAGS.redis_port
)
if not lock:
raise HTTPException(status_code=404)
lock = app.state.locks[repo]
if pid != await lock.pid:
raise HTTPException(status_code=403)

try:
await lock.kill() # Kill the envoy releasing the lock
except OSError as e:
raise ValueError(f"Unable to kill envoy: {e}")
finally:
await lock.terminate() # Clear lock state from cache
del app.state.locks[repo]
return {"message": "Unlocked"}
return {"state": "unlocked"}


@app.get("/status/{repo}")
Expand All @@ -210,11 +170,13 @@ async def status(
raise HTTPException(status_code=403)
if not get_repo_path(repo):
raise HTTPException(status_code=404)
if repo not in app.state.locks:
return {"message": "Unknown"}
lock = await Lock.find(
repo, redis_host=FLAGS.redis_host, redis_port=FLAGS.redis_port
)
if not lock:
return {"state": "unknown"}

pid = await app.state.locks[repo].pid
return {"message": "Locked", "pid": pid}
return {"state": "locked", "pid": await lock.pid}


@app.get("/list")
Expand Down

0 comments on commit 06f88e6

Please sign in to comment.