Skip to content
This repository has been archived by the owner on Nov 14, 2020. It is now read-only.

Flock Agent tells the server when twig settings change, or server is enabled/disabled #59

Merged
merged 13 commits into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flock_agent/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def log(self, module, func, msg="", always=False):
with open(self.log_filename, "a") as f:
time_str = time.strftime("%Y-%m-%d %H:%M:%S")
f.write("{} {}\n".format(time_str, final_msg))
os.chmod(self.log_filename, 0o600)

def get_resource_path(self, filename):
# In dev mode, look for resources directory relative to python file
Expand Down
10 changes: 8 additions & 2 deletions flock_agent/daemon/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,18 @@ def ping(self):

def submit(self, data):
"""
Submit data to the Flock server.
data should a string, not an object.
Submit data to the Flock server
"""
self.c.log("FlockApiClient", "submit")
self._make_request("/submit", "post", True, data)

def submit_flock_logs(self, data):
"""
Submit flock logs to the Flock server
"""
self.c.log("FlockApiClient", "submit_flock_logs")
self._make_request("/submit_flock_logs", "post", True, data)

def _make_request(self, path, method, auth, data=None):
url = self._build_url(path)
if method == "get":
Expand Down
142 changes: 112 additions & 30 deletions flock_agent/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import grp
import asyncio
import json
import time
from urllib.parse import urlparse
from aiohttp import web
from aiohttp.abc import AbstractAccessLogger
from aiohttp.web_runner import GracefulExit

from .global_settings import GlobalSettings
from .osquery import Osquery
from .flock_logs import FlockLog, FlockLogTypes
from .api_client import (
FlockApiClient,
PermissionDenied,
Expand Down Expand Up @@ -47,15 +49,18 @@ def __init__(self, common):

self.api_client = FlockApiClient(self.c)

# Start with refreshing osqueryd
self.osquery.refresh_osqueryd()

# Prepare the unix socket path
# Flock Agent lib directory
if Platform.current() == Platform.MACOS:
lib_dir = "/usr/local/var/lib/flock-agent"
else:
lib_dir = "/var/lib/flock-agent"
os.makedirs(lib_dir, exist_ok=True)

# Flock Agent keeps its own submission queue separate from osqueryd, for when users
# enable/disable the server, or enable/disable twigs
self.flock_log = FlockLog(self.c, lib_dir)

# Prepare the unix socket path
self.unix_socket_path = os.path.join(lib_dir, "socket")
if os.path.exists(self.unix_socket_path):
os.remove(self.unix_socket_path)
Expand All @@ -77,6 +82,9 @@ def __init__(self, common):
# Unknown, so make the group root
self.gid = 0

# Start with refreshing osqueryd
self.osquery.refresh_osqueryd()

async def start(self):
await asyncio.gather(self.submit_loop(), self.http_server())

Expand All @@ -96,6 +104,9 @@ async def submit_loop(self):
"Exception submitting logs: {}".format(exception_type),
)

# Submit Flock Agent logs
self.flock_log.submit_logs()
micahflee marked this conversation as resolved.
Show resolved Hide resolved

# Wait a minute
await asyncio.sleep(60)

Expand Down Expand Up @@ -135,16 +146,31 @@ async def get_setting(request):
async def set_setting(request):
key = request.match_info.get("key", None)
val = await request.json()
self.global_settings.set(key, val)
self.global_settings.save()
return response_object()

async def get_twig(request):
twig_id = request.match_info.get("twig_id", None)
try:
return response_object(self.global_settings.get_twig(twig_id))
except:
return response_object(error="invalid twig_id")
# Only change the setting if it's actually changing
old_val = self.global_settings.get(key)
if old_val == val:
self.c.log(
"Daemon",
"http_server.set_settings",
"skipping {}={}, because it's already set".format(key, val),
)
else:
self.c.log(
"Daemon",
"http_server.set_settings",
"setting {}={}".format(key, val),
)
self.global_settings.set(key, val)
self.global_settings.save()

if key == "use_server":
micahflee marked this conversation as resolved.
Show resolved Hide resolved
if val:
self.flock_log.log(FlockLogTypes.SERVER_ENABLED)
else:
self.flock_log.log(FlockLogTypes.SERVER_DISABLED)
micahflee marked this conversation as resolved.
Show resolved Hide resolved

return response_object()

