import jinja2
import os
import platform
import sys
import time
import unittest
from auditbeat_xpack import *
# Fields every system-module event is expected to carry regardless of the
# metricset; each test appends its metricset-specific fields to this list.
COMMON_FIELDS = [
    "@timestamp",
    "host.name",
    "event.module",
    "event.dataset",
]
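def _is_linux():
    """Return True when the test run is on Linux.

    ``sys.platform`` is ``"linux2"`` on Python 2 but ``"linux"`` on Python 3, so
    an exact comparison against ``"linux2"`` silently skips every Linux-only
    test under Python 3 — the skip message looks legitimate and the lost
    coverage of the login/socket/user audit data goes unnoticed. ``startswith``
    matches both spellings.
    """
    return sys.platform.startswith("linux")


def _linux_distribution_id():
    """Return the lower-cased Linux distribution identifier, or "" if unknown.

    ``platform.linux_distribution()`` was deprecated in Python 3.5 and removed
    in 3.8; when it is missing, fall back to the ``ID=`` field of
    ``/etc/os-release`` (``"debian"`` on Debian). The result only decides
    whether a test is skipped, so a missing or unreadable file yields "" rather
    than an error.
    """
    legacy = getattr(platform, "linux_distribution", None)
    if legacy is not None:
        return legacy()[0].lower()
    try:
        with open("/etc/os-release") as release:
            for line in release:
                if line.startswith("ID="):
                    return line[len("ID="):].strip().strip('"').lower()
    except (IOError, OSError):
        pass
    return ""


class Test(AuditbeatXPackTest):
    """System-module metricset tests for the x-pack Auditbeat build.

    Every test hands one ``system`` metricset and the field names it must
    produce to ``check_metricset`` (inherited from ``AuditbeatXPackTest``).
    All of these metricsets are still experimental and emit a warning on
    start-up, which is why every call passes ``warnings_allowed=True``.

    Skip conditions are evaluated at import time, so the platform helpers
    above must not raise on any supported interpreter.
    """

    def test_metricset_host(self):
        """
        host metricset collects general information about a server.
        """
        fields = ["system.audit.host.uptime", "system.audit.host.ip", "system.audit.host.os.name"]
        # Metricset is experimental and that generates a warning, TODO: remove later
        self.check_metricset("system", "host", COMMON_FIELDS + fields, warnings_allowed=True)

    @unittest.skipUnless(_is_linux(), "Only implemented for Linux")
    @unittest.skipIf(sys.byteorder != "little", "Test only implemented for little-endian systems")
    def test_metricset_login(self):
        """
        login metricset collects information about logins (successful and failed) and system restarts.
        """
        fields = ["event.origin", "event.outcome", "message", "process.pid", "source.ip",
                  "user.name", "user.terminal"]
        # The metricset is pointed at a checked-in wtmp fixture instead of the
        # host's real /var/log/wtmp, so the test is hermetic. The fixture is a
        # raw binary record file, which is presumably why the test is limited
        # to little-endian hosts above.
        # NOTE(review): "-1" for btmp_file_pattern looks like a sentinel that
        # disables failed-login collection — confirm against the metricset docs.
        config = {
            "login.wtmp_file_pattern": os.path.abspath(os.path.join(self.beat_path, "tests/files/wtmp")),
            "login.btmp_file_pattern": "-1"
        }
        # Metricset is experimental and that generates a warning, TODO: remove later
        self.check_metricset("system", "login", COMMON_FIELDS + fields, config, warnings_allowed=True)

    @unittest.skipIf(sys.platform == "win32", "Not implemented for Windows")
    @unittest.skipIf(_is_linux() and _linux_distribution_id() != "debian",
                     "Only implemented for Debian")
    def test_metricset_package(self):
        """
        package metricset collects information about installed packages on a system.
        """
        fields = ["system.audit.package.name", "system.audit.package.version", "system.audit.package.installtime"]
        # Metricset is experimental and that generates a warning, TODO: remove later
        self.check_metricset("system", "package", COMMON_FIELDS + fields, warnings_allowed=True)

    def test_metricset_process(self):
        """
        process metricset collects information about processes running on a system.
        """
        fields = ["process.pid", "process.ppid", "process.name", "process.executable", "process.args",
                  "process.start", "process.working_directory", "user.id", "user.group.id"]
        # Windows does not have effective and saved IDs, and user.name is not always filled for system processes.
        if sys.platform != "win32":
            fields.extend(["user.effective.id", "user.saved.id", "user.effective.group.id", "user.saved.group.id",
                           "user.name", "user.group.name"])
        # Metricset is experimental and that generates a warning, TODO: remove later
        self.check_metricset("system", "process", COMMON_FIELDS + fields, warnings_allowed=True)

    @unittest.skipUnless(_is_linux(), "Only implemented for Linux")
    def test_metricset_socket(self):
        """
        socket metricset collects information about open sockets on a system.
        """
        fields = ["destination.port"]
        # errors_allowed=True - The socket metricset fills the `error` field if the process enrichment fails
        # (e.g. process has exited). This should not fail the test.
        # warnings_allowed=True - Metricset is experimental and that generates a warning, TODO: remove later
        self.check_metricset("system", "socket", COMMON_FIELDS + fields, errors_allowed=True, warnings_allowed=True)

    @unittest.skipUnless(_is_linux(), "Only implemented for Linux")
    def test_metricset_user(self):
        """
        user metricset collects information about users on a server.
        """
        fields = ["system.audit.user.name"]
        # Metricset is experimental and that generates a warning, TODO: remove later
        self.check_metricset("system", "user", COMMON_FIELDS + fields, warnings_allowed=True)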