Skip to content
This repository has been archived by the owner on Nov 14, 2020. It is now read-only.

Flock Agent tells the server when twig settings change, or server is enabled/disabled #59

Merged
merged 13 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flock_agent/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def log(self, module, func, msg="", always=False):
with open(self.log_filename, "a") as f:
time_str = time.strftime("%Y-%m-%d %H:%M:%S")
f.write("{} {}\n".format(time_str, final_msg))
os.chmod(self.log_filename, 0o600)

def get_resource_path(self, filename):
# In dev mode, look for resources directory relative to python file
Expand Down
10 changes: 8 additions & 2 deletions flock_agent/daemon/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,18 @@ def ping(self):

def submit(self, data):
"""
Submit data to the Flock server.
data should a string, not an object.
Submit data to the Flock server
"""
self.c.log("FlockApiClient", "submit")
self._make_request("/submit", "post", True, data)

def submit_flock_logs(self, data):
"""
Submit flock logs to the Flock server
"""
self.c.log("FlockApiClient", "submit_flock_logs")
self._make_request("/submit_flock_logs", "post", True, data)

def _make_request(self, path, method, auth, data=None):
url = self._build_url(path)
if method == "get":
Expand Down
178 changes: 135 additions & 43 deletions flock_agent/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import grp
import asyncio
import json
import time
from urllib.parse import urlparse
from aiohttp import web
from aiohttp.abc import AbstractAccessLogger
from aiohttp.web_runner import GracefulExit

