Skip to content
This repository has been archived by the owner on Nov 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #59 from firstlookmedia/38_flock_logs
Browse files Browse the repository at this point in the history
Flock Agent tells the server when twig settings change, or server is enabled/disabled
  • Loading branch information
micahflee authored Oct 18, 2019
2 parents 2855b79 + c2de454 commit 8520fe8
Show file tree
Hide file tree
Showing 11 changed files with 336 additions and 120 deletions.
1 change: 1 addition & 0 deletions flock_agent/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def log(self, module, func, msg="", always=False):
with open(self.log_filename, "a") as f:
time_str = time.strftime("%Y-%m-%d %H:%M:%S")
f.write("{} {}\n".format(time_str, final_msg))
os.chmod(self.log_filename, 0o600)

def get_resource_path(self, filename):
# In dev mode, look for resources directory relative to python file
Expand Down
10 changes: 8 additions & 2 deletions flock_agent/daemon/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,18 @@ def ping(self):

def submit(self, data):
"""
Submit data to the Flock server.
data should a string, not an object.
Submit data to the Flock server
"""
self.c.log("FlockApiClient", "submit")
self._make_request("/submit", "post", True, data)

def submit_flock_logs(self, data):
"""
Submit flock logs to the Flock server
"""
self.c.log("FlockApiClient", "submit_flock_logs")
self._make_request("/submit_flock_logs", "post", True, data)

def _make_request(self, path, method, auth, data=None):
url = self._build_url(path)
if method == "get":
Expand Down
178 changes: 135 additions & 43 deletions flock_agent/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import grp
import asyncio
import json
import time
from urllib.parse import urlparse
from aiohttp import web
from aiohttp.abc import AbstractAccessLogger
from aiohttp.web_runner import GracefulExit

