From 13b654d9a8db496f9143a4683c1e681423d64c95 Mon Sep 17 00:00:00 2001 From: Liu Shilong Date: Tue, 6 Sep 2022 11:30:27 +0800 Subject: [PATCH] Revert "Streaming structured events implementation (#11848)" This reverts commit 6a54bc439a980c7fc10e7a74de6472ee7731f8a5. --- dockers/docker-eventd/Dockerfile.j2 | 36 - dockers/docker-eventd/critical_processes | 1 - dockers/docker-eventd/start.sh | 6 - dockers/docker-eventd/supervisord.conf | 52 - dockers/docker-fpm-frr/Dockerfile.j2 | 6 - dockers/docker-fpm-frr/bgp_regex.json | 8 - dockers/docker-fpm-frr/events_info.json | 10 - files/build_templates/docker_image_ctl.j2 | 1 - files/build_templates/eventd.service.j2 | 17 - files/build_templates/init_cfg.json.j2 | 3 +- files/build_templates/rsyslog_plugin.conf.j2 | 19 - .../build_templates/sonic_debian_extension.j2 | 4 - rules/docker-config-engine-bullseye.mk | 4 +- rules/docker-config-engine-buster.mk | 1 - rules/docker-eventd.dep | 11 - rules/docker-eventd.mk | 47 - rules/eventd.dep | 10 - rules/eventd.mk | 19 - rules/scripts.mk | 4 - rules/telemetry.mk | 5 +- slave.mk | 2 - src/sonic-eventd/Makefile | 84 -- src/sonic-eventd/debian/changelog | 5 - src/sonic-eventd/debian/compat | 1 - src/sonic-eventd/debian/control | 14 - src/sonic-eventd/debian/rules | 6 - src/sonic-eventd/rsyslog_plugin/main.cpp | 57 -- .../rsyslog_plugin/rsyslog_plugin.cpp | 135 --- .../rsyslog_plugin/rsyslog_plugin.h | 40 - src/sonic-eventd/rsyslog_plugin/subdir.mk | 13 - .../rsyslog_plugin/syslog_parser.cpp | 65 -- .../rsyslog_plugin/syslog_parser.h | 46 - .../rsyslog_plugin/timestamp_formatter.cpp | 74 -- .../rsyslog_plugin/timestamp_formatter.h | 27 - .../rsyslog_plugin_ut.cpp | 274 ------ .../rsyslog_plugin_tests/subdir.mk | 12 - .../rsyslog_plugin_tests/test_regex_1.rc.json | 0 .../rsyslog_plugin_tests/test_regex_2.rc.json | 7 - .../rsyslog_plugin_tests/test_regex_3.rc.json | 6 - .../rsyslog_plugin_tests/test_regex_4.rc.json | 7 - .../rsyslog_plugin_tests/test_regex_5.rc.json | 7 - .../rsyslog_plugin_tests/test_syslogs.txt | 4 - .../rsyslog_plugin_tests/test_syslogs_2.txt | 3 - src/sonic-eventd/src/eventd.cpp | 798 --------------- src/sonic-eventd/src/eventd.h | 268 ----- src/sonic-eventd/src/main.cpp | 18 - src/sonic-eventd/src/subdir.mk | 13 - src/sonic-eventd/tests/eventd_ut.cpp | 915 ------------------ src/sonic-eventd/tests/main.cpp | 97 -- .../database_config.json | 112 --- .../database_config0.json | 92 -- .../database_config1.json | 92 -- .../database_global.json | 16 - src/sonic-eventd/tests/subdir.mk | 12 - src/sonic-eventd/tools/events_publish_tool.py | 97 -- src/sonic-eventd/tools/events_tool.cpp | 328 ------- src/sonic-eventd/tools/events_volume_test.py | 68 -- src/sonic-eventd/tools/sample_ip.json | 1 - src/sonic-eventd/tools/subdir.mk | 12 - 59 files changed, 4 insertions(+), 4088 deletions(-) delete mode 100644 dockers/docker-eventd/Dockerfile.j2 delete mode 100644 dockers/docker-eventd/critical_processes delete mode 100755 dockers/docker-eventd/start.sh delete mode 100644 dockers/docker-eventd/supervisord.conf delete mode 100644 dockers/docker-fpm-frr/bgp_regex.json delete mode 100644 dockers/docker-fpm-frr/events_info.json delete mode 100644 files/build_templates/eventd.service.j2 delete mode 100644 files/build_templates/rsyslog_plugin.conf.j2 delete mode 100644 rules/docker-eventd.dep delete mode 100644 rules/docker-eventd.mk delete mode 100644 rules/eventd.dep delete mode 100644 rules/eventd.mk delete mode 100644 src/sonic-eventd/Makefile delete mode 100644 src/sonic-eventd/debian/changelog delete mode 100644 src/sonic-eventd/debian/compat delete mode 100644 src/sonic-eventd/debian/control delete mode 100755 src/sonic-eventd/debian/rules delete mode 100644 src/sonic-eventd/rsyslog_plugin/main.cpp delete mode 100644 src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp delete mode 100644 src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h delete mode 100644 src/sonic-eventd/rsyslog_plugin/subdir.mk delete mode 100644 src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp delete mode 100644 src/sonic-eventd/rsyslog_plugin/syslog_parser.h delete mode 100644 src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp delete mode 100644 src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/subdir.mk delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_regex_1.rc.json delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt delete mode 100644 src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt delete mode 100644 src/sonic-eventd/src/eventd.cpp delete mode 100644 src/sonic-eventd/src/eventd.h delete mode 100644 src/sonic-eventd/src/main.cpp delete mode 100644 src/sonic-eventd/src/subdir.mk delete mode 100644 src/sonic-eventd/tests/eventd_ut.cpp delete mode 100644 src/sonic-eventd/tests/main.cpp delete mode 100644 src/sonic-eventd/tests/redis_multi_db_ut_config/database_config.json delete mode 100644 src/sonic-eventd/tests/redis_multi_db_ut_config/database_config0.json delete mode 100644 src/sonic-eventd/tests/redis_multi_db_ut_config/database_config1.json delete mode 100644 src/sonic-eventd/tests/redis_multi_db_ut_config/database_global.json delete mode 100644 src/sonic-eventd/tests/subdir.mk delete mode 100644 src/sonic-eventd/tools/events_publish_tool.py delete mode 100644 src/sonic-eventd/tools/events_tool.cpp delete mode 100644 src/sonic-eventd/tools/events_volume_test.py delete mode 100644 src/sonic-eventd/tools/sample_ip.json delete mode 100644 src/sonic-eventd/tools/subdir.mk diff --git a/dockers/docker-eventd/Dockerfile.j2 b/dockers/docker-eventd/Dockerfile.j2 deleted file mode 100644 index 8d935dc9f365..000000000000 --- a/dockers/docker-eventd/Dockerfile.j2 +++ /dev/null @@ -1,36 +0,0 @@ -{% from "dockers/dockerfile-macros.j2" import install_debian_packages, install_python_wheels, copy_files %} -FROM docker-config-engine-bullseye-{{DOCKER_USERNAME}}:{{DOCKER_USERTAG}} - -ARG docker_container_name -ARG image_version -RUN [ -f /etc/rsyslog.conf ] && sed -ri "s/%syslogtag%/$docker_container_name#%syslogtag%/;" /etc/rsyslog.conf - -# Make apt-get non-interactive -ENV DEBIAN_FRONTEND=noninteractive - -# Pass the image_version to container -ENV IMAGE_VERSION=$image_version - -# Update apt's cache of available packages -RUN apt-get update - -{% if docker_eventd_debs.strip() -%} -# Copy built Debian packages -{{ copy_files("debs/", docker_eventd_debs.split(' '), "/debs/") }} - -# Install built Debian packages and implicitly install their dependencies -{{ install_debian_packages(docker_eventd_debs.split(' ')) }} -{%- endif %} - -# Clean up -RUN apt-get clean -y && \ - apt-get autoclean -y && \ - apt-get autoremove -y && \ - rm -rf /debs - -COPY ["start.sh", "/usr/bin/"] -COPY ["supervisord.conf", "/etc/supervisor/conf.d/"] -COPY ["files/supervisor-proc-exit-listener", "/usr/bin"] -COPY ["critical_processes", "/etc/supervisor"] - -ENTRYPOINT ["/usr/local/bin/supervisord"] diff --git a/dockers/docker-eventd/critical_processes b/dockers/docker-eventd/critical_processes deleted file mode 100644 index 8ff28edbc148..000000000000 --- a/dockers/docker-eventd/critical_processes +++ /dev/null @@ -1 +0,0 @@ -program:eventd diff --git a/dockers/docker-eventd/start.sh b/dockers/docker-eventd/start.sh deleted file mode 100755 index 60cd6a00aecb..000000000000 --- a/dockers/docker-eventd/start.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env bash - -if [ "${RUNTIME_OWNER}" == "" ]; then - RUNTIME_OWNER="kube" -fi - diff --git a/dockers/docker-eventd/supervisord.conf b/dockers/docker-eventd/supervisord.conf deleted file mode 100644 index 5d9a50bca2ae..000000000000 --- a/dockers/docker-eventd/supervisord.conf +++ /dev/null @@ -1,52 +0,0 @@ -[supervisord] -logfile_maxbytes=1MB -logfile_backups=2 -nodaemon=true - -[eventlistener:dependent-startup] -command=python3 -m supervisord_dependent_startup -autostart=true -autorestart=unexpected -startretries=0 -exitcodes=0,3 -events=PROCESS_STATE -buffer_size=1024 - -[eventlistener:supervisor-proc-exit-listener] -command=/usr/bin/supervisor-proc-exit-listener --container-name eventd -events=PROCESS_STATE_EXITED,PROCESS_STATE_RUNNING -autostart=true -autorestart=unexpected -buffer_size=1024 - -[program:rsyslogd] -command=/usr/sbin/rsyslogd -n -iNONE -priority=1 -autostart=false -autorestart=unexpected -stdout_logfile=syslog -stderr_logfile=syslog -dependent_startup=true - -[program:start] -command=/usr/bin/start.sh -priority=2 -autostart=false -autorestart=false -startsecs=0 -stdout_logfile=syslog -stderr_logfile=syslog -dependent_startup=true -dependent_startup_wait_for=rsyslogd:running - - -[program:eventd] -command=/usr/sbin/eventd -priority=3 -autostart=false -autorestart=false -stdout_logfile=syslog -stderr_logfile=syslog -dependent_startup=true -dependent_startup_wait_for=start:exited - diff --git a/dockers/docker-fpm-frr/Dockerfile.j2 b/dockers/docker-fpm-frr/Dockerfile.j2 index fd7ad0f08ed4..ad665e71ceae 100644 --- a/dockers/docker-fpm-frr/Dockerfile.j2 +++ b/dockers/docker-fpm-frr/Dockerfile.j2 @@ -55,15 +55,9 @@ COPY ["TSC", "/usr/bin/TSC"] COPY ["TS", "/usr/bin/TS"] COPY ["files/supervisor-proc-exit-listener", "/usr/bin"] COPY ["zsocket.sh", "/usr/bin/"] -COPY ["*.json", "/etc/rsyslog.d/"] -COPY ["files/rsyslog_plugin.conf.j2", "/etc/rsyslog.d/"] RUN chmod a+x /usr/bin/TSA && \ chmod a+x /usr/bin/TSB && \ chmod a+x /usr/bin/TSC && \ chmod a+x /usr/bin/zsocket.sh -RUN j2 -f json /etc/rsyslog.d/rsyslog_plugin.conf.j2 /etc/rsyslog.d/events_info.json > /etc/rsyslog.d/bgp_events.conf -RUN rm -f /etc/rsyslog.d/rsyslog_plugin.conf.j2* -RUN rm -f /etc/rsyslog.d/events_info.json* - ENTRYPOINT ["/usr/bin/docker_init.sh"] diff --git a/dockers/docker-fpm-frr/bgp_regex.json b/dockers/docker-fpm-frr/bgp_regex.json deleted file mode 100644 index 898b5b060ebe..000000000000 --- a/dockers/docker-fpm-frr/bgp_regex.json +++ /dev/null @@ -1,8 +0,0 @@ -[ - { - "tag": "bgp-state", - "regex": "Peer .default\\|([0-9a-f:.]*[0-9a-f]*). admin state is set to .(up|down).", - "params": [ "ip", "status" ] - } -] - diff --git a/dockers/docker-fpm-frr/events_info.json b/dockers/docker-fpm-frr/events_info.json deleted file mode 100644 index 66fa9a727ae2..000000000000 --- a/dockers/docker-fpm-frr/events_info.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "yang_module": "sonic-events-bgp", - "proclist": [ - { - "name": "bgp", - "parse_json": "bgp_regex.json" - } - ] -} - diff --git a/files/build_templates/docker_image_ctl.j2 b/files/build_templates/docker_image_ctl.j2 index a77706cad497..99051ee62d8c 100644 --- a/files/build_templates/docker_image_ctl.j2 +++ b/files/build_templates/docker_image_ctl.j2 @@ -515,7 +515,6 @@ start() { {%- endif -%} {%- if docker_container_name == "bgp" %} -v /etc/sonic/frr/$DEV:/etc/frr:rw \ - -v /usr/share/sonic/scripts:/usr/share/sonic/scripts:ro \ {%- endif %} {%- if docker_container_name == "database" %} $DB_OPT \ diff --git a/files/build_templates/eventd.service.j2 b/files/build_templates/eventd.service.j2 deleted file mode 100644 index 0ad7f52ee83d..000000000000 --- a/files/build_templates/eventd.service.j2 +++ /dev/null @@ -1,17 +0,0 @@ -[Unit] -Description=EVENTD container -Requires=updategraph.service -After=updategraph.service -BindsTo=sonic.target -After=sonic.target -StartLimitIntervalSec=1200 -StartLimitBurst=3 - -[Service] -ExecStartPre=/usr/bin/{{docker_container_name}}.sh start -ExecStart=/usr/bin/{{docker_container_name}}.sh wait -ExecStop=/usr/bin/{{docker_container_name}}.sh stop -RestartSec=30 - -[Install] -WantedBy=sonic.target diff --git a/files/build_templates/init_cfg.json.j2 b/files/build_templates/init_cfg.json.j2 index 8e92807f4e2c..7de0ad977807 100644 --- a/files/build_templates/init_cfg.json.j2 +++ b/files/build_templates/init_cfg.json.j2 @@ -39,7 +39,6 @@ ("pmon", "enabled", false, "enabled"), ("radv", "enabled", false, "enabled"), ("snmp", "enabled", true, "enabled"), - ("eventd", "enabled", true, "enabled"), ("swss", "enabled", false, "enabled"), ("syncd", "enabled", false, "enabled"), ("teamd", "enabled", false, "enabled")] %} @@ -70,7 +69,7 @@ "check_up_status" : "false", {%- endif %} {%- if include_kubernetes == "y" %} -{%- if feature in ["lldp", "pmon", "radv", "eventd", "snmp", "telemetry"] %} +{%- if feature in ["lldp", "pmon", "radv", "snmp", "telemetry"] %} "set_owner": "kube", {% else %} "set_owner": "local", {% endif %} {% endif %} "high_mem_alert": "disabled" diff --git a/files/build_templates/rsyslog_plugin.conf.j2 b/files/build_templates/rsyslog_plugin.conf.j2 deleted file mode 100644 index ec19c62a78f6..000000000000 --- a/files/build_templates/rsyslog_plugin.conf.j2 +++ /dev/null @@ -1,19 +0,0 @@ -## rsyslog-plugin for streaming telemetry via gnmi - - - -template(name="prog_msg" type="list") { - property(name="msg") - constant(value="\n") -} - -$ModLoad omprog - -{% for proc in proclist %} -if re_match($programname, "{{ proc.name }}") then { - action(type="omprog" - binary="/usr/share/sonic/scripts/rsyslog_plugin -r /etc/rsyslog.d/{{ proc.parse_json }} -m {{ yang_module }}" - output="/var/log/rsyslog_plugin.log" - template="prog_msg") -} -{% endfor %} diff --git a/files/build_templates/sonic_debian_extension.j2 b/files/build_templates/sonic_debian_extension.j2 index 56b8290cc12e..4b7a77b3151c 100644 --- a/files/build_templates/sonic_debian_extension.j2 +++ b/files/build_templates/sonic_debian_extension.j2 @@ -799,10 +799,6 @@ sudo bash -c "echo { > $FILESYSTEM_ROOT_USR_SHARE_SONIC_TEMPLATES/ctr_image_name {% endfor %} sudo bash -c "echo } >> $FILESYSTEM_ROOT_USR_SHARE_SONIC_TEMPLATES/ctr_image_names.json" -# copy rsyslog plugin binary for use by all dockers that use plugin to publish events. -sudo mkdir -p ${FILESYSTEM_ROOT_USR_SHARE_SONIC_SCRIPTS} -sudo cp ${files_path}/rsyslog_plugin ${FILESYSTEM_ROOT_USR_SHARE_SONIC_SCRIPTS}/ - {% for script in installer_start_scripts.split(' ') -%} if [ -f $TARGET_MACHINE"_{{script}}" ]; then sudo cp $TARGET_MACHINE"_{{script}}" $FILESYSTEM_ROOT/usr/bin/{{script}} diff --git a/rules/docker-config-engine-bullseye.mk b/rules/docker-config-engine-bullseye.mk index ea0ae43b54b9..c125aa65b209 100644 --- a/rules/docker-config-engine-bullseye.mk +++ b/rules/docker-config-engine-bullseye.mk @@ -8,15 +8,13 @@ $(DOCKER_CONFIG_ENGINE_BULLSEYE)_DEPENDS += $(LIBSWSSCOMMON) \ $(LIBYANG_CPP) \ $(LIBYANG_PY3) \ $(PYTHON3_SWSSCOMMON) \ - $(SONIC_DB_CLI) \ - $(SONIC_EVENTD) + $(SONIC_DB_CLI) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_PYTHON_WHEELS += $(SONIC_PY_COMMON_PY3) \ $(SONIC_YANG_MGMT_PY3) \ $(SONIC_YANG_MODELS_PY3) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_PYTHON_WHEELS += $(SONIC_CONFIG_ENGINE_PY3) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_LOAD_DOCKERS += $(DOCKER_BASE_BULLSEYE) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_FILES += $(SWSS_VARS_TEMPLATE) -$(DOCKER_CONFIG_ENGINE_BULLSEYE)_FILES += $(RSYSLOG_PLUGIN_CONF_J2) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_FILES += $($(SONIC_CTRMGRD)_CONTAINER_SCRIPT) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_DBG_DEPENDS = $($(DOCKER_BASE_BULLSEYE)_DBG_DEPENDS) \ diff --git a/rules/docker-config-engine-buster.mk b/rules/docker-config-engine-buster.mk index 38a94bae4c1d..ae5589a59595 100644 --- a/rules/docker-config-engine-buster.mk +++ b/rules/docker-config-engine-buster.mk @@ -15,7 +15,6 @@ $(DOCKER_CONFIG_ENGINE_BUSTER)_PYTHON_WHEELS += $(SONIC_PY_COMMON_PY3) \ $(DOCKER_CONFIG_ENGINE_BUSTER)_PYTHON_WHEELS += $(SONIC_CONFIG_ENGINE_PY3) $(DOCKER_CONFIG_ENGINE_BUSTER)_LOAD_DOCKERS += $(DOCKER_BASE_BUSTER) $(DOCKER_CONFIG_ENGINE_BUSTER)_FILES += $(SWSS_VARS_TEMPLATE) -$(DOCKER_CONFIG_ENGINE_BUSTER)_FILES += $(RSYSLOG_PLUGIN_CONF_J2) $(DOCKER_CONFIG_ENGINE_BUSTER)_FILES += $($(SONIC_CTRMGRD)_CONTAINER_SCRIPT) $(DOCKER_CONFIG_ENGINE_BUSTER)_DBG_DEPENDS = $($(DOCKER_BASE_BUSTER)_DBG_DEPENDS) \ diff --git a/rules/docker-eventd.dep b/rules/docker-eventd.dep deleted file mode 100644 index 382513e5eb82..000000000000 --- a/rules/docker-eventd.dep +++ /dev/null @@ -1,11 +0,0 @@ - -DPATH := $($(DOCKER_EVENTD)_PATH) -DEP_FILES := $(SONIC_COMMON_FILES_LIST) rules/docker-eventd.mk rules/docker-eventd.dep -DEP_FILES += $(SONIC_COMMON_BASE_FILES_LIST) -DEP_FILES += $(shell git ls-files $(DPATH)) - -$(DOCKER_EVENTD)_CACHE_MODE := GIT_CONTENT_SHA -$(DOCKER_EVENTD)_DEP_FLAGS := $(SONIC_COMMON_FLAGS_LIST) -$(DOCKER_EVENTD)_DEP_FILES := $(DEP_FILES) - -$(eval $(call add_dbg_docker,$(DOCKER_EVENTD),$(DOCKER_EVENTD_DBG))) diff --git a/rules/docker-eventd.mk b/rules/docker-eventd.mk deleted file mode 100644 index c69fee09e569..000000000000 --- a/rules/docker-eventd.mk +++ /dev/null @@ -1,47 +0,0 @@ -# docker image for eventd - -DOCKER_EVENTD_STEM = docker-eventd -DOCKER_EVENTD = $(DOCKER_EVENTD_STEM).gz -DOCKER_EVENTD_DBG = $(DOCKER_EVENTD_STEM)-$(DBG_IMAGE_MARK).gz - -$(DOCKER_EVENTD)_DEPENDS += $(SONIC_EVENTD) - -$(DOCKER_EVENTD)_DBG_DEPENDS = $($(DOCKER_CONFIG_ENGINE_BULLSEYE)_DBG_DEPENDS) -$(DOCKER_EVENTD)_DBG_DEPENDS += $(SONIC_EVENTD_DBG) $(LIBSWSSCOMMON_DBG) - -$(DOCKER_EVENTD)_DBG_IMAGE_PACKAGES = $($(DOCKER_CONFIG_ENGINE_BULLSEYE)_DBG_IMAGE_PACKAGES) - -$(DOCKER_EVENTD)_LOAD_DOCKERS = $(DOCKER_CONFIG_ENGINE_BULLSEYE) - -$(DOCKER_EVENTD)_PATH = $(DOCKERS_PATH)/$(DOCKER_EVENTD_STEM) - -$(DOCKER_EVENTD)_INSTALL_PYTHON_WHEELS = $(SONIC_UTILITIES_PY3) -$(DOCKER_EVENTD)_INSTALL_DEBS = $(PYTHON3_SWSSCOMMON) - -$(DOCKER_EVENTD)_VERSION = 1.0.0 -$(DOCKER_EVENTD)_PACKAGE_NAME = eventd - -$(DOCKER_DHCP)_SERVICE_REQUIRES = updategraph -$(DOCKER_DHCP)_SERVICE_AFTER = database - -SONIC_DOCKER_IMAGES += $(DOCKER_EVENTD) -SONIC_INSTALL_DOCKER_IMAGES += $(DOCKER_EVENTD) - -SONIC_DOCKER_DBG_IMAGES += $(DOCKER_EVENTD_DBG) -SONIC_INSTALL_DOCKER_DBG_IMAGES += $(DOCKER_EVENTD_DBG) - -$(DOCKER_EVENTD)_CONTAINER_NAME = eventd -$(DOCKER_EVENTD)_RUN_OPT += --privileged -t -$(DOCKER_EVENTD)_RUN_OPT += -v /etc/sonic:/etc/sonic:ro - -SONIC_BULLSEYE_DOCKERS += $(DOCKER_EVENTD) -SONIC_BULLSEYE_DBG_DOCKERS += $(DOCKER_EVENTD_DBG) - -$(DOCKER_EVENTD)_FILESPATH = $($(SONIC_EVENTD)_SRC_PATH)/rsyslog_plugin - -$(DOCKER_EVENTD)_PLUGIN = rsyslog_plugin -$($(DOCKER_EVENTD)_PLUGIN)_PATH = $($(DOCKER_EVENTD)_FILESPATH) - -SONIC_COPY_FILES += $($(DOCKER_EVENTD)_PLUGIN) -$(DOCKER_EVENTD)_SHARED_FILES = $($(DOCKER_EVENTD)_PLUGIN) - diff --git a/rules/eventd.dep b/rules/eventd.dep deleted file mode 100644 index 12f32a30f2c7..000000000000 --- a/rules/eventd.dep +++ /dev/null @@ -1,10 +0,0 @@ - -SPATH := $($(SONIC_EVENTD)_SRC_PATH) -DEP_FILES := $(SONIC_COMMON_FILES_LIST) rules/eventd.mk rules/eventd.dep -DEP_FILES += $(SONIC_COMMON_BASE_FILES_LIST) -DEP_FILES := $(addprefix $(SPATH)/,$(shell cd $(SPATH) && git ls-files)) - -$(SONIC_EVENTD)_CACHE_MODE := GIT_CONTENT_SHA -$(SONIC_EVENTD)_DEP_FLAGS := $(SONIC_COMMON_FLAGS_LIST) -$(SONIC_EVENTD)_DEP_FILES := $(DEP_FILES) - diff --git a/rules/eventd.mk b/rules/eventd.mk deleted file mode 100644 index 9eea21a4cfb5..000000000000 --- a/rules/eventd.mk +++ /dev/null @@ -1,19 +0,0 @@ -# eventd package - -SONIC_EVENTD_VERSION = 1.0.0-0 -SONIC_EVENTD_PKG_NAME = eventd - -SONIC_EVENTD = sonic-$(SONIC_EVENTD_PKG_NAME)_$(SONIC_EVENTD_VERSION)_$(CONFIGURED_ARCH).deb -$(SONIC_EVENTD)_SRC_PATH = $(SRC_PATH)/sonic-eventd -$(SONIC_EVENTD)_DEPENDS += $(LIBSWSSCOMMON) $(LIBSWSSCOMMON_DEV) - -SONIC_DPKG_DEBS += $(SONIC_EVENTD) - -SONIC_EVENTD_DBG = sonic-$(SONIC_EVENTD_PKG_NAME)-dbgsym_$(SONIC_EVENTD_VERSION)_$(CONFIGURED_ARCH).deb -$(eval $(call add_derived_package,$(SONIC_EVENTD),$(SONIC_EVENTD_DBG))) - -# The .c, .cpp, .h & .hpp files under src/{$DBG_SRC_ARCHIVE list} -# are archived into debug one image to facilitate debugging. -# -DBG_SRC_ARCHIVE += sonic-eventd - diff --git a/rules/scripts.mk b/rules/scripts.mk index 12919d520b09..ce6a8eb90025 100644 --- a/rules/scripts.mk +++ b/rules/scripts.mk @@ -32,9 +32,6 @@ $(SWSS_VARS_TEMPLATE)_PATH = files/build_templates COPP_CONFIG_TEMPLATE = copp_cfg.j2 $(COPP_CONFIG_TEMPLATE)_PATH = files/image_config/copp -RSYSLOG_PLUGIN_CONF_J2 = rsyslog_plugin.conf.j2 -$(RSYSLOG_PLUGIN_CONF_J2)_PATH = files/build_templates - SONIC_COPY_FILES += $(CONFIGDB_LOAD_SCRIPT) \ $(ARP_UPDATE_SCRIPT) \ $(ARP_UPDATE_VARS_TEMPLATE) \ @@ -45,5 +42,4 @@ SONIC_COPY_FILES += $(CONFIGDB_LOAD_SCRIPT) \ $(SYSCTL_NET_CONFIG) \ $(UPDATE_CHASSISDB_CONFIG_SCRIPT) \ $(SWSS_VARS_TEMPLATE) \ - $(RSYSLOG_PLUGIN_CONF_J2) \ $(COPP_CONFIG_TEMPLATE) diff --git a/rules/telemetry.mk b/rules/telemetry.mk index 942e9797726a..24fe4ae2fe52 100644 --- a/rules/telemetry.mk +++ b/rules/telemetry.mk @@ -2,7 +2,6 @@ SONIC_TELEMETRY = sonic-gnmi_0.1_$(CONFIGURED_ARCH).deb $(SONIC_TELEMETRY)_SRC_PATH = $(SRC_PATH)/sonic-gnmi -$(SONIC_TELEMETRY)_DEPENDS = $(SONIC_MGMT_COMMON) $(SONIC_MGMT_COMMON_CODEGEN) \ - $(LIBSWSSCOMMON_DEV) $(LIBSWSSCOMMON) -$(SONIC_TELEMETRY)_RDEPENDS = $(LIBSWSSCOMMON) $(LIBSWSSCOMMON_DEV) +$(SONIC_TELEMETRY)_DEPENDS = $(SONIC_MGMT_COMMON) $(SONIC_MGMT_COMMON_CODEGEN) +$(SONIC_TELEMETRY)_RDEPENDS = SONIC_DPKG_DEBS += $(SONIC_TELEMETRY) diff --git a/slave.mk b/slave.mk index f720061b2e52..7cdee954ad73 100644 --- a/slave.mk +++ b/slave.mk @@ -1292,8 +1292,6 @@ $(addprefix $(TARGET_PATH)/, $(SONIC_INSTALLERS)) : $(TARGET_PATH)/% : \ $(if $($(docker:-dbg.gz=.gz)_MACHINE),\ mv $($(docker:-dbg.gz=.gz)_CONTAINER_NAME).sh $($(docker:-dbg.gz=.gz)_MACHINE)_$($(docker:-dbg.gz=.gz)_CONTAINER_NAME).sh ) - $(foreach file, $($(docker)_SHARED_FILES), \ - { cp $($(file)_PATH)/$(file) $(FILES_PATH)/ $(LOG) || exit 1 ; } ; ) ) # Exported variables are used by sonic_debian_extension.sh diff --git a/src/sonic-eventd/Makefile b/src/sonic-eventd/Makefile deleted file mode 100644 index 00d3199a65bc..000000000000 --- a/src/sonic-eventd/Makefile +++ /dev/null @@ -1,84 +0,0 @@ -RM := rm -rf -EVENTD_TARGET := eventd -EVENTD_TEST := tests/tests -EVENTD_TOOL := tools/events_tool -EVENTD_PUBLISH_TOOL := tools/events_publish_tool.py -RSYSLOG-PLUGIN_TARGET := rsyslog_plugin/rsyslog_plugin -RSYSLOG-PLUGIN_TEST := rsyslog_plugin_tests/tests -CP := cp -MKDIR := mkdir -CC := g++ -LIBS := -levent -lhiredis -lswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -llua5.1 -TEST_LIBS := -L/usr/src/gtest -lgtest -lgtest_main -lgmock -lgmock_main - -CFLAGS += -Wall -std=c++17 -fPIE -I$(PWD)/../sonic-swss-common/common -PWD := $(shell pwd) - -ifneq ($(MAKECMDGOALS),clean) -ifneq ($(strip $(C_DEPS)),) --include $(C_DEPS) $(OBJS) -endif -endif - --include src/subdir.mk --include tests/subdir.mk --include tools/subdir.mk --include rsyslog_plugin/subdir.mk --include rsyslog_plugin_tests/subdir.mk - -all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests - -sonic-eventd: $(OBJS) - @echo 'Building target: $@' - @echo 'Invoking: G++ Linker' - $(CC) $(LDFLAGS) -o $(EVENTD_TARGET) $(OBJS) $(LIBS) - @echo 'Finished building target: $@' - @echo ' ' - -eventd-tool: $(TOOL_OBJS) - @echo 'Building target: $@' - @echo 'Invoking: G++ Linker' - $(CC) $(LDFLAGS) -o $(EVENTD_TOOL) $(TOOL_OBJS) $(LIBS) - @echo 'Finished building target: $@' - @echo ' ' - -rsyslog-plugin: $(RSYSLOG-PLUGIN_OBJS) - @echo 'Buidling Target: $@' - @echo 'Invoking: G++ Linker' - $(CC) $(LDFLAGS) -o $(RSYSLOG-PLUGIN_TARGET) $(RSYSLOG-PLUGIN_OBJS) $(LIBS) - @echo 'Finished building target: $@' - @echo ' ' - -eventd-tests: $(TEST_OBJS) - @echo 'Building target: $@' - @echo 'Invoking: G++ Linker' - $(CC) $(LDFLAGS) -o $(EVENTD_TEST) $(TEST_OBJS) $(LIBS) $(TEST_LIBS) - @echo 'Finished building target: $@' - $(EVENTD_TEST) - @echo 'Finished running tests' - @echo ' ' - -rsyslog-plugin-tests: $(RSYSLOG-PLUGIN-TEST_OBJS) - @echo 'BUILDING target: $@' - @echo 'Invoking G++ Linker' - $(CC) $(LDFLAGS) -o $(RSYSLOG-PLUGIN_TEST) $(RSYSLOG-PLUGIN-TEST_OBJS) $(LIBS) $(TEST_LIBS) - @echo 'Finished building target: $@' - $(RSYSLOG-PLUGIN_TEST) - @echo 'Finished running tests' - @echo ' ' - -install: - $(MKDIR) -p $(DESTDIR)/usr/sbin - $(CP) $(EVENTD_TARGET) $(DESTDIR)/usr/sbin - $(CP) $(EVENTD_TOOL) $(DESTDIR)/usr/sbin - $(CP) $(EVENTD_PUBLISH_TOOL) $(DESTDIR)/usr/sbin - -deinstall: - $(RM) $(DESTDIR)/usr/sbin/$(EVENTD_TARGET) - $(RM) $(DESTDIR)/usr/sbin/$(RSYSLOG-PLUGIN_TARGET) - $(RM) -rf $(DESTDIR)/usr/sbin - -clean: - -@echo ' ' - -.PHONY: all clean dependents diff --git a/src/sonic-eventd/debian/changelog b/src/sonic-eventd/debian/changelog deleted file mode 100644 index eba3bf10ea53..000000000000 --- a/src/sonic-eventd/debian/changelog +++ /dev/null @@ -1,5 +0,0 @@ -sonic-eventd (1.0.0-0) UNRELEASED; urgency=medium - - * Initial release. - --- Renuka Manavalan diff --git a/src/sonic-eventd/debian/compat b/src/sonic-eventd/debian/compat deleted file mode 100644 index 48082f72f087..000000000000 --- a/src/sonic-eventd/debian/compat +++ /dev/null @@ -1 +0,0 @@ -12 diff --git a/src/sonic-eventd/debian/control b/src/sonic-eventd/debian/control deleted file mode 100644 index 95ae6fd76452..000000000000 --- a/src/sonic-eventd/debian/control +++ /dev/null @@ -1,14 +0,0 @@ -Source: sonic-eventd -Section: devel -Priority: optional -Maintainer: Renuka Manavalan -Build-Depends: debhelper (>= 12.0.0), libevent-dev, libboost-thread-dev, libboost-system-dev, libswsscommon-dev -Standards-Version: 3.9.3 -Homepage: https://github.com/Azure/sonic-buildimage -XS-Go-Import-Path: github.com/Azure/sonic-buildimage - -Package: sonic-eventd -Architecture: any -Built-Using: ${misc:Built-Using} -Depends: ${shlibs:Depends} -Description: SONiC event service diff --git a/src/sonic-eventd/debian/rules b/src/sonic-eventd/debian/rules deleted file mode 100755 index ac2cd63889ef..000000000000 --- a/src/sonic-eventd/debian/rules +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/make -f - -export DEB_BUILD_MAINT_OPTIONS=hardening=+all - -%: - dh $@ --parallel diff --git a/src/sonic-eventd/rsyslog_plugin/main.cpp b/src/sonic-eventd/rsyslog_plugin/main.cpp deleted file mode 100644 index 53162608c5a9..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/main.cpp +++ /dev/null @@ -1,57 +0,0 @@ -#include -#include -#include -#include "rsyslog_plugin.h" - -#define SUCCESS_CODE 0 -#define INVALID_REGEX_ERROR_CODE 1 -#define EVENT_INIT_PUBLISH_ERROR_CODE 2 -#define MISSING_ARGS_ERROR_CODE 3 - -void showUsage() { - cout << "Usage for rsyslog_plugin: \n" << "options\n" - << "\t-r,required,type=string\t\tPath to regex file\n" - << "\t-m,required,type=string\t\tYANG module name of source generating syslog message\n" - << "\t-h \t\tHelp" - << endl; -} - -int main(int argc, char** argv) { - string regexPath; - string moduleName; - int optionVal; - - while((optionVal = getopt(argc, argv, "r:m:h")) != -1) { - switch(optionVal) { - case 'r': - regexPath = optarg; - break; - case 'm': - moduleName = optarg; - break; - case 'h': - case '?': - default: - showUsage(); - return 1; - } - } - - if(regexPath.empty() || moduleName.empty()) { // Missing required rc path - cerr << "Error: Missing regexPath and moduleName." << endl; - return MISSING_ARGS_ERROR_CODE; - } - - unique_ptr plugin(new RsyslogPlugin(moduleName, regexPath)); - int returnCode = plugin->onInit(); - if(returnCode == INVALID_REGEX_ERROR_CODE) { - SWSS_LOG_ERROR("Rsyslog plugin was not able to be initialized due to invalid regex file provided.\n"); - return returnCode; - } else if(returnCode == EVENT_INIT_PUBLISH_ERROR_CODE) { - SWSS_LOG_ERROR("Rsyslog plugin was not able to be initialized due to event_init_publish call failing.\n"); - return returnCode; - } - - plugin->run(); - return SUCCESS_CODE; -} diff --git a/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp b/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp deleted file mode 100644 index 3786c5f0fea9..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp +++ /dev/null @@ -1,135 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "rsyslog_plugin.h" -#include "json.hpp" - -using json = nlohmann::json; - -bool RsyslogPlugin::onMessage(string msg, lua_State* luaState) { - string tag; - event_params_t paramDict; - if(!m_parser->parseMessage(msg, tag, paramDict, luaState)) { - SWSS_LOG_DEBUG("%s was not able to be parsed into a structured event\n", msg.c_str()); - return false; - } else { - int returnCode = event_publish(m_eventHandle, tag, ¶mDict); - if(returnCode != 0) { - SWSS_LOG_ERROR("rsyslog_plugin was not able to publish event for %s.\n", tag.c_str()); - return false; - } - return true; - } -} - -void parseParams(vector params, vector& eventParams) { - for(long unsigned int i = 0; i < params.size(); i++) { - if(params[i].empty()) { - SWSS_LOG_ERROR("Empty param provided in regex file\n"); - continue; - } - EventParam ep = EventParam(); - auto delimPos = params[i].find(':'); - if(delimPos == string::npos) { // no lua code - ep.paramName = params[i]; - } else { - ep.paramName = params[i].substr(0, delimPos); - ep.luaCode = params[i].substr(delimPos + 1); - if(ep.luaCode.empty()) { - SWSS_LOG_ERROR("Lua code missing after :\n"); - } - } - eventParams.push_back(ep); - } -} - -bool RsyslogPlugin::createRegexList() { - fstream regexFile; - json jsonList = json::array(); - regexFile.open(m_regexPath, ios::in); - if (!regexFile) { - SWSS_LOG_ERROR("No such path exists: %s for source %s\n", m_regexPath.c_str(), m_moduleName.c_str()); - return false; - } - try { - regexFile >> jsonList; - } catch (invalid_argument& iaException) { - SWSS_LOG_ERROR("Invalid JSON file: %s, throws exception: %s\n", m_regexPath.c_str(), iaException.what()); - return false; - } - - string regexString; - string timestampRegex = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*"; - regex expression; - vector regexList; - - for(long unsigned int i = 0; i < jsonList.size(); i++) { - RegexStruct rs = RegexStruct(); - vector eventParams; - try { - string eventRegex = jsonList[i]["regex"]; - regexString = timestampRegex + eventRegex; - string tag = jsonList[i]["tag"]; - vector params = jsonList[i]["params"]; - vector timestampParams = { "month", "day", "time" }; - params.insert(params.begin(), timestampParams.begin(), timestampParams.end()); - regex expr(regexString); - expression = expr; - parseParams(params, eventParams); - rs.params = eventParams; - rs.tag = tag; - rs.regexExpression = expression; - regexList.push_back(rs); - } catch (domain_error& deException) { - SWSS_LOG_ERROR("Missing required key, throws exception: %s\n", deException.what()); - return false; - } catch (regex_error& reException) { - SWSS_LOG_ERROR("Invalid regex, throws exception: %s\n", reException.what()); - return false; - } - } - - if(regexList.empty()) { - SWSS_LOG_ERROR("Empty list of regex expressions.\n"); - return false; - } - - m_parser->m_regexList = regexList; - - regexFile.close(); - return true; -} - -void RsyslogPlugin::run() { - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - while(true) { - string line; - getline(cin, line); - if(line.empty()) { - continue; - } - onMessage(line, luaState); - } - lua_close(luaState); -} - -int RsyslogPlugin::onInit() { - m_eventHandle = events_init_publisher(m_moduleName); - bool success = createRegexList(); - if(!success) { - return 1; // invalid regex error code - } else if(m_eventHandle == NULL) { - return 2; // event init publish error code - } - return 0; -} - -RsyslogPlugin::RsyslogPlugin(string moduleName, string regexPath) { - m_parser = unique_ptr(new SyslogParser()); - m_moduleName = moduleName; - m_regexPath = regexPath; -} diff --git a/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h b/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h deleted file mode 100644 index 0811b5f3032f..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef RSYSLOG_PLUGIN_H -#define RSYSLOG_PLUGIN_H - -extern "C" -{ - #include - #include - #include -} -#include -#include -#include "syslog_parser.h" -#include "events.h" -#include "logger.h" - -using namespace std; -using namespace swss; - -/** - * Rsyslog Plugin will utilize an instance of a syslog parser to read syslog messages from rsyslog.d and will continuously read from stdin - * A plugin instance is created for each container/host. - * - */ - -class RsyslogPlugin { -public: - int onInit(); - bool onMessage(string msg, lua_State* luaState); - void run(); - RsyslogPlugin(string moduleName, string regexPath); -private: - unique_ptr m_parser; - event_handle_t m_eventHandle; - string m_regexPath; - string m_moduleName; - bool createRegexList(); -}; - -#endif - diff --git a/src/sonic-eventd/rsyslog_plugin/subdir.mk b/src/sonic-eventd/rsyslog_plugin/subdir.mk deleted file mode 100644 index 17df55c718a0..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/subdir.mk +++ /dev/null @@ -1,13 +0,0 @@ -CC := g++ - -RSYSLOG-PLUGIN-TEST_OBJS += ./rsyslog_plugin/rsyslog_plugin.o ./rsyslog_plugin/syslog_parser.o ./rsyslog_plugin/timestamp_formatter.o -RSYSLOG-PLUGIN_OBJS += ./rsyslog_plugin/rsyslog_plugin.o ./rsyslog_plugin/syslog_parser.o ./rsyslog_plugin/timestamp_formatter.o ./rsyslog_plugin/main.o - -C_DEPS += ./rsyslog_plugin/rsyslog_plugin.d ./rsyslog_plugin/syslog_parser.d ./rsyslog_plugin/timestamp_formatter.d ./rsyslog_plugin/main.d - -rsyslog_plugin/%.o: rsyslog_plugin/%.cpp - @echo 'Building file: $<' - @echo 'Invoking: GCC C++ Compiler' - $(CC) -D__FILENAME__="$(subst rsyslog_plugin/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$(@)" "$<" - @echo 'Finished building: $<' - @echo ' ' diff --git a/src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp b/src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp deleted file mode 100644 index ebf7c598d15a..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp +++ /dev/null @@ -1,65 +0,0 @@ -#include -#include -#include "syslog_parser.h" -#include "logger.h" - -/** - * Parses syslog message and returns structured event - * - * @param nessage us syslog message being fed in by rsyslog.d - * @return return structured event json for publishing - * -*/ - -bool SyslogParser::parseMessage(string message, string& eventTag, event_params_t& paramMap, lua_State* luaState) { - for(long unsigned int i = 0; i < m_regexList.size(); i++) { - smatch matchResults; - if(!regex_search(message, matchResults, m_regexList[i].regexExpression) || m_regexList[i].params.size() != matchResults.size() - 1 || matchResults.size() < 4) { - continue; - } - string formattedTimestamp; - if(!matchResults[1].str().empty() && !matchResults[2].str().empty() && !matchResults[3].str().empty()) { // found timestamp components - formattedTimestamp = m_timestampFormatter->changeTimestampFormat({ matchResults[1].str(), matchResults[2].str(), matchResults[3].str() }); - } - if(!formattedTimestamp.empty()) { - paramMap["timestamp"] = formattedTimestamp; - } else { - SWSS_LOG_INFO("Timestamp is invalid and is not able to be formatted"); - } - - // found matching regex - eventTag = m_regexList[i].tag; - // check params for lua code - for(long unsigned int j = 3; j < m_regexList[i].params.size(); j++) { - string resultValue = matchResults[j + 1].str(); - string paramName = m_regexList[i].params[j].paramName; - const char* luaCode = m_regexList[i].params[j].luaCode.c_str(); - - if(luaCode == NULL || *luaCode == 0) { - SWSS_LOG_INFO("Invalid lua code, empty or missing"); - paramMap[paramName] = resultValue; - continue; - } - - // execute lua code - lua_pushstring(luaState, resultValue.c_str()); - lua_setglobal(luaState, "arg"); - if(luaL_dostring(luaState, luaCode) == 0) { - lua_pop(luaState, lua_gettop(luaState)); - } else { // error in lua code - SWSS_LOG_ERROR("Invalid lua code, unable to do operation.\n"); - paramMap[paramName] = resultValue; - continue; - } - lua_getglobal(luaState, "ret"); - paramMap[paramName] = lua_tostring(luaState, -1); - lua_pop(luaState, 1); - } - return true; - } - return false; -} - -SyslogParser::SyslogParser() { - m_timestampFormatter = unique_ptr(new TimestampFormatter()); -} diff --git a/src/sonic-eventd/rsyslog_plugin/syslog_parser.h b/src/sonic-eventd/rsyslog_plugin/syslog_parser.h deleted file mode 100644 index 6293eb3c4a34..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/syslog_parser.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef SYSLOG_PARSER_H -#define SYSLOG_PARSER_H - -extern "C" -{ - #include - #include - #include -} - -#include -#include -#include -#include "json.hpp" -#include "events.h" -#include "timestamp_formatter.h" - -using namespace std; -using json = nlohmann::json; - -struct EventParam { - string paramName; - string luaCode; -}; - -struct RegexStruct { - regex regexExpression; - vector params; - string tag; -}; - -/** - * Syslog Parser is responsible for parsing log messages fed by rsyslog.d and returns - * matched result to rsyslog_plugin to use with events publish API - * - */ - -class SyslogParser { -public: - unique_ptr m_timestampFormatter; - vector m_regexList; - bool parseMessage(string message, string& tag, event_params_t& paramDict, lua_State* luaState); - SyslogParser(); -}; - -#endif diff --git a/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp b/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp deleted file mode 100644 index cc179adbbc75..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp +++ /dev/null @@ -1,74 +0,0 @@ -#include -#include "timestamp_formatter.h" -#include "logger.h" -#include "events.h" - -using namespace std; - -/*** - * - * Formats given string into string needed by YANG model - * - * @param timestamp parsed from syslog message - * @return formatted timestamp that conforms to YANG model - * - */ - -static const unordered_map g_monthDict { - { "Jan", "01" }, - { "Feb", "02" }, - { "Mar", "03" }, - { "Apr", "04" }, - { "May", "05" }, - { "Jun", "06" }, - { "Jul", "07" }, - { "Aug", "08" }, - { "Sep", "09" }, - { "Oct", "10" }, - { "Nov", "11" }, - { "Dec", "12" } -}; - -string TimestampFormatter::getYear(string timestamp) { - if(!m_storedTimestamp.empty()) { - if(m_storedTimestamp.compare(timestamp) <= 0) { - m_storedTimestamp = timestamp; - return m_storedYear; - } - } - // no last timestamp or year change - time_t currentTime = time(nullptr); - tm* const localTime = localtime(¤tTime); - stringstream ss; - auto currentYear = 1900 + localTime->tm_year; - ss << currentYear; // get current year - string year = ss.str(); - m_storedTimestamp = timestamp; - m_storedYear = year; - return year; -} - -string TimestampFormatter::changeTimestampFormat(vector dateComponents) { - if(dateComponents.size() < 3) { - SWSS_LOG_ERROR("Timestamp formatter unable to format due to invalid input"); - return ""; - } - string formattedTimestamp; // need to change format of Mmm dd hh:mm:ss.SSSSSS to YYYY-mm-ddThh:mm:ss.SSSSSSZ - string month; - auto it = g_monthDict.find(dateComponents[0]); - if(it != g_monthDict.end()) { - month = it->second; - } else { - SWSS_LOG_ERROR("Timestamp month was given in wrong format.\n"); - return ""; - } - string day = dateComponents[1]; - if(day.size() == 1) { // convert 1 -> 01 - day.insert(day.begin(), '0'); - } - string time = dateComponents[2]; - string currentTimestamp = month + day + time; - string year = getYear(currentTimestamp); - formattedTimestamp = year + "-" + month + "-" + day + "T" + time + "Z"; - return formattedTimestamp; -} diff --git a/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h b/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h deleted file mode 100644 index ea99c4cfcb8c..000000000000 --- a/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef TIMESTAMP_FORMATTER_H -#define TIMESTAMP_FORMATTER_H - -#include -#include -#include -#include -#include - -using namespace std; - -/*** - * - * TimestampFormatter is responsible for formatting the timestamps received in syslog messages and to format them into the type needed by YANG model - * - */ - -class TimestampFormatter { -public: - string changeTimestampFormat(vector dateComponents); - string m_storedTimestamp; - string m_storedYear; -private: - string getYear(string timestamp); -}; - -#endif diff --git a/src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp b/src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp deleted file mode 100644 index be5a19ad5a5b..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp +++ /dev/null @@ -1,274 +0,0 @@ -extern "C" -{ - #include - #include - #include -} -#include -#include -#include -#include -#include "gtest/gtest.h" -#include "json.hpp" -#include "events.h" -#include "../rsyslog_plugin/rsyslog_plugin.h" -#include "../rsyslog_plugin/syslog_parser.h" -#include "../rsyslog_plugin/timestamp_formatter.h" - -using namespace std; -using namespace swss; -using json = nlohmann::json; - -vector createEventParams(vector params, vector luaCodes) { - vector eventParams; - for(long unsigned int i = 0; i < params.size(); i++) { - EventParam ep = EventParam(); - ep.paramName = params[i]; - ep.luaCode = luaCodes[i]; - eventParams.push_back(ep); - } - return eventParams; -} - -TEST(syslog_parser, matching_regex) { - json jList = json::array(); - vector regexList; - string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*message (.*) other_data (.*) even_more_data (.*)"; - vector params = { "month", "day", "time", "message", "other_data", "even_more_data" }; - vector luaCodes = { "", "", "", "", "", "" }; - regex expression(regexString); - - RegexStruct rs = RegexStruct(); - rs.tag = "test_tag"; - rs.regexExpression = expression; - rs.params = createEventParams(params, luaCodes); - regexList.push_back(rs); - - string tag; - event_params_t paramDict; - - event_params_t expectedDict; - expectedDict["message"] = "test_message"; - expectedDict["other_data"] = "test_data"; - expectedDict["even_more_data"] = "test_data"; - - unique_ptr parser(new SyslogParser()); - parser->m_regexList = regexList; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - - bool success = parser->parseMessage("message test_message other_data test_data even_more_data test_data", tag, paramDict, luaState); - EXPECT_EQ(true, success); - EXPECT_EQ("test_tag", tag); - EXPECT_EQ(expectedDict, paramDict); - - lua_close(luaState); -} - -TEST(syslog_parser, matching_regex_timestamp) { - json jList = json::array(); - vector regexList; - string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*message (.*) other_data (.*)"; - vector params = { "month", "day", "time", "message", "other_data" }; - vector luaCodes = { "", "", "", "", "" }; - regex expression(regexString); - - RegexStruct rs = RegexStruct(); - rs.tag = "test_tag"; - rs.regexExpression = expression; - rs.params = createEventParams(params, luaCodes); - regexList.push_back(rs); - - string tag; - event_params_t paramDict; - - event_params_t expectedDict; - expectedDict["message"] = "test_message"; - expectedDict["other_data"] = "test_data"; - expectedDict["timestamp"] = "2022-07-21T02:10:00.000000Z"; - - unique_ptr parser(new SyslogParser()); - parser->m_regexList = regexList; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - - bool success = parser->parseMessage("Jul 21 02:10:00.000000 message test_message other_data test_data", tag, paramDict, luaState); - EXPECT_EQ(true, success); - EXPECT_EQ("test_tag", tag); - EXPECT_EQ(expectedDict, paramDict); - - lua_close(luaState); -} - -TEST(syslog_parser, no_matching_regex) { - json jList = json::array(); - vector regexList; - string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*no match"; - vector params = { "month", "day", "time" }; - vector luaCodes = { "", "", "" }; - regex expression(regexString); - - RegexStruct rs = RegexStruct(); - rs.tag = "test_tag"; - rs.regexExpression = expression; - rs.params = createEventParams(params, luaCodes); - regexList.push_back(rs); - - string tag; - event_params_t paramDict; - - unique_ptr parser(new SyslogParser()); - parser->m_regexList = regexList; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - - bool success = parser->parseMessage("Test Message", tag, paramDict, luaState); - EXPECT_EQ(false, success); - - lua_close(luaState); -} - -TEST(syslog_parser, lua_code_valid_1) { - json jList = json::array(); - vector regexList; - string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*.* (sent|received) (?:to|from) .* ([0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}) active ([1-9]{1,3})/([1-9]{1,3}) .*"; - vector params = { "month", "day", "time", "is-sent", "ip", "major-code", "minor-code" }; - vector luaCodes = { "", "", "", "ret=tostring(arg==\"sent\")", "", "", "" }; - regex expression(regexString); - - RegexStruct rs = RegexStruct(); - rs.tag = "test_tag"; - rs.regexExpression = expression; - rs.params = createEventParams(params, luaCodes); - regexList.push_back(rs); - - string tag; - event_params_t paramDict; - - event_params_t expectedDict; - expectedDict["is-sent"] = "true"; - expectedDict["ip"] = "100.95.147.229"; - expectedDict["major-code"] = "2"; - expectedDict["minor-code"] = "2"; - - unique_ptr parser(new SyslogParser()); - parser->m_regexList = regexList; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - - bool success = parser->parseMessage("NOTIFICATION: sent to neighbor 100.95.147.229 active 2/2 (peer in wrong AS) 2 bytes", tag, paramDict, luaState); - EXPECT_EQ(true, success); - EXPECT_EQ("test_tag", tag); - EXPECT_EQ(expectedDict, paramDict); - - lua_close(luaState); -} - -TEST(syslog_parser, lua_code_valid_2) { - json jList = json::array(); - vector regexList; - string regexString = "([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*.* (sent|received) (?:to|from) .* ([0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}) active ([1-9]{1,3})/([1-9]{1,3}) .*"; - vector params = { "month", "day", "time", "is-sent", "ip", "major-code", "minor-code" }; - vector luaCodes = { "", "", "", "ret=tostring(arg==\"sent\")", "", "", "" }; - regex expression(regexString); - - RegexStruct rs = RegexStruct(); - rs.tag = "test_tag"; - rs.regexExpression = expression; - rs.params = createEventParams(params, luaCodes); - regexList.push_back(rs); - - string tag; - event_params_t paramDict; - - event_params_t expectedDict; - expectedDict["is-sent"] = "false"; - expectedDict["ip"] = "10.10.24.216"; - expectedDict["major-code"] = "6"; - expectedDict["minor-code"] = "2"; - expectedDict["timestamp"] = "2022-12-03T12:36:24.503424Z"; - - unique_ptr parser(new SyslogParser()); - parser->m_regexList = regexList; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - - bool success = parser->parseMessage("Dec 3 12:36:24.503424 NOTIFICATION: received from neighbor 10.10.24.216 active 6/2 (Administrative Shutdown) 0 bytes", tag, paramDict, luaState); - EXPECT_EQ(true, success); - EXPECT_EQ("test_tag", tag); - EXPECT_EQ(expectedDict, paramDict); - - lua_close(luaState); -} - -TEST(rsyslog_plugin, onInit_emptyJSON) { - unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_1.rc.json")); - EXPECT_NE(0, plugin->onInit()); -} - -TEST(rsyslog_plugin, onInit_missingRegex) { - unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_3.rc.json")); - EXPECT_NE(0, plugin->onInit()); -} - -TEST(rsyslog_plugin, onInit_invalidRegex) { - unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_4.rc.json")); - EXPECT_NE(0, plugin->onInit()); -} - -TEST(rsyslog_plugin, onMessage) { - unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_2.rc.json")); - EXPECT_EQ(0, plugin->onInit()); - ifstream infile("./rsyslog_plugin_tests/test_syslogs.txt"); - string logMessage; - bool parseResult; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - while(infile >> logMessage >> parseResult) { - EXPECT_EQ(parseResult, plugin->onMessage(logMessage, luaState)); - } - lua_close(luaState); - infile.close(); -} - -TEST(rsyslog_plugin, onMessage_noParams) { - unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_5.rc.json")); - EXPECT_EQ(0, plugin->onInit()); - ifstream infile("./rsyslog_plugin_tests/test_syslogs_2.txt"); - string logMessage; - bool parseResult; - lua_State* luaState = luaL_newstate(); - luaL_openlibs(luaState); - while(infile >> logMessage >> parseResult) { - EXPECT_EQ(parseResult, plugin->onMessage(logMessage, luaState)); - } - lua_close(luaState); - infile.close(); -} - -TEST(timestampFormatter, changeTimestampFormat) { - unique_ptr formatter(new TimestampFormatter()); - - vector timestampOne = { "Jul", "20", "10:09:40.230874" }; - vector timestampTwo = { "Jan", "1", "00:00:00.000000" }; - vector timestampThree = { "Dec", "31", "23:59:59.000000" }; - - string formattedTimestampOne = formatter->changeTimestampFormat(timestampOne); - EXPECT_EQ("2022-07-20T10:09:40.230874Z", formattedTimestampOne); - - EXPECT_EQ("072010:09:40.230874", formatter->m_storedTimestamp); - - string formattedTimestampTwo = formatter->changeTimestampFormat(timestampTwo); - EXPECT_EQ("2022-01-01T00:00:00.000000Z", formattedTimestampTwo); - - formatter->m_storedTimestamp = "010100:00:00.000000"; - formatter->m_storedYear = "2025"; - - string formattedTimestampThree = formatter->changeTimestampFormat(timestampThree); - EXPECT_EQ("2025-12-31T23:59:59.000000Z", formattedTimestampThree); -} - -int main(int argc, char* argv[]) { - testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/src/sonic-eventd/rsyslog_plugin_tests/subdir.mk b/src/sonic-eventd/rsyslog_plugin_tests/subdir.mk deleted file mode 100644 index 6be7ef09786a..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/subdir.mk +++ /dev/null @@ -1,12 +0,0 @@ -CC := g++ - -RSYSLOG-PLUGIN-TEST_OBJS += ./rsyslog_plugin_tests/rsyslog_plugin_ut.o - -C_DEPS += ./rsyslog_plugin_tests/rsyslog_plugin_ut.d - -rsyslog_plugin_tests/%.o: rsyslog_plugin_tests/%.cpp - @echo 'Building file: $<' - @echo 'Invoking: GCC C++ Compiler' - $(CC) -D__FILENAME__="$(subst rsyslog_plugin_tests/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" - @echo 'Finished building: $<' - @echo ' ' diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_1.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_1.rc.json deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json deleted file mode 100644 index 66788d326331..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "tag": "bgp-state", - "regex": ".* %ADJCHANGE: neighbor (.*) (Up|Down) .*", - "params": ["neighbor_ip", "state" ] - } -] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json deleted file mode 100644 index 2e67e88f8448..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json +++ /dev/null @@ -1,6 +0,0 @@ -[ - { - "tag": "TEST-TAG-NO-REGEX", - "param": [] - } -] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json deleted file mode 100644 index c3a875aded0f..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "tag": "TEST-TAG-INVALID-REGEX", - "regex": "+++ ++++(", - "params": [] - } -] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json deleted file mode 100644 index ddaf37c931a8..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { - "tag": "test_tag", - "regex": ".*", - "params": [] - } -] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt b/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt deleted file mode 100644 index 78f89aec3d28..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt +++ /dev/null @@ -1,4 +0,0 @@ -"Aug 17 02:39:21.286611 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %ADJCHANGE: neighbor 100.126.188.90 Down Neighbor deleted" true -"Aug 17 02:46:42.615668 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %ADJCHANGE: neighbor 100.126.188.90 Up" true -"Aug 17 04:46:51.290979 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %ADJCHANGE: neighbor 100.126.188.78 Down Neighbor deleted" true -"Aug 17 04:46:51.290979 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %NOEVENT: no event" false diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt b/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt deleted file mode 100644 index d56615f61681..000000000000 --- a/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt +++ /dev/null @@ -1,3 +0,0 @@ -testMessage true -another_test_message true - true diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp deleted file mode 100644 index 1ff9dd8be20b..000000000000 --- a/src/sonic-eventd/src/eventd.cpp +++ /dev/null @@ -1,798 +0,0 @@ -#include -#include "eventd.h" -#include "dbconnector.h" - -/* - * There are 5 threads, including the main - * - * (0) main thread -- Runs eventd service that accepts commands event_req_type_t - * This can be used to control caching events and a no-op echo service. - * - * (1) capture/cache service - * Saves all the events between cache start & stop. - * Update missed cached counter in memory. - * - * (2) Main proxy service that runs XSUB/XPUB ends - * - * (3) Get stats for total published counter in memory. This thread also sends - * heartbeat message. It accomplishes by counting upon receive missed due - * to event receive timeout. - * - * (4) Thread to update counters from memory to redis periodically. - * - */ - -using namespace std; -using namespace swss; - -#define MB(N) ((N) * 1024 * 1024) -#define EVT_SIZE_AVG 150 - -#define MAX_CACHE_SIZE (MB(100) / (EVT_SIZE_AVG)) - -/* Count of elements returned in each read */ -#define READ_SET_SIZE 100 - -#define VEC_SIZE(p) ((int)p.size()) - -/* Sock read timeout in milliseconds, to enable look for control signals */ -#define CAPTURE_SOCK_TIMEOUT 800 - -#define HEARTBEAT_INTERVAL_SECS 2 /* Default: 2 seconds */ - -/* Source & tag for heartbeat events */ -#define EVENTD_PUBLISHER_SOURCE "sonic-events-eventd" -#define EVENTD_HEARTBEAT_TAG "heartbeat" - - -const char *counter_keys[COUNTERS_EVENTS_TOTAL] = { - COUNTERS_EVENTS_PUBLISHED, - COUNTERS_EVENTS_MISSED_CACHE -}; - -static bool s_unit_testing = false; - -int -eventd_proxy::init() -{ - int ret = -1, rc = 0; - SWSS_LOG_INFO("Start xpub/xsub proxy"); - - m_frontend = zmq_socket(m_ctx, ZMQ_XSUB); - RET_ON_ERR(m_frontend != NULL, "failing to get ZMQ_XSUB socket"); - - rc = zmq_bind(m_frontend, get_config(string(XSUB_END_KEY)).c_str()); - RET_ON_ERR(rc == 0, "Failing to bind XSUB to %s", get_config(string(XSUB_END_KEY)).c_str()); - - m_backend = zmq_socket(m_ctx, ZMQ_XPUB); - RET_ON_ERR(m_backend != NULL, "failing to get ZMQ_XPUB socket"); - - rc = zmq_bind(m_backend, get_config(string(XPUB_END_KEY)).c_str()); - RET_ON_ERR(rc == 0, "Failing to bind XPUB to %s", get_config(string(XPUB_END_KEY)).c_str()); - - m_capture = zmq_socket(m_ctx, ZMQ_PUB); - RET_ON_ERR(m_capture != NULL, "failing to get ZMQ_PUB socket for capture"); - - rc = zmq_bind(m_capture, get_config(string(CAPTURE_END_KEY)).c_str()); - RET_ON_ERR(rc == 0, "Failing to bind capture PUB to %s", get_config(string(CAPTURE_END_KEY)).c_str()); - - m_thr = thread(&eventd_proxy::run, this); - ret = 0; -out: - return ret; -} - -void -eventd_proxy::run() -{ - SWSS_LOG_INFO("Running xpub/xsub proxy"); - - /* runs forever until zmq context is terminated */ - zmq_proxy(m_frontend, m_backend, m_capture); - - SWSS_LOG_INFO("Stopped xpub/xsub proxy"); -} - - -stats_collector::stats_collector() : - m_shutdown(false), m_pause_heartbeat(false), m_heartbeats_published(0), - m_heartbeats_interval_cnt(0) -{ - set_heartbeat_interval(HEARTBEAT_INTERVAL_SECS); - for (int i=0; i < COUNTERS_EVENTS_TOTAL; ++i) { - m_lst_counters[i] = 0; - } - m_updated = false; -} - - -void -stats_collector::set_heartbeat_interval(int val) -{ - if (val > 0) { - /* Round to highest possible multiples of MIN */ - m_heartbeats_interval_cnt = - (((val * 1000) + STATS_HEARTBEAT_MIN - 1) / STATS_HEARTBEAT_MIN); - } - else if (val == 0) { - /* Least possible */ - m_heartbeats_interval_cnt = 1; - } - else if (val == -1) { - /* Turn off heartbeat */ - m_heartbeats_interval_cnt = 0; - SWSS_LOG_INFO("Heartbeat turned OFF"); - } - /* Any other value is ignored as invalid */ - - SWSS_LOG_INFO("Set heartbeat: val=%d secs cnt=%d min=%d ms final=%d secs", - val, m_heartbeats_interval_cnt, STATS_HEARTBEAT_MIN, - (m_heartbeats_interval_cnt * STATS_HEARTBEAT_MIN / 1000)); -} - - -int -stats_collector::get_heartbeat_interval() -{ - return m_heartbeats_interval_cnt * STATS_HEARTBEAT_MIN / 1000; -} - -int -stats_collector::start() -{ - int rc = -1; - - if (!s_unit_testing) { - try { - m_counters_db = make_shared("COUNTERS_DB", 0, true); - } - catch (exception &e) - { - SWSS_LOG_ERROR("Unable to get DB Connector, e=(%s)\n", e.what()); - } - RET_ON_ERR(m_counters_db != NULL, "Failed to get COUNTERS_DB"); - - m_stats_table = make_shared( - m_counters_db.get(), COUNTERS_EVENTS_TABLE); - RET_ON_ERR(m_stats_table != NULL, "Failed to get events table"); - - m_thr_writer = thread(&stats_collector::run_writer, this); - } - m_thr_collector = thread(&stats_collector::run_collector, this); - rc = 0; -out: - return rc; -} - -void -stats_collector::run_writer() -{ - while (true) { - if (m_updated.exchange(false)) { - /* Update if there had been any update */ - - for (int i = 0; i < COUNTERS_EVENTS_TOTAL; ++i) { - vector fv; - - fv.emplace_back(EVENTS_STATS_FIELD_NAME, to_string(m_lst_counters[i])); - - m_stats_table->set(counter_keys[i], fv); - } - } - if (m_shutdown) { - break; - } - this_thread::sleep_for(chrono::milliseconds(10)); - /* - * After sleep always do an update if needed before checking - * shutdown flag, as any counters collected during sleep - * needs to be updated. - */ - } - - m_stats_table.reset(); - m_counters_db.reset(); -} - -void -stats_collector::run_collector() -{ - int hb_cntr = 0; - string hb_key = string(EVENTD_PUBLISHER_SOURCE) + ":" + EVENTD_HEARTBEAT_TAG; - event_handle_t pub_handle = NULL; - event_handle_t subs_handle = NULL; - - /* - * A subscriber is required to set a subscription. Else all published - * events will be dropped at the point of publishing itself. - */ - pub_handle = events_init_publisher(EVENTD_PUBLISHER_SOURCE); - RET_ON_ERR(pub_handle != NULL, - "failed to create publisher handle for heartbeats"); - - subs_handle = events_init_subscriber(false, STATS_HEARTBEAT_MIN); - RET_ON_ERR(subs_handle != NULL, "failed to subscribe to all"); - - /* - * Though we can count off of capture socket, then we need to duplicate - * code in event_receive which has the logic to count all missed per - * runtime id. It also has logic to retire closed runtime IDs. - * - * So use regular subscriber API w/o cache but timeout to enable - * exit, upon shutdown. - */ - /* - * The collector service runs until shutdown. - * The only task is to update total_published & total_missed_internal. - * The write of these counters into redis is done by another thread. - */ - - while(!m_shutdown) { - event_receive_op_t op; - int rc = 0; - - try { - rc = event_receive(subs_handle, op); - } - catch (exception& e) - { - rc = -1; - stringstream ss; - ss << e.what(); - SWSS_LOG_ERROR("Receive event failed with %s", ss.str().c_str()); - } - - if ((rc == 0) && (op.key != hb_key)) { - /* TODO: Discount EVENT_STR_CTRL_DEINIT messages too */ - increment_published(1+op.missed_cnt); - - /* reset counter on receive to restart. */ - hb_cntr = 0; - } - else { - if (rc < 0) { - SWSS_LOG_ERROR( - "event_receive failed with rc=%d; stats:published(%lu)", rc, - m_lst_counters[INDEX_COUNTERS_EVENTS_PUBLISHED]); - } - if (!m_pause_heartbeat && (m_heartbeats_interval_cnt > 0) && - ++hb_cntr >= m_heartbeats_interval_cnt) { - rc = event_publish(pub_handle, EVENTD_HEARTBEAT_TAG); - if (rc != 0) { - SWSS_LOG_ERROR("Failed to publish heartbeat rc=%d", rc); - } - hb_cntr = 0; - ++m_heartbeats_published; - } - } - } - -out: - /* - * NOTE: A shutdown could lose messages in cache. - * But consider, that eventd shutdown is a critical shutdown as it would - * bring down all other features. Hence done only at system level shutdown, - * hence losing few messages in flight is acceptable. Any more complex code - * to handle is unwanted. - */ - - events_deinit_subscriber(subs_handle); - events_deinit_publisher(pub_handle); - m_shutdown = true; -} - -capture_service::~capture_service() -{ - stop_capture(); -} - -void -capture_service::stop_capture() -{ - m_ctrl = STOP_CAPTURE; - - if (m_thr.joinable()) { - m_thr.join(); - } -} - -static bool -validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq) -{ - bool ret = false; - - internal_event_t::const_iterator itc_r, itc_s, itc_e; - itc_r = event.find(EVENT_RUNTIME_ID); - itc_s = event.find(EVENT_SEQUENCE); - itc_e = event.find(EVENT_STR_DATA); - - if ((itc_r != event.end()) && (itc_s != event.end()) && (itc_e != event.end())) { - ret = true; - rid = itc_r->second; - seq = str_to_seq(itc_s->second); - } - else { - SWSS_LOG_ERROR("Invalid evt: %s", map_to_str(event).c_str()); - } - - return ret; -} - - -/* - * Initialize cache with set of events provided. - * Events read by cache service will be appended - */ -void -capture_service::init_capture_cache(const event_serialized_lst_t &lst) -{ - /* Cache given events as initial stock. - * Save runtime ID with last seen seq to avoid duplicates, while reading - * from capture socket. - * No check for max cache size here, as most likely not needed. - */ - for (event_serialized_lst_t::const_iterator itc = lst.begin(); itc != lst.end(); ++itc) { - internal_event_t event; - - if (deserialize(*itc, event) == 0) { - runtime_id_t rid; - sequence_t seq; - - if (validate_event(event, rid, seq)) { - m_pre_exist_id[rid] = seq; - m_events.push_back(*itc); - } - } - } -} - - -void -capture_service::do_capture() -{ - int rc; - int block_ms=CAPTURE_SOCK_TIMEOUT; - int init_cnt; - void *cap_sub_sock = NULL; - counters_t total_overflow = 0; - - typedef enum { - /* - * In this state every event read is compared with init cache given - * Only new events are saved. - */ - CAP_STATE_INIT = 0, - - /* In this state, all events read are cached until max limit */ - CAP_STATE_ACTIVE, - - /* Cache has hit max. Hence only save last event for each runime ID */ - CAP_STATE_LAST - } cap_state_t; - - cap_state_t cap_state = CAP_STATE_INIT; - - /* - * Need subscription for publishers to publish. - * The stats collector service already has active subscriber for all. - */ - - cap_sub_sock = zmq_socket(m_ctx, ZMQ_SUB); - RET_ON_ERR(cap_sub_sock != NULL, "failing to get ZMQ_SUB socket"); - - rc = zmq_connect(cap_sub_sock, get_config(string(CAPTURE_END_KEY)).c_str()); - RET_ON_ERR(rc == 0, "Failing to bind capture SUB to %s", get_config(string(CAPTURE_END_KEY)).c_str()); - - rc = zmq_setsockopt(cap_sub_sock, ZMQ_SUBSCRIBE, "", 0); - RET_ON_ERR(rc == 0, "Failing to ZMQ_SUBSCRIBE"); - - rc = zmq_setsockopt(cap_sub_sock, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)); - RET_ON_ERR(rc == 0, "Failed to ZMQ_RCVTIMEO to %d", block_ms); - - m_cap_run = true; - - while (m_ctrl != START_CAPTURE) { - /* Wait for capture start */ - this_thread::sleep_for(chrono::milliseconds(10)); - } - - /* - * The cache service connects but defers any reading until caller provides - * the startup cache. But all events that arrived since connect, though not read - * will be held by ZMQ in its local cache. - * - * When cache service starts reading, check against the initial stock for duplicates. - * m_pre_exist_id caches the last seq number in initial stock for each runtime id. - * So only allow sequence number greater than cached number. - * - * Theoretically all the events provided via initial stock could be duplicates. - * Hence until as many events as in initial stock or until the cached id map - * is empty, do this check. - */ - init_cnt = (int)m_events.size(); - - /* Read until STOP_CAPTURE */ - while(m_ctrl == START_CAPTURE) { - runtime_id_t rid; - sequence_t seq; - internal_event_t event; - string source, evt_str; - - if ((rc = zmq_message_read(cap_sub_sock, 0, source, event)) != 0) { - /* - * The capture socket captures SUBSCRIBE requests too. - * The messge could contain subscribe filter strings and binary code. - * Empty string with binary code will fail to deserialize. - * Else would fail event validation. - */ - RET_ON_ERR((rc == EAGAIN) || (rc == ERR_MESSAGE_INVALID), - "0:Failed to read from capture socket"); - continue; - } - if (!validate_event(event, rid, seq)) { - continue; - } - serialize(event, evt_str); - - switch(cap_state) { - case CAP_STATE_INIT: - /* - * In this state check against cache, if duplicate - * When duplicate or new one seen, remove the entry from pre-exist map - * Stay in this state, until the pre-exist cache is empty or as many - * messages as in cache are seen, as in worst case even if you see - * duplicate of each, it will end with first m_events.size() - */ - { - bool add = true; - init_cnt--; - pre_exist_id_t::iterator it = m_pre_exist_id.find(rid); - - if (it != m_pre_exist_id.end()) { - if (seq <= it->second) { - /* Duplicate; Later/same seq in cache. */ - add = false; - } - if (seq >= it->second) { - /* new one; This runtime ID need not be checked again */ - m_pre_exist_id.erase(it); - } - } - if (add) { - m_events.push_back(evt_str); - } - } - if(m_pre_exist_id.empty() || (init_cnt <= 0)) { - /* Init check is no more needed. */ - pre_exist_id_t().swap(m_pre_exist_id); - cap_state = CAP_STATE_ACTIVE; - } - break; - - case CAP_STATE_ACTIVE: - /* Save until max allowed */ - try - { - m_events.push_back(evt_str); - if (VEC_SIZE(m_events) >= m_cache_max) { - cap_state = CAP_STATE_LAST; - /* Clear the map, created to ensure memory space available */ - m_last_events.clear(); - m_last_events_init = true; - } - break; - } - catch (bad_alloc& e) - { - stringstream ss; - ss << e.what(); - SWSS_LOG_ERROR("Cache save event failed with %s events:size=%d", - ss.str().c_str(), VEC_SIZE(m_events)); - cap_state = CAP_STATE_LAST; - // fall through to save this event in last set. - } - - case CAP_STATE_LAST: - total_overflow++; - m_last_events[rid] = evt_str; - if (total_overflow > m_last_events.size()) { - m_total_missed_cache++; - m_stats_instance->increment_missed_cache(1); - } - break; - } - } - -out: - /* - * Capture stop will close the socket which fail the read - * and hence bail out. - */ - zmq_close(cap_sub_sock); - m_cap_run = false; - return; -} - - -int -capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst) -{ - int ret = -1; - - /* Can go in single step only. */ - RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d)+1 < ctrl(%d)", m_ctrl, ctrl); - - switch(ctrl) { - case INIT_CAPTURE: - m_thr = thread(&capture_service::do_capture, this); - for(int i=0; !m_cap_run && (i < 100); ++i) { - /* Wait max a second for thread to init */ - this_thread::sleep_for(chrono::milliseconds(10)); - } - RET_ON_ERR(m_cap_run, "Failed to init capture"); - m_ctrl = ctrl; - ret = 0; - break; - - case START_CAPTURE: - - /* - * Reserve a MAX_PUBLISHERS_COUNT entries for last events, as we use it only - * upon m_events/vector overflow, which might block adding new entries in map - * if overall mem consumption is too high. Clearing the map just before use - * is likely to help. - */ - for (int i=0; iempty())) { - init_capture_cache(*lst); - } - m_ctrl = ctrl; - ret = 0; - break; - - - case STOP_CAPTURE: - /* - * Caller would have initiated SUBS channel. - * Read for CACHE_DRAIN_IN_MILLISECS to drain off cache - * before stopping. - */ - this_thread::sleep_for(chrono::milliseconds(CACHE_DRAIN_IN_MILLISECS)); - stop_capture(); - ret = 0; - break; - - default: - SWSS_LOG_ERROR("Unexpected code=%d", ctrl); - break; - } -out: - return ret; -} - -int -capture_service::read_cache(event_serialized_lst_t &lst_fifo, - last_events_t &lst_last, counters_t &overflow_cnt) -{ - lst_fifo.swap(m_events); - if (m_last_events_init) { - lst_last.swap(m_last_events); - } else { - last_events_t().swap(lst_last); - } - last_events_t().swap(m_last_events); - event_serialized_lst_t().swap(m_events); - overflow_cnt = m_total_missed_cache; - return 0; -} - -static int -process_options(stats_collector *stats, const event_serialized_lst_t &req_data, - event_serialized_lst_t &resp_data) -{ - int ret = -1; - if (!req_data.empty()) { - RET_ON_ERR(req_data.size() == 1, "Expect only one options string %d", - (int)req_data.size()); - const auto &data = nlohmann::json::parse(*(req_data.begin())); - RET_ON_ERR(data.size() == 1, "Only one supported option. Expect 1. size=%d", - (int)data.size()); - const auto it = data.find(GLOBAL_OPTION_HEARTBEAT); - RET_ON_ERR(it != data.end(), "Expect HEARTBEAT_INTERVAL; got %s", - data.begin().key().c_str()); - stats->set_heartbeat_interval(it.value()); - ret = 0; - } - else { - nlohmann::json msg = nlohmann::json::object(); - msg[GLOBAL_OPTION_HEARTBEAT] = stats->get_heartbeat_interval(); - resp_data.push_back(msg.dump()); - ret = 0; - } -out: - return ret; -} - - -void -run_eventd_service() -{ - int code = 0; - int cache_max; - event_service service; - stats_collector stats_instance; - eventd_proxy *proxy = NULL; - capture_service *capture = NULL; - - event_serialized_lst_t capture_fifo_events; - last_events_t capture_last_events; - - SWSS_LOG_INFO("Eventd service starting\n"); - - void *zctx = zmq_ctx_new(); - RET_ON_ERR(zctx != NULL, "Failed to get zmq ctx"); - - cache_max = get_config_data(string(CACHE_MAX_CNT), (int)MAX_CACHE_SIZE); - RET_ON_ERR(cache_max > 0, "Failed to get CACHE_MAX_CNT"); - - proxy = new eventd_proxy(zctx); - RET_ON_ERR(proxy != NULL, "Failed to create proxy"); - - RET_ON_ERR(proxy->init() == 0, "Failed to init proxy"); - - RET_ON_ERR(service.init_server(zctx) == 0, "Failed to init service"); - - RET_ON_ERR(stats_instance.start() == 0, "Failed to start stats collector"); - - /* Pause heartbeat during caching */ - stats_instance.heartbeat_ctrl(true); - - /* - * Start cache service, right upon eventd starts so as not to lose - * events until telemetry starts. - * Telemetry will send a stop & collect cache upon startup - */ - capture = new capture_service(zctx, cache_max, &stats_instance); - RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture"); - RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture"); - - this_thread::sleep_for(chrono::milliseconds(200)); - RET_ON_ERR(stats_instance.is_running(), "Failed to start stats instance"); - - while(code != EVENT_EXIT) { - int resp = -1; - event_serialized_lst_t req_data, resp_data; - - RET_ON_ERR(service.channel_read(code, req_data) == 0, - "Failed to read request"); - - switch(code) { - case EVENT_CACHE_INIT: - /* connect only*/ - if (capture != NULL) { - delete capture; - } - event_serialized_lst_t().swap(capture_fifo_events); - last_events_t().swap(capture_last_events); - - capture = new capture_service(zctx, cache_max, &stats_instance); - if (capture != NULL) { - resp = capture->set_control(INIT_CAPTURE); - } - break; - - - case EVENT_CACHE_START: - if (capture == NULL) { - SWSS_LOG_ERROR("Cache is not initialized to start"); - resp = -1; - break; - } - /* Pause heartbeat during caching */ - stats_instance.heartbeat_ctrl(true); - - resp = capture->set_control(START_CAPTURE, &req_data); - break; - - - case EVENT_CACHE_STOP: - if (capture == NULL) { - SWSS_LOG_ERROR("Cache is not initialized to stop"); - resp = -1; - break; - } - resp = capture->set_control(STOP_CAPTURE); - if (resp == 0) { - counters_t overflow; - resp = capture->read_cache(capture_fifo_events, capture_last_events, - overflow); - } - delete capture; - capture = NULL; - - /* Unpause heartbeat upon stop caching */ - stats_instance.heartbeat_ctrl(); - break; - - - case EVENT_CACHE_READ: - if (capture != NULL) { - SWSS_LOG_ERROR("Cache is not stopped yet."); - resp = -1; - break; - } - resp = 0; - - if (capture_fifo_events.empty()) { - for (last_events_t::iterator it = capture_last_events.begin(); - it != capture_last_events.end(); ++it) { - capture_fifo_events.push_back(it->second); - } - last_events_t().swap(capture_last_events); - } - - { - int sz = VEC_SIZE(capture_fifo_events) < READ_SET_SIZE ? - VEC_SIZE(capture_fifo_events) : READ_SET_SIZE; - - if (sz != 0) { - auto it = std::next(capture_fifo_events.begin(), sz); - move(capture_fifo_events.begin(), capture_fifo_events.end(), - back_inserter(resp_data)); - - if (sz == VEC_SIZE(capture_fifo_events)) { - event_serialized_lst_t().swap(capture_fifo_events); - } else { - capture_fifo_events.erase(capture_fifo_events.begin(), it); - } - } - } - break; - - - case EVENT_ECHO: - resp = 0; - resp_data.swap(req_data); - break; - - case EVENT_OPTIONS: - resp = process_options(&stats_instance, req_data, resp_data); - break; - - case EVENT_EXIT: - resp = 0; - break; - - default: - SWSS_LOG_ERROR("Unexpected request: %d", code); - assert(false); - break; - } - RET_ON_ERR(service.channel_write(resp, resp_data) == 0, - "Failed to write response back"); - } -out: - service.close_service(); - stats_instance.stop(); - - if (proxy != NULL) { - delete proxy; - } - if (capture != NULL) { - delete capture; - } - if (zctx != NULL) { - zmq_ctx_term(zctx); - } - SWSS_LOG_ERROR("Eventd service exiting\n"); -} - -void set_unit_testing(bool b) -{ - s_unit_testing = b; -} - - diff --git a/src/sonic-eventd/src/eventd.h b/src/sonic-eventd/src/eventd.h deleted file mode 100644 index 8411223b35be..000000000000 --- a/src/sonic-eventd/src/eventd.h +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Header file for eventd daemon - */ -#include "table.h" -#include "events_service.h" -#include "events.h" -#include "events_wrap.h" - -#define ARRAY_SIZE(l) (sizeof(l)/sizeof((l)[0])) - -typedef map last_events_t; - -/* stat counters */ -typedef uint64_t counters_t; - -typedef enum { - INDEX_COUNTERS_EVENTS_PUBLISHED, - INDEX_COUNTERS_EVENTS_MISSED_CACHE, - COUNTERS_EVENTS_TOTAL -} stats_counter_index_t; - -#define EVENTS_STATS_FIELD_NAME "value" -#define STATS_HEARTBEAT_MIN 300 - -/* - * Started by eventd_service. - * Creates XPUB & XSUB end points. - * Bind the same - * Create a PUB socket end point for capture and bind. - * Call run_proxy method with sockets in a dedicated thread. - * Thread runs forever until the zmq context is terminated. - */ -class eventd_proxy -{ - public: - eventd_proxy(void *ctx) : m_ctx(ctx), m_frontend(NULL), m_backend(NULL), - m_capture(NULL) {}; - - ~eventd_proxy() { - zmq_close(m_frontend); - zmq_close(m_backend); - zmq_close(m_capture); - - if (m_thr.joinable()) - m_thr.join(); - } - - int init(); - - private: - void run(); - - void *m_ctx; - void *m_frontend; - void *m_backend; - void *m_capture; - thread m_thr; -}; - - -class stats_collector -{ - public: - stats_collector(); - - ~stats_collector() { stop(); } - - int start(); - - void stop() { - - m_shutdown = true; - - if (m_thr_collector.joinable()) { - m_thr_collector.join(); - } - - if (m_thr_writer.joinable()) { - m_thr_writer.join(); - } - } - - void increment_published(counters_t val) { - _update_stats(INDEX_COUNTERS_EVENTS_PUBLISHED, val); - } - - void increment_missed_cache(counters_t val) { - _update_stats(INDEX_COUNTERS_EVENTS_MISSED_CACHE, val); - } - - counters_t read_counter(stats_counter_index_t index) { - if (index != COUNTERS_EVENTS_TOTAL) { - return m_lst_counters[index]; - } - else { - return 0; - } - } - - /* Sets heartbeat interval in milliseconds */ - void set_heartbeat_interval(int val_in_ms); - - /* - * Get heartbeat interval in milliseconds - * NOTE: Set & get value may not match as the value is rounded - * to a multiple of smallest possible interval. - */ - int get_heartbeat_interval(); - - /* A way to pause heartbeat */ - void heartbeat_ctrl(bool pause = false) { - m_pause_heartbeat = pause; - SWSS_LOG_INFO("Set heartbeat_ctrl pause=%d", pause); - } - - uint64_t heartbeats_published() const { - return m_heartbeats_published; - } - - bool is_running() - { - return !m_shutdown; - } - - private: - void _update_stats(stats_counter_index_t index, counters_t val) { - if (index != COUNTERS_EVENTS_TOTAL) { - m_lst_counters[index] += val; - m_updated = true; - } - else { - SWSS_LOG_ERROR("Internal code error. Invalid index=%d", index); - } - } - - void run_collector(); - - void run_writer(); - - atomic m_updated; - - counters_t m_lst_counters[COUNTERS_EVENTS_TOTAL]; - - bool m_shutdown; - - thread m_thr_collector; - thread m_thr_writer; - - shared_ptr m_counters_db; - shared_ptr m_stats_table; - - bool m_pause_heartbeat; - - uint64_t m_heartbeats_published; - - int m_heartbeats_interval_cnt; -}; - -/* - * Capture/Cache service - * - * The service started in a dedicted thread upon demand. - * It is controlled by the caller. - * On cache init, the thread is created. - * Upon create, it creates a SUB socket to PUB end point of capture. - * PUB end point is maintained by zproxy service. - * - * On Cache start, the thread is signalled to start reading. - * - * On cache stop, it is signalled to stop reading and exit. Caller waits - * for thread to exit, before starting to read cached data, to ensure - * that the data is not handled by two threads concurrently. - * - * This thread maintains its own copy of cache. Reader, does a swap - * after thread exits. - * This thread ensures the cache is empty at the init. - * - * Upon cache start, the thread is blocked in receive call with timeout. - * Only upon receive/timeout, it would notice stop signal. Hence stop - * is not synchronous. The caller may wait for thread to terminate - * via thread.join(). - * - * Each event is 2 parts. It drops the first part, which is - * more for filtering events. It creates string from second part - * and saves it. - * - * The string is the serialized version of internal_event_ref - * - * It keeps two sets of data - * 1) List of all events received in vector in same order as received - * 2) Map of last event from each runtime id upon list overflow max size. - * - * We add to the vector as much as allowed by vector and max limit, - * whichever comes first. - * - * The sequence number in internal event will help assess the missed count - * by the consumer of the cache data. - * - */ -typedef enum { - NEED_INIT = 0, - INIT_CAPTURE, - START_CAPTURE, - STOP_CAPTURE -} capture_control_t; - - -class capture_service -{ - public: - capture_service(void *ctx, int cache_max, stats_collector *stats) : - m_ctx(ctx), m_stats_instance(stats), m_cap_run(false), - m_ctrl(NEED_INIT), m_cache_max(cache_max), - m_last_events_init(false), m_total_missed_cache(0) - {} - - ~capture_service(); - - int set_control(capture_control_t ctrl, event_serialized_lst_t *p=NULL); - - int read_cache(event_serialized_lst_t &lst_fifo, - last_events_t &lst_last, counters_t &overflow_cnt); - - private: - void init_capture_cache(const event_serialized_lst_t &lst); - void do_capture(); - - void stop_capture(); - - void *m_ctx; - stats_collector *m_stats_instance; - - bool m_cap_run; - capture_control_t m_ctrl; - thread m_thr; - - int m_cache_max; - - event_serialized_lst_t m_events; - - last_events_t m_last_events; - bool m_last_events_init; - - typedef map pre_exist_id_t; - pre_exist_id_t m_pre_exist_id; - - counters_t m_total_missed_cache; - -}; - - -/* - * Main server, that starts the zproxy service and honor - * eventd service requests event_req_type_t - * - * For echo, it just echoes - * - * FOr cache start, create the SUB end of capture and kick off - * capture_events thread. Upon cache stop command, close the handle - * which will stop the caching thread with read failure. - * - * for cache read, returns the collected events in chunks. - * - */ -void run_eventd_service(); - -/* To help skip redis access during unit testing */ -void set_unit_testing(bool b); diff --git a/src/sonic-eventd/src/main.cpp b/src/sonic-eventd/src/main.cpp deleted file mode 100644 index 7a20497f0986..000000000000 --- a/src/sonic-eventd/src/main.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "logger.h" -#include "eventd.h" - -void run_eventd_service(); - -int main() -{ - swss::Logger::setMinPrio(swss::Logger::SWSS_DEBUG); - SWSS_LOG_INFO("The eventd service started"); - SWSS_LOG_ERROR("ERR:The eventd service started"); - - run_eventd_service(); - - SWSS_LOG_INFO("The eventd service exited"); - - return 0; -} - diff --git a/src/sonic-eventd/src/subdir.mk b/src/sonic-eventd/src/subdir.mk deleted file mode 100644 index a1e2b55f8d13..000000000000 --- a/src/sonic-eventd/src/subdir.mk +++ /dev/null @@ -1,13 +0,0 @@ -CC := g++ - -TEST_OBJS += ./src/eventd.o -OBJS += ./src/eventd.o ./src/main.o - -C_DEPS += ./src/eventd.d ./src/main.d - -src/%.o: src/%.cpp - @echo 'Building file: $<' - @echo 'Invoking: GCC C++ Compiler' - $(CC) -D__FILENAME__="$(subst src/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" - @echo 'Finished building: $<' - @echo ' ' diff --git a/src/sonic-eventd/tests/eventd_ut.cpp b/src/sonic-eventd/tests/eventd_ut.cpp deleted file mode 100644 index 399255edb2b8..000000000000 --- a/src/sonic-eventd/tests/eventd_ut.cpp +++ /dev/null @@ -1,915 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include "gtest/gtest.h" -#include "events_common.h" -#include "events.h" -#include "../src/eventd.h" - -using namespace std; -using namespace swss; - -extern bool g_is_redis_available; -extern const char *counter_keys[]; - -typedef struct { - int id; - string source; - string tag; - string rid; - string seq; - event_params_t params; - int missed_cnt; -} test_data_t; - -internal_event_t create_ev(const test_data_t &data) -{ - internal_event_t event_data; - - event_data[EVENT_STR_DATA] = convert_to_json( - data.source + ":" + data.tag, data.params); - event_data[EVENT_RUNTIME_ID] = data.rid; - event_data[EVENT_SEQUENCE] = data.seq; - - return event_data; -} - -/* Mock test data with event parameters and expected missed count */ -static const test_data_t ldata[] = { - { - 0, - "source0", - "tag0", - "guid-0", - "1", - {{"ip", "10.10.10.10"}, {"state", "up"}}, - 0 - }, - { - 1, - "source0", - "tag1", - "guid-1", - "100", - {{"ip", "10.10.27.10"}, {"state", "down"}}, - 0 - }, - { - 2, - "source1", - "tag2", - "guid-2", - "101", - {{"ip", "10.10.24.10"}, {"state", "down"}}, - 0 - }, - { - 3, - "source0", - "tag3", - "guid-1", - "105", - {{"ip", "10.10.10.10"}, {"state", "up"}}, - 4 - }, - { - 4, - "source0", - "tag4", - "guid-0", - "2", - {{"ip", "10.10.20.10"}, {"state", "down"}}, - 0 - }, - { - 5, - "source1", - "tag5", - "guid-2", - "110", - {{"ip", "10.10.24.10"}, {"state", "down"}}, - 8 - }, - { - 6, - "source0", - "tag0", - "guid-0", - "5", - {{"ip", "10.10.10.10"}, {"state", "up"}}, - 2 - }, - { - 7, - "source0", - "tag1", - "guid-1", - "106", - {{"ip", "10.10.27.10"}, {"state", "down"}}, - 0 - }, - { - 8, - "source1", - "tag2", - "guid-2", - "111", - {{"ip", "10.10.24.10"}, {"state", "down"}}, - 0 - }, - { - 9, - "source0", - "tag3", - "guid-1", - "109", - {{"ip", "10.10.10.10"}, {"state", "up"}}, - 2 - }, - { - 10, - "source0", - "tag4", - "guid-0", - "6", - {{"ip", "10.10.20.10"}, {"state", "down"}}, - 0 - }, - { - 11, - "source1", - "tag5", - "guid-2", - "119", - {{"ip", "10.10.24.10"}, {"state", "down"}}, - 7 - }, -}; - - -void run_cap(void *zctx, bool &term, string &read_source, - int &cnt) -{ - void *mock_cap = zmq_socket (zctx, ZMQ_SUB); - string source; - internal_event_t ev_int; - int block_ms = 200; - int i=0; - - EXPECT_TRUE(NULL != mock_cap); - EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str())); - EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); - EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); - - while(!term) { - string source; - internal_event_t ev_int; - - if (0 == zmq_message_read(mock_cap, 0, source, ev_int)) { - cnt = ++i; - } - } - zmq_close(mock_cap); -} - -void run_sub(void *zctx, bool &term, string &read_source, internal_events_lst_t &lst, - int &cnt) -{ - void *mock_sub = zmq_socket (zctx, ZMQ_SUB); - string source; - internal_event_t ev_int; - int block_ms = 200; - - EXPECT_TRUE(NULL != mock_sub); - EXPECT_EQ(0, zmq_connect(mock_sub, get_config(XPUB_END_KEY).c_str())); - EXPECT_EQ(0, zmq_setsockopt(mock_sub, ZMQ_SUBSCRIBE, "", 0)); - EXPECT_EQ(0, zmq_setsockopt(mock_sub, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); - - while(!term) { - if (0 == zmq_message_read(mock_sub, 0, source, ev_int)) { - lst.push_back(ev_int); - read_source.swap(source); - cnt = (int)lst.size(); - } - } - - zmq_close(mock_sub); -} - -void *init_pub(void *zctx) -{ - void *mock_pub = zmq_socket (zctx, ZMQ_PUB); - EXPECT_TRUE(NULL != mock_pub); - EXPECT_EQ(0, zmq_connect(mock_pub, get_config(XSUB_END_KEY).c_str())); - - /* Provide time for async connect to complete */ - this_thread::sleep_for(chrono::milliseconds(200)); - - return mock_pub; -} - -void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst) -{ - for(internal_events_lst_t::const_iterator itc = lst.begin(); itc != lst.end(); ++itc) { - EXPECT_EQ(0, zmq_message_send(mock_pub, wr_source, *itc)); - } -} - - -TEST(eventd, proxy) -{ - printf("Proxy TEST started\n"); - bool term_sub = false; - bool term_cap = false; - string rd_csource, rd_source, wr_source("hello"); - internal_events_lst_t rd_evts, wr_evts; - int rd_evts_sz = 0, rd_cevts_sz = 0; - int wr_sz; - - void *zctx = zmq_ctx_new(); - EXPECT_TRUE(NULL != zctx); - - eventd_proxy *pxy = new eventd_proxy(zctx); - EXPECT_TRUE(NULL != pxy); - - /* Starting proxy */ - EXPECT_EQ(0, pxy->init()); - - /* subscriber in a thread */ - thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz)); - - /* capture in a thread */ - thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz)); - - /* Init pub connection */ - void *mock_pub = init_pub(zctx); - - EXPECT_TRUE(5 < ARRAY_SIZE(ldata)); - - for(int i=0; i<5; ++i) { - wr_evts.push_back(create_ev(ldata[i])); - } - - EXPECT_TRUE(rd_evts.empty()); - EXPECT_TRUE(rd_source.empty()); - - /* Publish events. */ - run_pub(mock_pub, wr_source, wr_evts); - - wr_sz = (int)wr_evts.size(); - for(int i=0; (wr_sz != rd_evts_sz) && (i < 100); ++i) { - /* Loop & wait for atmost a second */ - this_thread::sleep_for(chrono::milliseconds(10)); - } - this_thread::sleep_for(chrono::milliseconds(1000)); - - delete pxy; - pxy = NULL; - - term_sub = true; - term_cap = true; - - thr.join(); - thrc.join(); - EXPECT_EQ(rd_evts.size(), wr_evts.size()); - EXPECT_EQ(rd_cevts_sz, wr_evts.size()); - - zmq_close(mock_pub); - zmq_ctx_term(zctx); - - /* Provide time for async proxy removal to complete */ - this_thread::sleep_for(chrono::milliseconds(200)); - - printf("eventd_proxy is tested GOOD\n"); -} - - -TEST(eventd, capture) -{ - printf("Capture TEST started\n"); - - bool term_sub = false; - string sub_source; - int sub_evts_sz = 0; - internal_events_lst_t sub_evts; - stats_collector stats_instance; - - /* run_pub details */ - string wr_source("hello"); - internal_events_lst_t wr_evts; - - /* capture related */ - int init_cache = 3; /* provided along with start capture */ - int cache_max = init_cache + 3; /* capture service cache max */ - - /* startup strings; expected list & read list from capture */ - event_serialized_lst_t evts_start, evts_expect, evts_read; - last_events_t last_evts_exp, last_evts_read; - counters_t overflow, overflow_exp = 0; - - void *zctx = zmq_ctx_new(); - EXPECT_TRUE(NULL != zctx); - - /* Run the proxy; Capture service reads from proxy */ - eventd_proxy *pxy = new eventd_proxy(zctx); - EXPECT_TRUE(NULL != pxy); - - /* Starting proxy */ - EXPECT_EQ(0, pxy->init()); - - /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ - thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); - - /* Create capture service */ - capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); - - /* Expect START_CAPTURE */ - EXPECT_EQ(-1, pcap->set_control(STOP_CAPTURE)); - - /* Initialize the capture */ - EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); - - EXPECT_TRUE(init_cache > 1); - EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata)); - - /* Collect few serailized strings of events for startup cache */ - for(int i=0; i < init_cache; ++i) { - internal_event_t ev(create_ev(ldata[i])); - string evt_str; - serialize(ev, evt_str); - evts_start.push_back(evt_str); - evts_expect.push_back(evt_str); - } - - /* - * Collect events to publish for capture to cache - * re-publishing some events sent in cache. - * Hence i=1, when first init_cache events are already - * in crash. - */ - for(int i=1; i < (int)ARRAY_SIZE(ldata); ++i) { - internal_event_t ev(create_ev(ldata[i])); - string evt_str; - - serialize(ev, evt_str); - - wr_evts.push_back(ev); - - if (i < cache_max) { - if (i >= init_cache) { - /* for i < init_cache, evts_expect is already populated */ - evts_expect.push_back(evt_str); - } - } else { - /* collect last entries for overflow */ - last_evts_exp[ldata[i].rid] = evt_str; - overflow_exp++; - } - } - overflow_exp -= (int)last_evts_exp.size(); - - EXPECT_EQ(0, pcap->set_control(START_CAPTURE, &evts_start)); - - /* Init pub connection */ - void *mock_pub = init_pub(zctx); - - /* Publish events from 1 to all. */ - run_pub(mock_pub, wr_source, wr_evts); - - /* Provide time for async message receive. */ - this_thread::sleep_for(chrono::milliseconds(200)); - - /* Stop capture, closes socket & terminates the thread */ - EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE)); - - /* terminate subs thread */ - term_sub = true; - - /* Read the cache */ - EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read, overflow)); - -#ifdef DEBUG_TEST - if ((evts_read.size() != evts_expect.size()) || - (last_evts_read.size() != last_evts_exp.size())) { - printf("size: sub_evts_sz=%d sub_evts=%d\n", sub_evts_sz, (int)sub_evts.size()); - printf("init_cache=%d cache_max=%d\n", init_cache, cache_max); - printf("overflow=%ul overflow_exp=%ul\n", overflow, overflow_exp); - printf("evts_start=%d evts_expect=%d evts_read=%d\n", - (int)evts_start.size(), (int)evts_expect.size(), (int)evts_read.size()); - printf("last_evts_exp=%d last_evts_read=%d\n", (int)last_evts_exp.size(), - (int)last_evts_read.size()); - } -#endif - - EXPECT_EQ(evts_read.size(), evts_expect.size()); - EXPECT_EQ(evts_read, evts_expect); - EXPECT_EQ(last_evts_read.size(), last_evts_exp.size()); - EXPECT_EQ(last_evts_read, last_evts_exp); - EXPECT_EQ(overflow, overflow_exp); - - delete pxy; - pxy = NULL; - - delete pcap; - pcap = NULL; - - thr_sub.join(); - - zmq_close(mock_pub); - zmq_ctx_term(zctx); - - /* Provide time for async proxy removal to complete */ - this_thread::sleep_for(chrono::milliseconds(200)); - - printf("Capture TEST completed\n"); -} - -TEST(eventd, captureCacheMax) -{ - printf("Capture TEST with matchinhg cache-max started\n"); - - /* - * Need to run subscriber; Else publisher would skip publishing - * in the absence of any subscriber. - */ - bool term_sub = false; - string sub_source; - int sub_evts_sz = 0; - internal_events_lst_t sub_evts; - stats_collector stats_instance; - - /* run_pub details */ - string wr_source("hello"); - internal_events_lst_t wr_evts; - - /* capture related */ - int init_cache = 4; /* provided along with start capture */ - int cache_max = ARRAY_SIZE(ldata); /* capture service cache max */ - - /* startup strings; expected list & read list from capture */ - event_serialized_lst_t evts_start, evts_expect, evts_read; - last_events_t last_evts_read; - counters_t overflow; - - void *zctx = zmq_ctx_new(); - EXPECT_TRUE(NULL != zctx); - - /* Run the proxy; Capture service reads from proxy */ - eventd_proxy *pxy = new eventd_proxy(zctx); - EXPECT_TRUE(NULL != pxy); - - /* Starting proxy */ - EXPECT_EQ(0, pxy->init()); - - /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ - thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); - - /* Create capture service */ - capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); - - /* Expect START_CAPTURE */ - EXPECT_EQ(-1, pcap->set_control(STOP_CAPTURE)); - - EXPECT_TRUE(init_cache > 1); - - /* Collect few serailized strings of events for startup cache */ - for(int i=0; i < init_cache; ++i) { - internal_event_t ev(create_ev(ldata[i])); - string evt_str; - serialize(ev, evt_str); - evts_start.push_back(evt_str); - evts_expect.push_back(evt_str); - } - - /* - * Collect events to publish for capture to cache - * re-publishing some events sent in cache. - */ - for(int i=1; i < (int)ARRAY_SIZE(ldata); ++i) { - internal_event_t ev(create_ev(ldata[i])); - string evt_str; - - serialize(ev, evt_str); - - wr_evts.push_back(ev); - - if (i >= init_cache) { - /* for i < init_cache, evts_expect is already populated */ - evts_expect.push_back(evt_str); - } - } - - EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); - EXPECT_EQ(0, pcap->set_control(START_CAPTURE, &evts_start)); - - /* Init pub connection */ - void *mock_pub = init_pub(zctx); - - /* Publish events from 1 to all. */ - run_pub(mock_pub, wr_source, wr_evts); - - /* Provide time for async message receive. */ - this_thread::sleep_for(chrono::milliseconds(100)); - - /* Stop capture, closes socket & terminates the thread */ - EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE)); - - /* terminate subs thread */ - term_sub = true; - - /* Read the cache */ - EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read, overflow)); - -#ifdef DEBUG_TEST - if ((evts_read.size() != evts_expect.size()) || - !last_evts_read.empty()) { - printf("size: sub_evts_sz=%d sub_evts=%d\n", sub_evts_sz, (int)sub_evts.size()); - printf("init_cache=%d cache_max=%d\n", init_cache, cache_max); - printf("evts_start=%d evts_expect=%d evts_read=%d\n", - (int)evts_start.size(), (int)evts_expect.size(), (int)evts_read.size()); - printf("last_evts_read=%d\n", (int)last_evts_read.size()); - printf("overflow=%ul overflow_exp=%ul\n", overflow, overflow_exp); - } -#endif - - EXPECT_EQ(evts_read, evts_expect); - EXPECT_TRUE(last_evts_read.empty()); - EXPECT_EQ(overflow, 0); - - delete pxy; - pxy = NULL; - - delete pcap; - pcap = NULL; - - thr_sub.join(); - - zmq_close(mock_pub); - zmq_ctx_term(zctx); - - /* Provide time for async proxy removal to complete */ - this_thread::sleep_for(chrono::milliseconds(200)); - - printf("Capture TEST with matchinhg cache-max completed\n"); -} - -TEST(eventd, service) -{ - /* - * Don't PUB/SUB events as main run_eventd_service itself - * is using zmq_message_read. Any PUB/SUB will cause - * eventd's do_capture running in another thread to call - * zmq_message_read, which will crash as boost:archive is - * not thread safe. - * TEST(eventd, capture) has already tested caching. - */ - printf("Service TEST started\n"); - - /* startup strings; expected list & read list from capture */ - event_service service; - - void *zctx = zmq_ctx_new(); - EXPECT_TRUE(NULL != zctx); - - /* - * Start the eventd server side service - * It runs proxy & capture service - * It uses its own zmq context - * It starts to capture too. - */ - - if (!g_is_redis_available) { - set_unit_testing(true); - } - - thread thread_service(&run_eventd_service); - - /* Need client side service to interact with server side */ - EXPECT_EQ(0, service.init_client(zctx)); - - { - /* eventd_service starts cache too; Test this caching */ - /* Init pub connection */ - void *mock_pub = init_pub(zctx); - EXPECT_TRUE(NULL != mock_pub); - - internal_events_lst_t wr_evts; - int wr_sz = 2; - string wr_source("hello"); - - /* Test service startup caching */ - event_serialized_lst_t evts_start, evts_read; - - for(int i=0; i evts_start_int; - - EXPECT_TRUE(init_cache > 1); - - /* Collect few serailized strings of events for startup cache */ - for(int i=0; i < init_cache; ++i) { - internal_event_t ev(create_ev(ldata[i])); - string evt_str; - serialize(ev, evt_str); - evts_start.push_back(evt_str); - evts_start_int.push_back(ev); - } - - - EXPECT_EQ(0, service.cache_init()); - EXPECT_EQ(0, service.cache_start(evts_start)); - - this_thread::sleep_for(chrono::milliseconds(200)); - - /* Stop capture, closes socket & terminates the thread */ - EXPECT_EQ(0, service.cache_stop()); - - /* Read the cache */ - EXPECT_EQ(0, service.cache_read(evts_read)); - - if (evts_read != evts_start) { - vector evts_read_int; - - for (event_serialized_lst_t::const_iterator itc = evts_read.begin(); - itc != evts_read.end(); ++itc) { - internal_event_t event; - - if (deserialize(*itc, event) == 0) { - evts_read_int.push_back(event); - } - } - EXPECT_EQ(evts_read_int, evts_start_int); - } - } - - { - string set_opt_bad("{\"HEARTBEAT_INTERVAL\": 2000, \"OFFLINE_CACHE_SIZE\": 500}"); - string set_opt_good("{\"HEARTBEAT_INTERVAL\":5}"); - char buff[100]; - buff[0] = 0; - - EXPECT_EQ(-1, service.global_options_set(set_opt_bad.c_str())); - EXPECT_EQ(0, service.global_options_set(set_opt_good.c_str())); - EXPECT_LT(0, service.global_options_get(buff, sizeof(buff))); - - EXPECT_EQ(set_opt_good, string(buff)); - } - - EXPECT_EQ(0, service.send_recv(EVENT_EXIT)); - - service.close_service(); - - thread_service.join(); - - zmq_ctx_term(zctx); - printf("Service TEST completed\n"); -} - - -void -wait_for_heartbeat(stats_collector &stats_instance, long unsigned int cnt, - int wait_ms = 3000) -{ - int diff = 0; - - auto st = duration_cast(system_clock::now().time_since_epoch()).count(); - while (stats_instance.heartbeats_published() == cnt) { - auto en = duration_cast(system_clock::now().time_since_epoch()).count(); - diff = en - st; - if (diff > wait_ms) { - EXPECT_LE(diff, wait_ms); - EXPECT_EQ(cnt, stats_instance.heartbeats_published()); - break; - } - else { - stringstream ss; - ss << (en -st); - } - this_thread::sleep_for(chrono::milliseconds(300)); - } -} - -TEST(eventd, heartbeat) -{ - printf("heartbeat TEST started\n"); - - int rc; - long unsigned int cnt; - stats_collector stats_instance; - - if (!g_is_redis_available) { - set_unit_testing(true); - } - - void *zctx = zmq_ctx_new(); - EXPECT_TRUE(NULL != zctx); - - eventd_proxy *pxy = new eventd_proxy(zctx); - EXPECT_TRUE(NULL != pxy); - - /* Starting proxy */ - EXPECT_EQ(0, pxy->init()); - - rc = stats_instance.start(); - EXPECT_EQ(rc, 0); - - /* Wait for any non-zero heartbeat */ - wait_for_heartbeat(stats_instance, 0); - - /* Pause heartbeat */ - stats_instance.heartbeat_ctrl(true); - - /* Sleep to ensure the other thread noticed the pause request. */ - this_thread::sleep_for(chrono::milliseconds(200)); - - /* Get current count */ - cnt = stats_instance.heartbeats_published(); - - /* Wait for 3 seconds with no new neartbeat */ - this_thread::sleep_for(chrono::seconds(3)); - - EXPECT_EQ(stats_instance.heartbeats_published(), cnt); - - /* Set interval as 1 second */ - stats_instance.set_heartbeat_interval(1); - - /* Turn on heartbeat */ - stats_instance.heartbeat_ctrl(); - - /* Wait for heartbeat count to change from last count */ - wait_for_heartbeat(stats_instance, cnt, 2000); - - stats_instance.stop(); - - delete pxy; - - zmq_ctx_term(zctx); - - printf("heartbeat TEST completed\n"); -} - - -TEST(eventd, testDB) -{ - printf("DB TEST started\n"); - - /* consts used */ - const int pub_count = 7; - const int cache_max = 3; - - stats_collector stats_instance; - event_handle_t pub_handle; - event_serialized_lst_t evts_read; - last_events_t last_evts_read; - counters_t overflow; - string tag; - - if (!g_is_redis_available) { - printf("redis not available; Hence DB TEST skipped\n"); - return; - } - - EXPECT_LT(cache_max, pub_count); - DBConnector db("COUNTERS_DB", 0, true); - - - /* Not testing heartbeat; Hence set high val as 10 seconds */ - stats_instance.set_heartbeat_interval(10000); - - /* Start instance to capture published count & as well writes to DB */ - EXPECT_EQ(0, stats_instance.start()); - - void *zctx = zmq_ctx_new(); - EXPECT_TRUE(NULL != zctx); - - /* Run proxy to enable receive as capture test needs to receive */ - eventd_proxy *pxy = new eventd_proxy(zctx); - EXPECT_TRUE(NULL != pxy); - - /* Starting proxy */ - EXPECT_EQ(0, pxy->init()); - - /* Create capture service */ - capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); - - /* Initialize the capture */ - EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); - - /* Kick off capture */ - EXPECT_EQ(0, pcap->set_control(START_CAPTURE)); - - pub_handle = events_init_publisher("test_db"); - - for(int i=0; i < pub_count; ++i) { - tag = string("test_db_tag_") + to_string(i); - event_publish(pub_handle, tag); - } - - /* Pause to ensure all publisghed events did reach capture service */ - this_thread::sleep_for(chrono::milliseconds(200)); - - EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE)); - - /* Read the cache */ - EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read, overflow)); - - /* - * Sent pub_count messages of different tags. - * Upon cache max, only event per sender/runtime-id is saved. Hence - * expected last_evts_read is one. - * expected overflow = pub_count - cache_max - 1 - */ - - EXPECT_EQ(cache_max, (int)evts_read.size()); - EXPECT_EQ(1, (int)last_evts_read.size()); - EXPECT_EQ((pub_count - cache_max - 1), overflow); - - EXPECT_EQ(pub_count, stats_instance.read_counter( - INDEX_COUNTERS_EVENTS_PUBLISHED)); - EXPECT_EQ((pub_count - cache_max - 1), stats_instance.read_counter( - INDEX_COUNTERS_EVENTS_MISSED_CACHE)); - - events_deinit_publisher(pub_handle); - - for (int i=0; i < COUNTERS_EVENTS_TOTAL; ++i) { - string key = string("COUNTERS_EVENTS:") + counter_keys[i]; - unordered_map m; - bool key_found = false, val_found=false, val_match=false; - - if (db.exists(key)) { - try { - m = db.hgetall(key); - unordered_map::const_iterator itc = - m.find(string(EVENTS_STATS_FIELD_NAME)); - if (itc != m.end()) { - int expect = (counter_keys[i] == string(COUNTERS_EVENTS_PUBLISHED) ? - pub_count : (pub_count - cache_max - 1)); - val_match = (expect == stoi(itc->second) ? true : false); - val_found = true; - } - } - catch (exception &e) - { - printf("Failed to get key=(%s) err=(%s)", key.c_str(), e.what()); - EXPECT_TRUE(false); - } - key_found = true; - } - - if (!val_match) { - printf("key=%s key_found=%d val_found=%d fields=%d", - key.c_str(), key_found, val_found, (int)m.size()); - - printf("hgetall BEGIN key=%s", key.c_str()); - for(unordered_map::const_iterator itc = m.begin(); - itc != m.end(); ++itc) { - printf("val[%s] = (%s)", itc->first.c_str(), itc->second.c_str()); - } - printf("hgetall END\n"); - EXPECT_TRUE(false); - } - } - - stats_instance.stop(); - - delete pxy; - delete pcap; - - zmq_ctx_term(zctx); - - printf("DB TEST completed\n"); -} - - -// TODO -- Add unit tests for stats diff --git a/src/sonic-eventd/tests/main.cpp b/src/sonic-eventd/tests/main.cpp deleted file mode 100644 index 4b869e8c3004..000000000000 --- a/src/sonic-eventd/tests/main.cpp +++ /dev/null @@ -1,97 +0,0 @@ -#include "gtest/gtest.h" -#include "dbconnector.h" -#include - -using namespace std; -using namespace swss; - -string existing_file = "./tests/redis_multi_db_ut_config/database_config.json"; -string nonexisting_file = "./tests/redis_multi_db_ut_config/database_config_nonexisting.json"; -string global_existing_file = "./tests/redis_multi_db_ut_config/database_global.json"; - -#define TEST_DB "APPL_DB" -#define TEST_NAMESPACE "asic0" -#define INVALID_NAMESPACE "invalid" - -bool g_is_redis_available = false; - -class SwsscommonEnvironment : public ::testing::Environment { -public: - // Override this to define how to set up the environment - void SetUp() override { - // by default , init should be false - cout<<"Default : isInit = "<:tag\n") - return sourceTag - -def getFVMFromParams(params): - param_dict = FieldValueMap() - for key, value in params.items(): - key = str(key) - value = str(value) - param_dict[key] = value - return param_dict - -def publishEvents(line, publisher_handle): - try: - json_dict = json.loads(line) - except Exception as ex: - logging.error("JSON string not able to be parsed\n") - return - if not json_dict or len(json_dict) != 1: - logging.error("JSON string not able to be parsed\n") - return - sourceTag = list(json_dict)[0] - params = list(json_dict.values())[0] - tag = getTag(sourceTag) - param_dict = getFVMFromParams(params) - if param_dict: - event_publish(publisher_handle, tag, param_dict) - -def publishEventsFromFile(publisher_handle, infile, count, pause): - try: - with open(infile, 'r') as f: - for line in f.readlines(): - line.rstrip() - publishEvents(line, publisher_handle) - time.sleep(pause) - except Exception as ex: - logging.error("Unable to open file from given path or has incorrect json format, gives exception {}\n".format(ex)) - logging.info("Switching to default bgp state publish events\n") - publishBGPEvents(publisher_handle, count, pause) - -def publishBGPEvents(publisher_handle, count, pause): - ip_addresses = [] - param_dict = FieldValueMap() - - for _ in range(count): - ip = str(ipaddress.IPv4Address(random.randint(0, 2 ** 32))) - ip_addresses.append(ip) - - # publish down events - for ip in ip_addresses: - param_dict["ip"] = ip - param_dict["status"] = "down" - event_publish(publisher_handle, "bgp-state", param_dict) - time.sleep(pause) - - # publish up events - for ip in ip_addresses: - param_dict["ip"] = ip - event_publish(publisher_handle, "bgp-state", param_dict) - time.sleep(pause) - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-s", "--source", nargs='?', const='test-event-source', default='test-event-source', help="Source of event, default us test-event-source") - parser.add_argument("-f", "--file", nargs='?', const='', default='', help="File containing json event strings, must be in format \'{\":foo\": {\"aaa\": \"AAA\", \"bbb\": \"BBB\"}}\'") - parser.add_argument("-c", "--count", nargs='?', type=int, const=10, default=10, help="Count of default bgp events to be generated") - parser.add_argument("-p", "--pause", nargs='?', type=float, const=0.0, default=0.0, help="Pause time wanted between each event, default is 0") - args = parser.parse_args() - publisher_handle = events_init_publisher(args.source) - if args.file == '': - publishBGPEvents(publisher_handle, args.count, args.pause) - else: - publishEventsFromFile(publisher_handle, args.file, args.count, args.pause) - -if __name__ == "__main__": - main() diff --git a/src/sonic-eventd/tools/events_tool.cpp b/src/sonic-eventd/tools/events_tool.cpp deleted file mode 100644 index 97b17c1d7566..000000000000 --- a/src/sonic-eventd/tools/events_tool.cpp +++ /dev/null @@ -1,328 +0,0 @@ -#include -#include -#include "events.h" -#include "events_common.h" - -/* - * Sample i/p file contents for send - * - * {"src_0:key-0": {"foo": "bar", "hello": "world" }} - * {"src_0:key-1": {"foo": "barXX", "hello": "world" }} - * - * Repeat the above line to increase entries. - * Each line is parsed independently, so no "," expected at the end. - */ - -#define ASSERT(res, m, ...) \ - if (!(res)) {\ - int _e = errno; \ - printf("Failed here %s:%d errno:%d zerrno:%d ", __FUNCTION__, __LINE__, _e, zmq_errno()); \ - printf(m, ##__VA_ARGS__); \ - printf("\n"); \ - exit(-1); } - - -typedef enum { - OP_INIT=0, - OP_SEND=1, - OP_RECV=2, - OP_SEND_RECV=3 //SEND|RECV -} op_t; - - -#define PRINT_CHUNK_SZ 2 - -/* - * Usage: - */ - -const char *s_usage = "\ --s - To Send\n\ --r - To receive\n\ -Note:\n\ - when both -s & -r are given:\n\ - it uses main thread to publish and fork a dedicated thread to receive.\n\ - The rest of the parameters except -w is used for send\n\ -\n\ --n - Count of messages to send/receive. When both given, it is used as count to send\n\ - Default: 1 \n\ - A value of 0 implies unlimited\n\ -\n\ --p - Count of milliseconds to pause between sends or receives. In send-recv mode, it only affects send.\n\ - Default: 0 implying no pause\n\ -\n\ - -i - List of JSON messages to send in a file, with each event/message\n\ - declared in a single line. When n is more than size of list, the list\n\ - is rotated upon completion.\n\ - e.g. '[ \n\ - { \"sonic-bgp:bgp-state\": { \"ip\": \"10.101.01.10\", \"ts\": \"2022-10-11T01:02:30.45567\", \"state\": \"up\" }}\n\ - { \"abc-xxx:yyy-zz\": { \"foo\": \"bar\", \"hello\":\"world\", \"ts\": \"2022-10-11T01:02:30.45567\"}}\n\ - { \"some-mod:some-tag\": {}}\n\ - ]\n\ - Default: \n\ -\n\ --c - Use offline cache in receive mode\n\ --o - O/p file to write received events\n\ - Default: STDOUT\n"; - - -bool term_receive = false; - -template -string -t_map_to_str(const Map &m) -{ - stringstream _ss; - string sep; - - _ss << "{"; - for (const auto elem: m) { - _ss << sep << "{" << elem.first << "," << elem.second << "}"; - if (sep.empty()) { - sep = ", "; - } - } - _ss << "}"; - return _ss.str(); -} - -void -do_receive(const event_subscribe_sources_t filter, const string outfile, int cnt, int pause, bool use_cache) -{ - int index=0, total_missed = 0; - ostream* fp = &cout; - ofstream fout; - - if (!outfile.empty()) { - fout.open(outfile); - if (!fout.fail()) { - fp = &fout; - printf("outfile=%s set\n", outfile.c_str()); - } - } - event_handle_t h = events_init_subscriber(use_cache, 2000, filter.empty() ? NULL : &filter); - printf("Subscribed with use_cache=%d timeout=2000 filter %s\n", - use_cache, filter.empty() ? "empty" : "non-empty"); - ASSERT(h != NULL, "Failed to get subscriber handle"); - - while(!term_receive) { - event_receive_op_t evt; - map_str_str_t evtOp; - - int rc = event_receive(h, evt); - if (rc != 0) { - ASSERT(rc == EAGAIN, "Failed to receive rc=%d index=%d\n", - rc, index); - continue; - } - ASSERT(!evt.key.empty(), "received EMPTY key"); - ASSERT(evt.missed_cnt >= 0, "Missed count uninitialized"); - ASSERT(evt.publish_epoch_ms > 0, "publish_epoch_ms uninitialized"); - - total_missed += evt.missed_cnt; - - evtOp[evt.key] = t_map_to_str(evt.params); - (*fp) << t_map_to_str(evtOp) << "\n"; - fp->flush(); - - if ((++index % PRINT_CHUNK_SZ) == 0) { - printf("Received index %d\n", index); - } - - if (cnt > 0) { - if (--cnt <= 0) { - break; - } - } - } - - events_deinit_subscriber(h); - printf("Total received = %d missed = %dfile:%s\n", index, total_missed, - outfile.empty() ? "STDOUT" : outfile.c_str()); -} - - -int -do_send(const string infile, int cnt, int pause) -{ - typedef struct { - string tag; - event_params_t params; - } evt_t; - - typedef vector lst_t; - - lst_t lst; - string source; - event_handle_t h; - int index = 0; - - if (!infile.empty()) { - ifstream input(infile); - - /* Read infile into list of events, that are ready for send */ - for( string line; getline( input, line ); ) - { - evt_t evt; - string str_params; - - const auto &data = nlohmann::json::parse(line); - ASSERT(data.is_object(), "Parsed data is not object"); - ASSERT((int)data.size() == 1, "string parse size = %d", (int)data.size()); - - string key(data.begin().key()); - if (source.empty()) { - source = key.substr(0, key.find(":")); - } else { - ASSERT(source == key.substr(0, key.find(":")), "source:%s read=%s", - source.c_str(), key.substr(0, key.find(":")).c_str()); - } - evt.tag = key.substr(key.find(":")+1); - - const auto &val = data.begin().value(); - ASSERT(val.is_object(), "Parsed params is not object"); - ASSERT((int)val.size() >= 1, "Expect non empty params"); - - for(auto par_it = val.begin(); par_it != val.end(); par_it++) { - evt.params[string(par_it.key())] = string(par_it.value()); - } - lst.push_back(evt); - } - } - - if (lst.empty()) { - evt_t evt = { - "test-tag", - { - { "param1", "foo"}, - {"param2", "bar"} - } - }; - lst.push_back(evt); - } - - h = events_init_publisher(source); - ASSERT(h != NULL, "failed to init publisher"); - - /* cnt = 0 as i/p implies forever */ - - while(cnt >= 0) { - /* Keep resending the list until count is exhausted */ - for(lst_t::const_iterator itc = lst.begin(); (cnt >= 0) && (itc != lst.end()); ++itc) { - const evt_t &evt = *itc; - - if ((++index % PRINT_CHUNK_SZ) == 0) { - printf("Sending index %d\n", index); - } - - int rc = event_publish(h, evt.tag, evt.params.empty() ? NULL : &evt.params); - ASSERT(rc == 0, "Failed to publish index=%d rc=%d", index, rc); - - if ((cnt > 0) && (--cnt == 0)) { - /* set to termninate */ - cnt = -1; - } - else if (pause) { - /* Pause between two sends */ - this_thread::sleep_for(chrono::milliseconds(pause)); - } - } - } - - events_deinit_publisher(h); - printf("Sent %d events\n", index); - return 0; -} - -void usage() -{ - printf("%s", s_usage); - exit(-1); -} - -int main(int argc, char **argv) -{ - bool use_cache = false; - int op = OP_INIT; - int cnt=0, pause=0; - string json_str_msg, outfile("STDOUT"), infile; - event_subscribe_sources_t filter; - - for(;;) - { - switch(getopt(argc, argv, "srn:p:i:o:f:c")) // note the colon (:) to indicate that 'b' has a parameter and is not a switch - { - case 'c': - use_cache = true; - continue; - - case 's': - op |= OP_SEND; - continue; - - case 'r': - op |= OP_RECV; - continue; - - case 'n': - cnt = stoi(optarg); - continue; - - case 'p': - pause = stoi(optarg); - continue; - - case 'i': - infile = optarg; - continue; - - case 'o': - outfile = optarg; - continue; - - case 'f': - { - stringstream ss(optarg); //create string stream from the string - while(ss.good()) { - string substr; - getline(ss, substr, ','); - filter.push_back(substr); - } - } - continue; - - case -1: - break; - - case '?': - case 'h': - default : - usage(); - break; - - } - break; - } - - - printf("op=%d n=%d pause=%d i=%s o=%s\n", - op, cnt, pause, infile.c_str(), outfile.c_str()); - - if (op == OP_SEND_RECV) { - thread thr(&do_receive, filter, outfile, 0, 0, use_cache); - do_send(infile, cnt, pause); - } - else if (op == OP_SEND) { - do_send(infile, cnt, pause); - } - else if (op == OP_RECV) { - do_receive(filter, outfile, cnt, pause, use_cache); - } - else { - ASSERT(false, "Elect -s for send or -r receive or both; Bailing out with no action\n"); - } - - printf("--------- END: Good run -----------------\n"); - return 0; -} - diff --git a/src/sonic-eventd/tools/events_volume_test.py b/src/sonic-eventd/tools/events_volume_test.py deleted file mode 100644 index 73143d483cd8..000000000000 --- a/src/sonic-eventd/tools/events_volume_test.py +++ /dev/null @@ -1,68 +0,0 @@ -import sys -import subprocess -import time -import logging -import argparse - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(levelname)s] %(message)s", - handlers = [ - logging.FileHandler("debug.log"), - logging.StreamHandler(sys.stdout) - ] -) - -def read_events_from_file(file, count): - logging.info("Reading from file generated by events_tool") - lines = 0 - with open(file, 'r') as infile: - lines = infile.readlines() - logging.info("Should receive {} events and got {} events\n".format(count, len(lines))) - assert len(lines) == count - -def start_tool(file): - logging.info("Starting events_tool\n") - proc = subprocess.Popen(["./events_tool", "-r", "-o", file]) - return proc - -def run_test(process, file, count, duplicate): - # log messages to see if events have been received - tool_proc = start_tool(file) - - time.sleep(2) # buffer for events_tool to startup - logging.info("Generating logger messages\n") - for i in range(count): - line = "" - state = "up" - if duplicate: - line = "{} test message testmessage state up".format(process) - else: - if i % 2 != 1: - state = "down" - line = "{} test message testmessage{} state {}".format(process, i, state) - command = "logger -p local0.notice -t {}".format(line) - subprocess.run(command, shell=True, stdout=subprocess.PIPE) - - time.sleep(2) # some buffer for all events to be published to file - read_events_from_file(file, count) - tool_proc.terminate() - -def main(): - parser = argparse.ArgumentParser() - parser.add_argument("-p", "--process", nargs='?', const ='', default='', help="Process that is spitting out log") - parser.add_argument("-f", "--file", nargs='?', const='', default='', help="File used by events_tool to read events from") - parser.add_argument("-c", "--count", type=int, nargs='?', const=1000, default=1000, help="Count of times log message needs to be published down/up, default is 1000") - args = parser.parse_args() - if(args.process == '' or args.file == ''): - logging.error("Invalid process or logfile\n") - return - logging.info("Starting volume test\n") - logging.info("Generating {} unique messages for rsyslog plugin\n".format(args.count)) - run_test(args.process, args.file, args.count, False) - time.sleep(2) - logging.info("Restarting volume test but for duplicate log messages\n") - run_test(args.process, args.file, args.count, True) - -if __name__ == "__main__": - main() diff --git a/src/sonic-eventd/tools/sample_ip.json b/src/sonic-eventd/tools/sample_ip.json deleted file mode 100644 index acb8726cf253..000000000000 --- a/src/sonic-eventd/tools/sample_ip.json +++ /dev/null @@ -1 +0,0 @@ -{"src_0:key-0": {"foo": "bar", "hello": "world" }} diff --git a/src/sonic-eventd/tools/subdir.mk b/src/sonic-eventd/tools/subdir.mk deleted file mode 100644 index 5f13043dd612..000000000000 --- a/src/sonic-eventd/tools/subdir.mk +++ /dev/null @@ -1,12 +0,0 @@ -CC := g++ - -TOOL_OBJS = ./tools/events_tool.o - -C_DEPS += ./tools/events_tool.d - -tools/%.o: tools/%.cpp - @echo 'Building file: $<' - @echo 'Invoking: GCC C++ Compiler' - $(CC) -D__FILENAME__="$(subst tools/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" - @echo 'Finished building: $<' - @echo ' '