from .global_settings import GlobalSettings
from .osquery import Osquery
from .flock_logs import FlockLog, FlockLogTypes
from .api_client import (
FlockApiClient,
PermissionDenied,
Expand Down Expand Up @@ -47,15 +49,18 @@ def __init__(self, common):

self.api_client = FlockApiClient(self.c)

# Start with refreshing osqueryd
self.osquery.refresh_osqueryd()

# Prepare the unix socket path
# Flock Agent lib directory
if Platform.current() == Platform.MACOS:
lib_dir = "/usr/local/var/lib/flock-agent"
else:
lib_dir = "/var/lib/flock-agent"
os.makedirs(lib_dir, exist_ok=True)

# Flock Agent keeps its own submission queue separate from osqueryd, for when users
# enable/disable the server, or enable/disable twigs
self.flock_log = FlockLog(self.c, lib_dir)

# Prepare the unix socket path
self.unix_socket_path = os.path.join(lib_dir, "socket")
if os.path.exists(self.unix_socket_path):
os.remove(self.unix_socket_path)
Expand All @@ -77,6 +82,9 @@ def __init__(self, common):
# Unknown, so make the group root
self.gid = 0

# Start with refreshing osqueryd
self.osquery.refresh_osqueryd()

async def start(self):
await asyncio.gather(self.submit_loop(), self.http_server())

Expand All @@ -85,20 +93,34 @@ async def submit_loop(self):
if self.global_settings.get("use_server") and self.global_settings.get(
"gateway_token"
):
# Submit osquery logs
try:
self.osquery.submit_logs()
except Exception as e:
exception_type = type(e).__name__
self.c.log(
"Daemon",
"submit_loop",
"Exception submitting logs: {}".format(exception_type),
)
await self.submit_logs_osquery()
await self.submit_logs_flock()

# Wait a minute
await asyncio.sleep(60)

async def submit_logs_osquery(self):
# Submit osquery logs
try:
self.osquery.submit_logs()
except Exception as e:
exception_type = type(e).__name__
self.c.log(
"Daemon", "submit_loop", f"Exception submitting logs: {exception_type}"
)

async def submit_logs_flock(self):
# Submit Flock Agent logs
try:
self.flock_log.submit_logs()
except Exception as e:
exception_type = type(e).__name__
self.c.log(
"Daemon",
"submit_loop",
f"Exception submitting flock logs: {exception_type}",
)

async def http_server(self):
self.c.log("Daemon", "http_server", "Starting http server")

Expand All @@ -114,7 +136,7 @@ def log(self, request, response, time):
common_log(
"Daemon.http_server",
"AccessLogger",
"{} {} {}".format(request.method, request.path, response.status),
f"{request.method} {request.path} {response.status}",
)

# Routes
Expand All @@ -135,16 +157,29 @@ async def get_setting(request):
async def set_setting(request):
key = request.match_info.get("key", None)
val = await request.json()
self.global_settings.set(key, val)
self.global_settings.save()
return response_object()

async def get_twig(request):
twig_id = request.match_info.get("twig_id", None)
try:
return response_object(self.global_settings.get_twig(twig_id))
except:
return response_object(error="invalid twig_id")
# Only change the setting if it's actually changing
old_val = self.global_settings.get(key)
if old_val == val:
self.c.log(
"Daemon",
"http_server.set_settings",
f"skipping {key}={val}, because it's already set",
)
else:
self.c.log("Daemon", "http_server.set_settings", f"setting {key}={val}")
self.global_settings.set(key, val)
self.global_settings.save()

if key == "use_server":
micahflee marked this conversation as resolved.
Show resolved Hide resolved
if val:
self.flock_log.log(FlockLogTypes.SERVER_ENABLED)
else:
self.flock_log.log(FlockLogTypes.SERVER_DISABLED)
micahflee marked this conversation as resolved.
Show resolved Hide resolved
# Submit flock logs right away
await self.submit_logs_flock()

return response_object()

async def exec_twig(request):
twig_id = request.match_info.get("twig_id", None)
Expand All @@ -155,16 +190,25 @@ async def exec_twig(request):
except:
return response_object(error="invalid twig_id")

async def enable_twig(request):
twig_id = await request.json()
self.global_settings.enable_twig(twig_id)
self.global_settings.save()
return response_object()
async def enable_undecided_twigs(request):
micahflee marked this conversation as resolved.
Show resolved Hide resolved
# If the user choose to automatically opt-in to new twigs, this enables them all
enabled_twig_ids = []
twig_ids = self.global_settings.get_undecided_twig_ids()
for twig_id in twig_ids:
if not self.global_settings.is_twig_enabled(twig_id):
self.global_settings.enable_twig(twig_id)
enabled_twig_ids.append(twig_id)

if enabled_twig_ids:
self.c.log(
"Daemon",
"http_server.enable_undecided_twigs",
f"enabled twigs: {enabled_twig_ids}",
)
self.global_settings.save()
self.osquery.refresh_osqueryd()
self.flock_log.log(FlockLogTypes.TWIGS_ENABLED, enabled_twig_ids)

async def disable_twig(request):
twig_id = await request.json()
self.global_settings.disable_twig(twig_id)
self.global_settings.save()
return response_object()

async def get_decided_twig_ids(request):
Expand All @@ -176,6 +220,59 @@ async def get_undecided_twig_ids(request):
async def get_enabled_twig_ids(request):
return response_object(self.global_settings.get_enabled_twig_ids())

async def get_twig_enabled_statuses(request):
return response_object(self.global_settings.get_twig_enabled_statuses())

async def update_twig_status(request):
twig_status = await request.json()

# Validate twig_status
if type(twig_status) != dict:
return response_object(error="twig_status must be a dict")
for twig_id in twig_status:
if twig_id not in self.global_settings.settings["twigs"]:
return response_object(
error="twig_status contains invalid twig_ids"
)
if type(twig_status[twig_id]) != bool:
return response_object(error="twig_status is in an invalid format")

enabled_twig_ids = []
disabled_twig_ids = []

for twig_id in twig_status:
if twig_status[twig_id] and not self.global_settings.is_twig_enabled(
twig_id
):
self.global_settings.enable_twig(twig_id)
enabled_twig_ids.append(twig_id)
if not twig_status[twig_id] and self.global_settings.is_twig_enabled(
twig_id
):
self.global_settings.disable_twig(twig_id)
disabled_twig_ids.append(twig_id)

if enabled_twig_ids or disabled_twig_ids:
self.global_settings.save()
self.osquery.refresh_osqueryd()

if enabled_twig_ids:
self.c.log(
"Daemon",
"http_server.update_twig_status",
f"enabled twigs: {enabled_twig_ids}",
)
self.flock_log.log(FlockLogTypes.TWIGS_ENABLED, enabled_twig_ids)
if disabled_twig_ids:
self.c.log(
"Daemon",
"http_server.update_twig_status",
f"disabled twigs: {disabled_twig_ids}",
)
self.flock_log.log(FlockLogTypes.TWIGS_DISABLED, disabled_twig_ids)

return response_object()

async def exec_health(request):
health_item_name = request.match_info.get("health_item_name", None)
query = None
Expand All @@ -192,10 +289,6 @@ async def exec_health(request):
else:
return response_object(error="invalid health_item_name")

async def refresh_osqueryd(request):
self.osquery.refresh_osqueryd()
return response_object()

async def register_server(request):
data = await request.json()
try:
Expand Down Expand Up @@ -227,11 +320,11 @@ async def register_server(request):
except PermissionDenied:
return response_object(error="Permission denied")
except BadStatusCode as e:
return response_object(error="Bad status code: {}".format(e))
return response_object(error=f"Bad status code: {e}")
except ResponseIsNotJson:
return response_object(error="Server response is not JSON")
except RespondedWithError as e:
return response_object(error="Server error: {}".format(e))
return response_object(error=f"Server error: {e}")
except InvalidResponse:
return response_object(error="Server returned an invalid response")
except ConnectionError:
Expand All @@ -245,15 +338,14 @@ async def register_server(request):
app.router.add_post("/shutdown", shutdown)
app.router.add_get("/setting/{key}", get_setting)
app.router.add_post("/setting/{key}", set_setting)
app.router.add_get("/twig/{twig_id}", get_twig)
app.router.add_get("/exec_twig/{twig_id}", exec_twig)
app.router.add_post("/enable_twig", enable_twig)
app.router.add_post("/disable_twig", disable_twig)
app.router.add_post("/enable_undecided_twigs", enable_undecided_twigs)
app.router.add_get("/decided_twig_ids", get_decided_twig_ids)
app.router.add_get("/undecided_twig_ids", get_undecided_twig_ids)
app.router.add_get("/enabled_twig_ids", get_enabled_twig_ids)
app.router.add_get("/twig_enabled_statuses", get_twig_enabled_statuses)
app.router.add_post("/update_twig_status", update_twig_status)
app.router.add_get("/exec_health/{health_item_name}", exec_health)
app.router.add_get("/refresh_osqueryd", refresh_osqueryd)
app.router.add_post("/register_server", register_server)

loop = asyncio.get_event_loop()
Expand Down
Loading