from .global_settings import GlobalSettings
from .osquery import Osquery
from .flock_logs import FlockLog, FlockLogTypes
from .api_client import (
FlockApiClient,
PermissionDenied,
Expand Down Expand Up @@ -47,15 +49,18 @@ def __init__(self, common):

self.api_client = FlockApiClient(self.c)

# Start with refreshing osqueryd
self.osquery.refresh_osqueryd()

# Prepare the unix socket path
# Flock Agent lib directory
if Platform.current() == Platform.MACOS:
lib_dir = "/usr/local/var/lib/flock-agent"
else:
lib_dir = "/var/lib/flock-agent"
os.makedirs(lib_dir, exist_ok=True)

# Flock Agent keeps its own submission queue separate from osqueryd, for when users
# enable/disable the server, or enable/disable twigs
self.flock_log = FlockLog(self.c, lib_dir)

# Prepare the unix socket path
self.unix_socket_path = os.path.join(lib_dir, "socket")
if os.path.exists(self.unix_socket_path):
os.remove(self.unix_socket_path)
Expand All @@ -77,6 +82,9 @@ def __init__(self, common):
# Unknown, so make the group root
self.gid = 0

# Start with refreshing osqueryd
self.osquery.refresh_osqueryd()

async def start(self):
await asyncio.gather(self.submit_loop(), self.http_server())

Expand All @@ -85,20 +93,34 @@ async def submit_loop(self):
if self.global_settings.get("use_server") and self.global_settings.get(
"gateway_token"
):
# Submit osquery logs
try:
self.osquery.submit_logs()
except Exception as e:
exception_type = type(e).__name__
self.c.log(
"Daemon",
"submit_loop",
"Exception submitting logs: {}".format(exception_type),
)
await self.submit_logs_osquery()
await self.submit_logs_flock()

# Wait a minute
await asyncio.sleep(60)

async def submit_logs_osquery(self):
# Submit osquery logs
try:
self.osquery.submit_logs()
except Exception as e:
exception_type = type(e).__name__
self.c.log(
"Daemon", "submit_loop", f"Exception submitting logs: {exception_type}"
)

async def submit_logs_flock(self):
# Submit Flock Agent logs
try:
self.flock_log.submit_logs()
except Exception as e:
exception_type = type(e).__name__
self.c.log(
"Daemon",
"submit_loop",
f"Exception submitting flock logs: {exception_type}",
)

async def http_server(self):
self.c.log("Daemon", "http_server", "Starting http server")

Expand All @@ -114,7 +136,7 @@ def log(self, request, response, time):
common_log(
"Daemon.http_server",
"AccessLogger",
"{} {} {}".format(request.method, request.path, response.status),
f"{request.method} {request.path} {response.status}",
)

# Routes
Expand All @@ -135,16 +157,29 @@ async def get_setting(request):
async def set_setting(request):
key = request.match_info.get("key", None)
val = await request.json()
self.global_settings.set(key, val)
self.global_settings.save()
return response_object()

async def get_twig(request):
twig_id = request.match_info.get("twig_id", None)
try:
return response_object(self.global_settings.get_twig(twig_id))
except:
return response_object(error="invalid twig_id")
# Only change the setting if it's actually changing
old_val = self.global_settings.get(key)
if old_val == val:
self.c.log(
"Daemon",
"http_server.set_settings",
f"skipping {key}={val}, because it's already set",
)
else:
self.c.log("Daemon", "http_server.set_settings", f"setting {key}={val}")
self.global_settings.set(key, val)
self.global_settings.save()

if key == "use_server":
if val:
self.flock_log.log(FlockLogTypes.SERVER_ENABLED)
else:
self.flock_log.log(FlockLogTypes.SERVER_DISABLED)
# Submit flock logs right away
await self.submit_logs_flock()

return response_object()

async def exec_twig(request):
twig_id = request.match_info.get("twig_id", None)
Expand All @@ -155,16 +190,25 @@ async def exec_twig(request):
except:
return response_object(error="invalid twig_id")

async def enable_twig(request):
twig_id = await request.json()
self.global_settings.enable_twig(twig_id)
self.global_settings.save()
return response_object()
async def enable_undecided_twigs(request):
# If the user choose to automatically opt-in to new twigs, this enables them all
enabled_twig_ids = []
twig_ids = self.global_settings.get_undecided_twig_ids()
for twig_id in twig_ids:
if not self.global_settings.is_twig_enabled(twig_id):
self.global_settings.enable_twig(twig_id)
enabled_twig_ids.append(twig_id)

if enabled_twig_ids:
self.c.log(
"Daemon",
"http_server.enable_undecided_twigs",
f"enabled twigs: {enabled_twig_ids}",
)
self.global_settings.save()
self.osquery.refresh_osqueryd()
self.flock_log.log(FlockLogTypes.TWIGS_ENABLED, enabled_twig_ids)

async def disable_twig(request):
twig_id = await request.json()
self.global_settings.disable_twig(twig_id)
self.global_settings.save()
return response_object()

async def get_decided_twig_ids(request):
Expand All @@ -176,6 +220,59 @@ async def get_undecided_twig_ids(request):
async def get_enabled_twig_ids(request):
return response_object(self.global_settings.get_enabled_twig_ids())

async def get_twig_enabled_statuses(request):
return response_object(self.global_settings.get_twig_enabled_statuses())

async def update_twig_status(request):
twig_status = await request.json()

# Validate twig_status
if type(twig_status) != dict:
return response_object(error="twig_status must be a dict")
for twig_id in twig_status:
if twig_id not in self.global_settings.settings["twigs"]:
return response_object(
error="twig_status contains invalid twig_ids"
)
if type(twig_status[twig_id]) != bool:
return response_object(error="twig_status is in an invalid format")

enabled_twig_ids = []
disabled_twig_ids = []

for twig_id in twig_status:
if twig_status[twig_id] and not self.global_settings.is_twig_enabled(
twig_id
):
self.global_settings.enable_twig(twig_id)
enabled_twig_ids.append(twig_id)
if not twig_status[twig_id] and self.global_settings.is_twig_enabled(
twig_id
):
self.global_settings.disable_twig(twig_id)
disabled_twig_ids.append(twig_id)

if enabled_twig_ids or disabled_twig_ids:
self.global_settings.save()
self.osquery.refresh_osqueryd()

if enabled_twig_ids:
self.c.log(
"Daemon",
"http_server.update_twig_status",
f"enabled twigs: {enabled_twig_ids}",
)
self.flock_log.log(FlockLogTypes.TWIGS_ENABLED, enabled_twig_ids)
if disabled_twig_ids:
self.c.log(
"Daemon",
"http_server.update_twig_status",
f"disabled twigs: {disabled_twig_ids}",
)
self.flock_log.log(FlockLogTypes.TWIGS_DISABLED, disabled_twig_ids)

return response_object()

async def exec_health(request):
health_item_name = request.match_info.get("health_item_name", None)
query = None
Expand All @@ -192,10 +289,6 @@ async def exec_health(request):
else:
return response_object(error="invalid health_item_name")

async def refresh_osqueryd(request):
self.osquery.refresh_osqueryd()
return response_object()

async def register_server(request):
data = await request.json()
try:
Expand Down Expand Up @@ -227,11 +320,11 @@ async def register_server(request):
except PermissionDenied:
return response_object(error="Permission denied")
except BadStatusCode as e:
return response_object(error="Bad status code: {}".format(e))
return response_object(error=f"Bad status code: {e}")
except ResponseIsNotJson:
return response_object(error="Server response is not JSON")
except RespondedWithError as e:
return response_object(error="Server error: {}".format(e))
return response_object(error=f"Server error: {e}")
except InvalidResponse:
return response_object(error="Server returned an invalid response")
except ConnectionError:
Expand All @@ -245,15 +338,14 @@ async def register_server(request):
app.router.add_post("/shutdown", shutdown)
app.router.add_get("/setting/{key}", get_setting)
app.router.add_post("/setting/{key}", set_setting)
app.router.add_get("/twig/{twig_id}", get_twig)
app.router.add_get("/exec_twig/{twig_id}", exec_twig)
app.router.add_post("/enable_twig", enable_twig)
app.router.add_post("/disable_twig", disable_twig)
app.router.add_post("/enable_undecided_twigs", enable_undecided_twigs)
app.router.add_get("/decided_twig_ids", get_decided_twig_ids)
app.router.add_get("/undecided_twig_ids", get_undecided_twig_ids)
app.router.add_get("/enabled_twig_ids", get_enabled_twig_ids)
app.router.add_get("/twig_enabled_statuses", get_twig_enabled_statuses)
app.router.add_post("/update_twig_status", update_twig_status)
app.router.add_get("/exec_health/{health_item_name}", exec_health)
app.router.add_get("/refresh_osqueryd", refresh_osqueryd)
app.router.add_post("/register_server", register_server)

loop = asyncio.get_event_loop()
Expand Down
Loading

0 comments on commit 8520fe8

Please sign in to comment.