async def exec_twig(request):
twig_id = request.match_info.get("twig_id", None)
Expand All @@ -155,16 +181,24 @@ async def exec_twig(request):
except:
return response_object(error="invalid twig_id")

async def enable_twig(request):
twig_id = await request.json()
self.global_settings.enable_twig(twig_id)
self.global_settings.save()
return response_object()
async def enable_undecided_twigs(request):
micahflee marked this conversation as resolved.
Show resolved Hide resolved
enabled_twig_ids = []
twig_ids = self.global_settings.get_undecided_twig_ids()
for twig_id in twig_ids:
if not self.global_settings.is_twig_enabled(twig_id):
self.global_settings.enable_twig(twig_id)
enabled_twig_ids.append(twig_id)

if len(enabled_twig_ids) > 0:
self.c.log(
"Daemon",
"http_server.enable_undecided_twigs",
"enabled twigs: {}".format(enabled_twig_ids),
)
self.global_settings.save()
self.osquery.refresh_osqueryd()
self.flock_log.log(FlockLogTypes.TWIGS_ENABLED, enabled_twig_ids)

async def disable_twig(request):
twig_id = await request.json()
self.global_settings.disable_twig(twig_id)
self.global_settings.save()
return response_object()

async def get_decided_twig_ids(request):
Expand All @@ -176,6 +210,59 @@ async def get_undecided_twig_ids(request):
async def get_enabled_twig_ids(request):
return response_object(self.global_settings.get_enabled_twig_ids())

async def get_twig_enabled_statuses(request):
return response_object(self.global_settings.get_twig_enabled_statuses())

async def update_twig_status(request):
twig_status = await request.json()

# Validate twig_status
if type(twig_status) != dict:
return response_object(error="twig_status must be a dict")
for twig_id in twig_status:
if twig_id not in self.global_settings.settings["twigs"]:
return response_object(
error="twig_status contains invalid twig_ids"
)
if type(twig_status[twig_id]) != bool:
return response_object(error="twig_status is in an invalid format")

enabled_twig_ids = []
disabled_twig_ids = []

for twig_id in twig_status:
if twig_status[twig_id] and not self.global_settings.is_twig_enabled(
twig_id
):
self.global_settings.enable_twig(twig_id)
enabled_twig_ids.append(twig_id)
if not twig_status[twig_id] and self.global_settings.is_twig_enabled(
twig_id
):
self.global_settings.disable_twig(twig_id)
disabled_twig_ids.append(twig_id)

if len(enabled_twig_ids) > 0 or len(disabled_twig_ids) > 0:
micahflee marked this conversation as resolved.
Show resolved Hide resolved
self.global_settings.save()
self.osquery.refresh_osqueryd()

if len(enabled_twig_ids) > 0:
self.c.log(
"Daemon",
"http_server.update_twig_status",
"enabled twigs: {}".format(enabled_twig_ids),
)
self.flock_log.log(FlockLogTypes.TWIGS_ENABLED, enabled_twig_ids)
if len(disabled_twig_ids) > 0:
self.c.log(
"Daemon",
"http_server.update_twig_status",
"disabled twigs: {}".format(disabled_twig_ids),
)
self.flock_log.log(FlockLogTypes.TWIGS_DISABLED, disabled_twig_ids)

return response_object()

async def exec_health(request):
health_item_name = request.match_info.get("health_item_name", None)
query = None
Expand All @@ -192,10 +279,6 @@ async def exec_health(request):
else:
return response_object(error="invalid health_item_name")

async def refresh_osqueryd(request):
self.osquery.refresh_osqueryd()
return response_object()

async def register_server(request):
data = await request.json()
try:
Expand Down Expand Up @@ -245,15 +328,14 @@ async def register_server(request):
app.router.add_post("/shutdown", shutdown)
app.router.add_get("/setting/{key}", get_setting)
app.router.add_post("/setting/{key}", set_setting)
app.router.add_get("/twig/{twig_id}", get_twig)
app.router.add_get("/exec_twig/{twig_id}", exec_twig)
app.router.add_post("/enable_twig", enable_twig)
app.router.add_post("/disable_twig", disable_twig)
app.router.add_post("/enable_undecided_twigs", enable_undecided_twigs)
app.router.add_get("/decided_twig_ids", get_decided_twig_ids)
app.router.add_get("/undecided_twig_ids", get_undecided_twig_ids)
app.router.add_get("/enabled_twig_ids", get_enabled_twig_ids)
app.router.add_get("/twig_enabled_statuses", get_twig_enabled_statuses)
app.router.add_post("/update_twig_status", update_twig_status)
app.router.add_get("/exec_health/{health_item_name}", exec_health)
app.router.add_get("/refresh_osqueryd", refresh_osqueryd)
app.router.add_post("/register_server", register_server)

loop = asyncio.get_event_loop()
Expand Down
140 changes: 140 additions & 0 deletions flock_agent/daemon/flock_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import os
import time
import json

from .api_client import FlockApiClient


class FlockLog:
def __init__(self, common, lib_dir):
self.c = common
self.filename = os.path.join(lib_dir, "flock.log")
self.c.log("FlockLog", "__init__", self.filename)

# If the log file doesn't exist, create an empty one
if not os.path.exists(self.filename):
open(self.filename, "a").close()
os.chmod(self.filename, 0o600)

def log(self, flock_log_type, twig_ids=None):
with open(self.filename, "a") as f:
micahflee marked this conversation as resolved.
Show resolved Hide resolved
f.write(
json.dumps(
{
"type": flock_log_type,
"twig_ids": twig_ids,
"timestamp": int(time.time()),
}
)
+ "\n"
)
os.chmod(self.filename, 0o600)

def submit_logs(self):
# Keep track of the biggest timestamp we see
biggest_timestamp = self.c.global_settings.get("last_flock_log_timestamp")

# What's the results file's modified timestamp, before we start the import
try:
mtime = os.path.getmtime(self.filename)

# Load the log file
with open(self.filename, "r") as f:
lines = f.readlines()
if len(lines) > 0:
micahflee marked this conversation as resolved.
Show resolved Hide resolved
self.c.log("FlockLog", "submit_logs", "{} lines".format(len(lines)))

# Start an API client
api_client = FlockApiClient(self.c)
try:
api_client.ping()
except:
self.c.log(
"FlockLog",
"submit_logs",
"API is not configured properly",
always=True,
)
return
micahflee marked this conversation as resolved.
Show resolved Hide resolved

# Make a list of logs
logs = []
for line in lines:
line = line.strip()
try:
obj = json.loads(line)
if "timestamp" in obj:
micahflee marked this conversation as resolved.
Show resolved Hide resolved
if "type" not in obj:
obj["type"] = "unknown"

# If we haven't submitted this yet
if obj["timestamp"] > self.c.global_settings.get(
micahflee marked this conversation as resolved.
Show resolved Hide resolved
"last_flock_log_timestamp"
):
logs.append(obj)
else:
# Already submitted
self.c.log(
"FlockLog",
"submit_logs",
'skipping "{}" result, already submitted'.format(
micahflee marked this conversation as resolved.
Show resolved Hide resolved
obj["type"]
),
)
else:
self.c.log(
"FlockLog",
"submit_logs",
"warning: timestamp not in line: {}".format(line),
)

except json.decoder.JSONDecodeError:
micahflee marked this conversation as resolved.
Show resolved Hide resolved
self.c.log(
"FlockLog",
"submit_logs",
"warning: line is not valid JSON: {}".format(line),
)

# Submit them
api_client.submit_flock_logs(json.dumps(logs))
self.c.log(
"FlockLog",
"submit_logs",
"submitted logs: {}".format(
", ".join([obj["type"] for obj in logs])
),
)

# Update the biggest timestamp, if needed
if len(logs) > 0:
if logs[-1]["timestamp"] > biggest_timestamp:
biggest_timestamp = logs[-1]["timestamp"]

# Update timestamp in settings
if (
self.c.global_settings.get("last_flock_log_timestamp")
< biggest_timestamp
):
self.c.global_settings.set(
"last_flock_log_timestamp", biggest_timestamp
)
self.c.global_settings.save()

# If the log file hasn't been modified since we started the import, truncate it
if mtime == os.path.getmtime(self.filename):
micahflee marked this conversation as resolved.
Show resolved Hide resolved
with open(self.filename, "w") as f:
f.truncate()

except FileNotFoundError:
self.c.log(
"FlockLog",
"submit_logs",
"warning: file not found: {}".format(self.filename),
)


class FlockLogTypes:
SERVER_ENABLED = "server_enabled"
SERVER_DISABLED = "server_disabled"
TWIGS_ENABLED = "twigs_enabled"
TWIGS_DISABLED = "twigs_disabled"
Loading