diff --git a/dockers/docker-eventd/Dockerfile.j2 b/dockers/docker-eventd/Dockerfile.j2 new file mode 100644 index 000000000000..8d935dc9f365 --- /dev/null +++ b/dockers/docker-eventd/Dockerfile.j2 @@ -0,0 +1,36 @@ +{% from "dockers/dockerfile-macros.j2" import install_debian_packages, install_python_wheels, copy_files %} +FROM docker-config-engine-bullseye-{{DOCKER_USERNAME}}:{{DOCKER_USERTAG}} + +ARG docker_container_name +ARG image_version +RUN [ -f /etc/rsyslog.conf ] && sed -ri "s/%syslogtag%/$docker_container_name#%syslogtag%/;" /etc/rsyslog.conf + +# Make apt-get non-interactive +ENV DEBIAN_FRONTEND=noninteractive + +# Pass the image_version to container +ENV IMAGE_VERSION=$image_version + +# Update apt's cache of available packages +RUN apt-get update + +{% if docker_eventd_debs.strip() -%} +# Copy built Debian packages +{{ copy_files("debs/", docker_eventd_debs.split(' '), "/debs/") }} + +# Install built Debian packages and implicitly install their dependencies +{{ install_debian_packages(docker_eventd_debs.split(' ')) }} +{%- endif %} + +# Clean up +RUN apt-get clean -y && \ + apt-get autoclean -y && \ + apt-get autoremove -y && \ + rm -rf /debs + +COPY ["start.sh", "/usr/bin/"] +COPY ["supervisord.conf", "/etc/supervisor/conf.d/"] +COPY ["files/supervisor-proc-exit-listener", "/usr/bin"] +COPY ["critical_processes", "/etc/supervisor"] + +ENTRYPOINT ["/usr/local/bin/supervisord"] diff --git a/dockers/docker-eventd/critical_processes b/dockers/docker-eventd/critical_processes new file mode 100644 index 000000000000..8ff28edbc148 --- /dev/null +++ b/dockers/docker-eventd/critical_processes @@ -0,0 +1 @@ +program:eventd diff --git a/dockers/docker-eventd/start.sh b/dockers/docker-eventd/start.sh new file mode 100755 index 000000000000..60cd6a00aecb --- /dev/null +++ b/dockers/docker-eventd/start.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash + +if [ "${RUNTIME_OWNER}" == "" ]; then + RUNTIME_OWNER="kube" +fi + diff --git a/dockers/docker-eventd/supervisord.conf b/dockers/docker-eventd/supervisord.conf new file mode 100644 index 000000000000..5d9a50bca2ae --- /dev/null +++ b/dockers/docker-eventd/supervisord.conf @@ -0,0 +1,52 @@ +[supervisord] +logfile_maxbytes=1MB +logfile_backups=2 +nodaemon=true + +[eventlistener:dependent-startup] +command=python3 -m supervisord_dependent_startup +autostart=true +autorestart=unexpected +startretries=0 +exitcodes=0,3 +events=PROCESS_STATE +buffer_size=1024 + +[eventlistener:supervisor-proc-exit-listener] +command=/usr/bin/supervisor-proc-exit-listener --container-name eventd +events=PROCESS_STATE_EXITED,PROCESS_STATE_RUNNING +autostart=true +autorestart=unexpected +buffer_size=1024 + +[program:rsyslogd] +command=/usr/sbin/rsyslogd -n -iNONE +priority=1 +autostart=false +autorestart=unexpected +stdout_logfile=syslog +stderr_logfile=syslog +dependent_startup=true + +[program:start] +command=/usr/bin/start.sh +priority=2 +autostart=false +autorestart=false +startsecs=0 +stdout_logfile=syslog +stderr_logfile=syslog +dependent_startup=true +dependent_startup_wait_for=rsyslogd:running + + +[program:eventd] +command=/usr/sbin/eventd +priority=3 +autostart=false +autorestart=false +stdout_logfile=syslog +stderr_logfile=syslog +dependent_startup=true +dependent_startup_wait_for=start:exited + diff --git a/dockers/docker-fpm-frr/Dockerfile.j2 b/dockers/docker-fpm-frr/Dockerfile.j2 index ad665e71ceae..fd7ad0f08ed4 100644 --- a/dockers/docker-fpm-frr/Dockerfile.j2 +++ b/dockers/docker-fpm-frr/Dockerfile.j2 @@ -55,9 +55,15 @@ COPY ["TSC", "/usr/bin/TSC"] COPY ["TS", "/usr/bin/TS"] COPY ["files/supervisor-proc-exit-listener", "/usr/bin"] COPY ["zsocket.sh", "/usr/bin/"] +COPY ["*.json", "/etc/rsyslog.d/"] +COPY ["files/rsyslog_plugin.conf.j2", "/etc/rsyslog.d/"] RUN chmod a+x /usr/bin/TSA && \ chmod a+x /usr/bin/TSB && \ chmod a+x /usr/bin/TSC && \ chmod a+x /usr/bin/zsocket.sh +RUN j2 -f json /etc/rsyslog.d/rsyslog_plugin.conf.j2 /etc/rsyslog.d/events_info.json > /etc/rsyslog.d/bgp_events.conf +RUN rm -f /etc/rsyslog.d/rsyslog_plugin.conf.j2* +RUN rm -f /etc/rsyslog.d/events_info.json* + ENTRYPOINT ["/usr/bin/docker_init.sh"] diff --git a/dockers/docker-fpm-frr/bgp_regex.json b/dockers/docker-fpm-frr/bgp_regex.json new file mode 100644 index 000000000000..898b5b060ebe --- /dev/null +++ b/dockers/docker-fpm-frr/bgp_regex.json @@ -0,0 +1,8 @@ +[ + { + "tag": "bgp-state", + "regex": "Peer .default\\|([0-9a-f:.]*[0-9a-f]*). admin state is set to .(up|down).", + "params": [ "ip", "status" ] + } +] + diff --git a/dockers/docker-fpm-frr/events_info.json b/dockers/docker-fpm-frr/events_info.json new file mode 100644 index 000000000000..66fa9a727ae2 --- /dev/null +++ b/dockers/docker-fpm-frr/events_info.json @@ -0,0 +1,10 @@ +{ + "yang_module": "sonic-events-bgp", + "proclist": [ + { + "name": "bgp", + "parse_json": "bgp_regex.json" + } + ] +} + diff --git a/files/build_templates/docker_image_ctl.j2 b/files/build_templates/docker_image_ctl.j2 index 99051ee62d8c..a77706cad497 100644 --- a/files/build_templates/docker_image_ctl.j2 +++ b/files/build_templates/docker_image_ctl.j2 @@ -515,6 +515,7 @@ start() { {%- endif -%} {%- if docker_container_name == "bgp" %} -v /etc/sonic/frr/$DEV:/etc/frr:rw \ + -v /usr/share/sonic/scripts:/usr/share/sonic/scripts:ro \ {%- endif %} {%- if docker_container_name == "database" %} $DB_OPT \ diff --git a/files/build_templates/eventd.service.j2 b/files/build_templates/eventd.service.j2 new file mode 100644 index 000000000000..0ad7f52ee83d --- /dev/null +++ b/files/build_templates/eventd.service.j2 @@ -0,0 +1,17 @@ +[Unit] +Description=EVENTD container +Requires=updategraph.service +After=updategraph.service +BindsTo=sonic.target +After=sonic.target +StartLimitIntervalSec=1200 +StartLimitBurst=3 + +[Service] +ExecStartPre=/usr/bin/{{docker_container_name}}.sh start +ExecStart=/usr/bin/{{docker_container_name}}.sh wait +ExecStop=/usr/bin/{{docker_container_name}}.sh stop +RestartSec=30 + +[Install] +WantedBy=sonic.target diff --git a/files/build_templates/init_cfg.json.j2 b/files/build_templates/init_cfg.json.j2 index 7de0ad977807..8e92807f4e2c 100644 --- a/files/build_templates/init_cfg.json.j2 +++ b/files/build_templates/init_cfg.json.j2 @@ -39,6 +39,7 @@ ("pmon", "enabled", false, "enabled"), ("radv", "enabled", false, "enabled"), ("snmp", "enabled", true, "enabled"), + ("eventd", "enabled", true, "enabled"), ("swss", "enabled", false, "enabled"), ("syncd", "enabled", false, "enabled"), ("teamd", "enabled", false, "enabled")] %} @@ -69,7 +70,7 @@ "check_up_status" : "false", {%- endif %} {%- if include_kubernetes == "y" %} -{%- if feature in ["lldp", "pmon", "radv", "snmp", "telemetry"] %} +{%- if feature in ["lldp", "pmon", "radv", "eventd", "snmp", "telemetry"] %} "set_owner": "kube", {% else %} "set_owner": "local", {% endif %} {% endif %} "high_mem_alert": "disabled" diff --git a/files/build_templates/rsyslog_plugin.conf.j2 b/files/build_templates/rsyslog_plugin.conf.j2 new file mode 100644 index 000000000000..ec19c62a78f6 --- /dev/null +++ b/files/build_templates/rsyslog_plugin.conf.j2 @@ -0,0 +1,19 @@ +## rsyslog-plugin for streaming telemetry via gnmi + + + +template(name="prog_msg" type="list") { + property(name="msg") + constant(value="\n") +} + +$ModLoad omprog + +{% for proc in proclist %} +if re_match($programname, "{{ proc.name }}") then { + action(type="omprog" + binary="/usr/share/sonic/scripts/rsyslog_plugin -r /etc/rsyslog.d/{{ proc.parse_json }} -m {{ yang_module }}" + output="/var/log/rsyslog_plugin.log" + template="prog_msg") +} +{% endfor %} diff --git a/files/build_templates/sonic_debian_extension.j2 b/files/build_templates/sonic_debian_extension.j2 index 4b7a77b3151c..56b8290cc12e 100644 --- a/files/build_templates/sonic_debian_extension.j2 +++ b/files/build_templates/sonic_debian_extension.j2 @@ -799,6 +799,10 @@ sudo bash -c "echo { > $FILESYSTEM_ROOT_USR_SHARE_SONIC_TEMPLATES/ctr_image_name {% endfor %} sudo bash -c "echo } >> $FILESYSTEM_ROOT_USR_SHARE_SONIC_TEMPLATES/ctr_image_names.json" +# copy rsyslog plugin binary for use by all dockers that use plugin to publish events. +sudo mkdir -p ${FILESYSTEM_ROOT_USR_SHARE_SONIC_SCRIPTS} +sudo cp ${files_path}/rsyslog_plugin ${FILESYSTEM_ROOT_USR_SHARE_SONIC_SCRIPTS}/ + {% for script in installer_start_scripts.split(' ') -%} if [ -f $TARGET_MACHINE"_{{script}}" ]; then sudo cp $TARGET_MACHINE"_{{script}}" $FILESYSTEM_ROOT/usr/bin/{{script}} diff --git a/rules/docker-config-engine-bullseye.mk b/rules/docker-config-engine-bullseye.mk index c125aa65b209..ea0ae43b54b9 100644 --- a/rules/docker-config-engine-bullseye.mk +++ b/rules/docker-config-engine-bullseye.mk @@ -8,13 +8,15 @@ $(DOCKER_CONFIG_ENGINE_BULLSEYE)_DEPENDS += $(LIBSWSSCOMMON) \ $(LIBYANG_CPP) \ $(LIBYANG_PY3) \ $(PYTHON3_SWSSCOMMON) \ - $(SONIC_DB_CLI) + $(SONIC_DB_CLI) \ + $(SONIC_EVENTD) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_PYTHON_WHEELS += $(SONIC_PY_COMMON_PY3) \ $(SONIC_YANG_MGMT_PY3) \ $(SONIC_YANG_MODELS_PY3) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_PYTHON_WHEELS += $(SONIC_CONFIG_ENGINE_PY3) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_LOAD_DOCKERS += $(DOCKER_BASE_BULLSEYE) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_FILES += $(SWSS_VARS_TEMPLATE) +$(DOCKER_CONFIG_ENGINE_BULLSEYE)_FILES += $(RSYSLOG_PLUGIN_CONF_J2) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_FILES += $($(SONIC_CTRMGRD)_CONTAINER_SCRIPT) $(DOCKER_CONFIG_ENGINE_BULLSEYE)_DBG_DEPENDS = $($(DOCKER_BASE_BULLSEYE)_DBG_DEPENDS) \ diff --git a/rules/docker-config-engine-buster.mk b/rules/docker-config-engine-buster.mk index ae5589a59595..38a94bae4c1d 100644 --- a/rules/docker-config-engine-buster.mk +++ b/rules/docker-config-engine-buster.mk @@ -15,6 +15,7 @@ $(DOCKER_CONFIG_ENGINE_BUSTER)_PYTHON_WHEELS += $(SONIC_PY_COMMON_PY3) \ $(DOCKER_CONFIG_ENGINE_BUSTER)_PYTHON_WHEELS += $(SONIC_CONFIG_ENGINE_PY3) $(DOCKER_CONFIG_ENGINE_BUSTER)_LOAD_DOCKERS += $(DOCKER_BASE_BUSTER) $(DOCKER_CONFIG_ENGINE_BUSTER)_FILES += $(SWSS_VARS_TEMPLATE) +$(DOCKER_CONFIG_ENGINE_BUSTER)_FILES += $(RSYSLOG_PLUGIN_CONF_J2) $(DOCKER_CONFIG_ENGINE_BUSTER)_FILES += $($(SONIC_CTRMGRD)_CONTAINER_SCRIPT) $(DOCKER_CONFIG_ENGINE_BUSTER)_DBG_DEPENDS = $($(DOCKER_BASE_BUSTER)_DBG_DEPENDS) \ diff --git a/rules/docker-eventd.dep b/rules/docker-eventd.dep new file mode 100644 index 000000000000..382513e5eb82 --- /dev/null +++ b/rules/docker-eventd.dep @@ -0,0 +1,11 @@ + +DPATH := $($(DOCKER_EVENTD)_PATH) +DEP_FILES := $(SONIC_COMMON_FILES_LIST) rules/docker-eventd.mk rules/docker-eventd.dep +DEP_FILES += $(SONIC_COMMON_BASE_FILES_LIST) +DEP_FILES += $(shell git ls-files $(DPATH)) + +$(DOCKER_EVENTD)_CACHE_MODE := GIT_CONTENT_SHA +$(DOCKER_EVENTD)_DEP_FLAGS := $(SONIC_COMMON_FLAGS_LIST) +$(DOCKER_EVENTD)_DEP_FILES := $(DEP_FILES) + +$(eval $(call add_dbg_docker,$(DOCKER_EVENTD),$(DOCKER_EVENTD_DBG))) diff --git a/rules/docker-eventd.mk b/rules/docker-eventd.mk new file mode 100644 index 000000000000..c69fee09e569 --- /dev/null +++ b/rules/docker-eventd.mk @@ -0,0 +1,47 @@ +# docker image for eventd + +DOCKER_EVENTD_STEM = docker-eventd +DOCKER_EVENTD = $(DOCKER_EVENTD_STEM).gz +DOCKER_EVENTD_DBG = $(DOCKER_EVENTD_STEM)-$(DBG_IMAGE_MARK).gz + +$(DOCKER_EVENTD)_DEPENDS += $(SONIC_EVENTD) + +$(DOCKER_EVENTD)_DBG_DEPENDS = $($(DOCKER_CONFIG_ENGINE_BULLSEYE)_DBG_DEPENDS) +$(DOCKER_EVENTD)_DBG_DEPENDS += $(SONIC_EVENTD_DBG) $(LIBSWSSCOMMON_DBG) + +$(DOCKER_EVENTD)_DBG_IMAGE_PACKAGES = $($(DOCKER_CONFIG_ENGINE_BULLSEYE)_DBG_IMAGE_PACKAGES) + +$(DOCKER_EVENTD)_LOAD_DOCKERS = $(DOCKER_CONFIG_ENGINE_BULLSEYE) + +$(DOCKER_EVENTD)_PATH = $(DOCKERS_PATH)/$(DOCKER_EVENTD_STEM) + +$(DOCKER_EVENTD)_INSTALL_PYTHON_WHEELS = $(SONIC_UTILITIES_PY3) +$(DOCKER_EVENTD)_INSTALL_DEBS = $(PYTHON3_SWSSCOMMON) + +$(DOCKER_EVENTD)_VERSION = 1.0.0 +$(DOCKER_EVENTD)_PACKAGE_NAME = eventd + +$(DOCKER_DHCP)_SERVICE_REQUIRES = updategraph +$(DOCKER_DHCP)_SERVICE_AFTER = database + +SONIC_DOCKER_IMAGES += $(DOCKER_EVENTD) +SONIC_INSTALL_DOCKER_IMAGES += $(DOCKER_EVENTD) + +SONIC_DOCKER_DBG_IMAGES += $(DOCKER_EVENTD_DBG) +SONIC_INSTALL_DOCKER_DBG_IMAGES += $(DOCKER_EVENTD_DBG) + +$(DOCKER_EVENTD)_CONTAINER_NAME = eventd +$(DOCKER_EVENTD)_RUN_OPT += --privileged -t +$(DOCKER_EVENTD)_RUN_OPT += -v /etc/sonic:/etc/sonic:ro + +SONIC_BULLSEYE_DOCKERS += $(DOCKER_EVENTD) +SONIC_BULLSEYE_DBG_DOCKERS += $(DOCKER_EVENTD_DBG) + +$(DOCKER_EVENTD)_FILESPATH = $($(SONIC_EVENTD)_SRC_PATH)/rsyslog_plugin + +$(DOCKER_EVENTD)_PLUGIN = rsyslog_plugin +$($(DOCKER_EVENTD)_PLUGIN)_PATH = $($(DOCKER_EVENTD)_FILESPATH) + +SONIC_COPY_FILES += $($(DOCKER_EVENTD)_PLUGIN) +$(DOCKER_EVENTD)_SHARED_FILES = $($(DOCKER_EVENTD)_PLUGIN) + diff --git a/rules/eventd.dep b/rules/eventd.dep new file mode 100644 index 000000000000..12f32a30f2c7 --- /dev/null +++ b/rules/eventd.dep @@ -0,0 +1,10 @@ + +SPATH := $($(SONIC_EVENTD)_SRC_PATH) +DEP_FILES := $(SONIC_COMMON_FILES_LIST) rules/eventd.mk rules/eventd.dep +DEP_FILES += $(SONIC_COMMON_BASE_FILES_LIST) +DEP_FILES := $(addprefix $(SPATH)/,$(shell cd $(SPATH) && git ls-files)) + +$(SONIC_EVENTD)_CACHE_MODE := GIT_CONTENT_SHA +$(SONIC_EVENTD)_DEP_FLAGS := $(SONIC_COMMON_FLAGS_LIST) +$(SONIC_EVENTD)_DEP_FILES := $(DEP_FILES) + diff --git a/rules/eventd.mk b/rules/eventd.mk new file mode 100644 index 000000000000..9eea21a4cfb5 --- /dev/null +++ b/rules/eventd.mk @@ -0,0 +1,19 @@ +# eventd package + +SONIC_EVENTD_VERSION = 1.0.0-0 +SONIC_EVENTD_PKG_NAME = eventd + +SONIC_EVENTD = sonic-$(SONIC_EVENTD_PKG_NAME)_$(SONIC_EVENTD_VERSION)_$(CONFIGURED_ARCH).deb +$(SONIC_EVENTD)_SRC_PATH = $(SRC_PATH)/sonic-eventd +$(SONIC_EVENTD)_DEPENDS += $(LIBSWSSCOMMON) $(LIBSWSSCOMMON_DEV) + +SONIC_DPKG_DEBS += $(SONIC_EVENTD) + +SONIC_EVENTD_DBG = sonic-$(SONIC_EVENTD_PKG_NAME)-dbgsym_$(SONIC_EVENTD_VERSION)_$(CONFIGURED_ARCH).deb +$(eval $(call add_derived_package,$(SONIC_EVENTD),$(SONIC_EVENTD_DBG))) + +# The .c, .cpp, .h & .hpp files under src/{$DBG_SRC_ARCHIVE list} +# are archived into debug one image to facilitate debugging. +# +DBG_SRC_ARCHIVE += sonic-eventd + diff --git a/rules/scripts.mk b/rules/scripts.mk index ce6a8eb90025..12919d520b09 100644 --- a/rules/scripts.mk +++ b/rules/scripts.mk @@ -32,6 +32,9 @@ $(SWSS_VARS_TEMPLATE)_PATH = files/build_templates COPP_CONFIG_TEMPLATE = copp_cfg.j2 $(COPP_CONFIG_TEMPLATE)_PATH = files/image_config/copp +RSYSLOG_PLUGIN_CONF_J2 = rsyslog_plugin.conf.j2 +$(RSYSLOG_PLUGIN_CONF_J2)_PATH = files/build_templates + SONIC_COPY_FILES += $(CONFIGDB_LOAD_SCRIPT) \ $(ARP_UPDATE_SCRIPT) \ $(ARP_UPDATE_VARS_TEMPLATE) \ @@ -42,4 +45,5 @@ SONIC_COPY_FILES += $(CONFIGDB_LOAD_SCRIPT) \ $(SYSCTL_NET_CONFIG) \ $(UPDATE_CHASSISDB_CONFIG_SCRIPT) \ $(SWSS_VARS_TEMPLATE) \ + $(RSYSLOG_PLUGIN_CONF_J2) \ $(COPP_CONFIG_TEMPLATE) diff --git a/rules/telemetry.mk b/rules/telemetry.mk index 24fe4ae2fe52..942e9797726a 100644 --- a/rules/telemetry.mk +++ b/rules/telemetry.mk @@ -2,6 +2,7 @@ SONIC_TELEMETRY = sonic-gnmi_0.1_$(CONFIGURED_ARCH).deb $(SONIC_TELEMETRY)_SRC_PATH = $(SRC_PATH)/sonic-gnmi -$(SONIC_TELEMETRY)_DEPENDS = $(SONIC_MGMT_COMMON) $(SONIC_MGMT_COMMON_CODEGEN) -$(SONIC_TELEMETRY)_RDEPENDS = +$(SONIC_TELEMETRY)_DEPENDS = $(SONIC_MGMT_COMMON) $(SONIC_MGMT_COMMON_CODEGEN) \ + $(LIBSWSSCOMMON_DEV) $(LIBSWSSCOMMON) +$(SONIC_TELEMETRY)_RDEPENDS = $(LIBSWSSCOMMON) $(LIBSWSSCOMMON_DEV) SONIC_DPKG_DEBS += $(SONIC_TELEMETRY) diff --git a/slave.mk b/slave.mk index 7cdee954ad73..f720061b2e52 100644 --- a/slave.mk +++ b/slave.mk @@ -1292,6 +1292,8 @@ $(addprefix $(TARGET_PATH)/, $(SONIC_INSTALLERS)) : $(TARGET_PATH)/% : \ $(if $($(docker:-dbg.gz=.gz)_MACHINE),\ mv $($(docker:-dbg.gz=.gz)_CONTAINER_NAME).sh $($(docker:-dbg.gz=.gz)_MACHINE)_$($(docker:-dbg.gz=.gz)_CONTAINER_NAME).sh ) + $(foreach file, $($(docker)_SHARED_FILES), \ + { cp $($(file)_PATH)/$(file) $(FILES_PATH)/ $(LOG) || exit 1 ; } ; ) ) # Exported variables are used by sonic_debian_extension.sh diff --git a/src/sonic-eventd/Makefile b/src/sonic-eventd/Makefile new file mode 100644 index 000000000000..00d3199a65bc --- /dev/null +++ b/src/sonic-eventd/Makefile @@ -0,0 +1,84 @@ +RM := rm -rf +EVENTD_TARGET := eventd +EVENTD_TEST := tests/tests +EVENTD_TOOL := tools/events_tool +EVENTD_PUBLISH_TOOL := tools/events_publish_tool.py +RSYSLOG-PLUGIN_TARGET := rsyslog_plugin/rsyslog_plugin +RSYSLOG-PLUGIN_TEST := rsyslog_plugin_tests/tests +CP := cp +MKDIR := mkdir +CC := g++ +LIBS := -levent -lhiredis -lswsscommon -lpthread -lboost_thread -lboost_system -lzmq -lboost_serialization -luuid -llua5.1 +TEST_LIBS := -L/usr/src/gtest -lgtest -lgtest_main -lgmock -lgmock_main + +CFLAGS += -Wall -std=c++17 -fPIE -I$(PWD)/../sonic-swss-common/common +PWD := $(shell pwd) + +ifneq ($(MAKECMDGOALS),clean) +ifneq ($(strip $(C_DEPS)),) +-include $(C_DEPS) $(OBJS) +endif +endif + +-include src/subdir.mk +-include tests/subdir.mk +-include tools/subdir.mk +-include rsyslog_plugin/subdir.mk +-include rsyslog_plugin_tests/subdir.mk + +all: sonic-eventd eventd-tests eventd-tool rsyslog-plugin rsyslog-plugin-tests + +sonic-eventd: $(OBJS) + @echo 'Building target: $@' + @echo 'Invoking: G++ Linker' + $(CC) $(LDFLAGS) -o $(EVENTD_TARGET) $(OBJS) $(LIBS) + @echo 'Finished building target: $@' + @echo ' ' + +eventd-tool: $(TOOL_OBJS) + @echo 'Building target: $@' + @echo 'Invoking: G++ Linker' + $(CC) $(LDFLAGS) -o $(EVENTD_TOOL) $(TOOL_OBJS) $(LIBS) + @echo 'Finished building target: $@' + @echo ' ' + +rsyslog-plugin: $(RSYSLOG-PLUGIN_OBJS) + @echo 'Buidling Target: $@' + @echo 'Invoking: G++ Linker' + $(CC) $(LDFLAGS) -o $(RSYSLOG-PLUGIN_TARGET) $(RSYSLOG-PLUGIN_OBJS) $(LIBS) + @echo 'Finished building target: $@' + @echo ' ' + +eventd-tests: $(TEST_OBJS) + @echo 'Building target: $@' + @echo 'Invoking: G++ Linker' + $(CC) $(LDFLAGS) -o $(EVENTD_TEST) $(TEST_OBJS) $(LIBS) $(TEST_LIBS) + @echo 'Finished building target: $@' + $(EVENTD_TEST) + @echo 'Finished running tests' + @echo ' ' + +rsyslog-plugin-tests: $(RSYSLOG-PLUGIN-TEST_OBJS) + @echo 'BUILDING target: $@' + @echo 'Invoking G++ Linker' + $(CC) $(LDFLAGS) -o $(RSYSLOG-PLUGIN_TEST) $(RSYSLOG-PLUGIN-TEST_OBJS) $(LIBS) $(TEST_LIBS) + @echo 'Finished building target: $@' + $(RSYSLOG-PLUGIN_TEST) + @echo 'Finished running tests' + @echo ' ' + +install: + $(MKDIR) -p $(DESTDIR)/usr/sbin + $(CP) $(EVENTD_TARGET) $(DESTDIR)/usr/sbin + $(CP) $(EVENTD_TOOL) $(DESTDIR)/usr/sbin + $(CP) $(EVENTD_PUBLISH_TOOL) $(DESTDIR)/usr/sbin + +deinstall: + $(RM) $(DESTDIR)/usr/sbin/$(EVENTD_TARGET) + $(RM) $(DESTDIR)/usr/sbin/$(RSYSLOG-PLUGIN_TARGET) + $(RM) -rf $(DESTDIR)/usr/sbin + +clean: + -@echo ' ' + +.PHONY: all clean dependents diff --git a/src/sonic-eventd/debian/changelog b/src/sonic-eventd/debian/changelog new file mode 100644 index 000000000000..eba3bf10ea53 --- /dev/null +++ b/src/sonic-eventd/debian/changelog @@ -0,0 +1,5 @@ +sonic-eventd (1.0.0-0) UNRELEASED; urgency=medium + + * Initial release. + +-- Renuka Manavalan diff --git a/src/sonic-eventd/debian/compat b/src/sonic-eventd/debian/compat new file mode 100644 index 000000000000..48082f72f087 --- /dev/null +++ b/src/sonic-eventd/debian/compat @@ -0,0 +1 @@ +12 diff --git a/src/sonic-eventd/debian/control b/src/sonic-eventd/debian/control new file mode 100644 index 000000000000..95ae6fd76452 --- /dev/null +++ b/src/sonic-eventd/debian/control @@ -0,0 +1,14 @@ +Source: sonic-eventd +Section: devel +Priority: optional +Maintainer: Renuka Manavalan +Build-Depends: debhelper (>= 12.0.0), libevent-dev, libboost-thread-dev, libboost-system-dev, libswsscommon-dev +Standards-Version: 3.9.3 +Homepage: https://github.com/Azure/sonic-buildimage +XS-Go-Import-Path: github.com/Azure/sonic-buildimage + +Package: sonic-eventd +Architecture: any +Built-Using: ${misc:Built-Using} +Depends: ${shlibs:Depends} +Description: SONiC event service diff --git a/src/sonic-eventd/debian/rules b/src/sonic-eventd/debian/rules new file mode 100755 index 000000000000..ac2cd63889ef --- /dev/null +++ b/src/sonic-eventd/debian/rules @@ -0,0 +1,6 @@ +#!/usr/bin/make -f + +export DEB_BUILD_MAINT_OPTIONS=hardening=+all + +%: + dh $@ --parallel diff --git a/src/sonic-eventd/rsyslog_plugin/main.cpp b/src/sonic-eventd/rsyslog_plugin/main.cpp new file mode 100644 index 000000000000..53162608c5a9 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/main.cpp @@ -0,0 +1,57 @@ +#include +#include +#include +#include "rsyslog_plugin.h" + +#define SUCCESS_CODE 0 +#define INVALID_REGEX_ERROR_CODE 1 +#define EVENT_INIT_PUBLISH_ERROR_CODE 2 +#define MISSING_ARGS_ERROR_CODE 3 + +void showUsage() { + cout << "Usage for rsyslog_plugin: \n" << "options\n" + << "\t-r,required,type=string\t\tPath to regex file\n" + << "\t-m,required,type=string\t\tYANG module name of source generating syslog message\n" + << "\t-h \t\tHelp" + << endl; +} + +int main(int argc, char** argv) { + string regexPath; + string moduleName; + int optionVal; + + while((optionVal = getopt(argc, argv, "r:m:h")) != -1) { + switch(optionVal) { + case 'r': + regexPath = optarg; + break; + case 'm': + moduleName = optarg; + break; + case 'h': + case '?': + default: + showUsage(); + return 1; + } + } + + if(regexPath.empty() || moduleName.empty()) { // Missing required rc path + cerr << "Error: Missing regexPath and moduleName." << endl; + return MISSING_ARGS_ERROR_CODE; + } + + unique_ptr plugin(new RsyslogPlugin(moduleName, regexPath)); + int returnCode = plugin->onInit(); + if(returnCode == INVALID_REGEX_ERROR_CODE) { + SWSS_LOG_ERROR("Rsyslog plugin was not able to be initialized due to invalid regex file provided.\n"); + return returnCode; + } else if(returnCode == EVENT_INIT_PUBLISH_ERROR_CODE) { + SWSS_LOG_ERROR("Rsyslog plugin was not able to be initialized due to event_init_publish call failing.\n"); + return returnCode; + } + + plugin->run(); + return SUCCESS_CODE; +} diff --git a/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp b/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp new file mode 100644 index 000000000000..3786c5f0fea9 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.cpp @@ -0,0 +1,135 @@ +#include +#include +#include +#include +#include +#include +#include "rsyslog_plugin.h" +#include "json.hpp" + +using json = nlohmann::json; + +bool RsyslogPlugin::onMessage(string msg, lua_State* luaState) { + string tag; + event_params_t paramDict; + if(!m_parser->parseMessage(msg, tag, paramDict, luaState)) { + SWSS_LOG_DEBUG("%s was not able to be parsed into a structured event\n", msg.c_str()); + return false; + } else { + int returnCode = event_publish(m_eventHandle, tag, ¶mDict); + if(returnCode != 0) { + SWSS_LOG_ERROR("rsyslog_plugin was not able to publish event for %s.\n", tag.c_str()); + return false; + } + return true; + } +} + +void parseParams(vector params, vector& eventParams) { + for(long unsigned int i = 0; i < params.size(); i++) { + if(params[i].empty()) { + SWSS_LOG_ERROR("Empty param provided in regex file\n"); + continue; + } + EventParam ep = EventParam(); + auto delimPos = params[i].find(':'); + if(delimPos == string::npos) { // no lua code + ep.paramName = params[i]; + } else { + ep.paramName = params[i].substr(0, delimPos); + ep.luaCode = params[i].substr(delimPos + 1); + if(ep.luaCode.empty()) { + SWSS_LOG_ERROR("Lua code missing after :\n"); + } + } + eventParams.push_back(ep); + } +} + +bool RsyslogPlugin::createRegexList() { + fstream regexFile; + json jsonList = json::array(); + regexFile.open(m_regexPath, ios::in); + if (!regexFile) { + SWSS_LOG_ERROR("No such path exists: %s for source %s\n", m_regexPath.c_str(), m_moduleName.c_str()); + return false; + } + try { + regexFile >> jsonList; + } catch (invalid_argument& iaException) { + SWSS_LOG_ERROR("Invalid JSON file: %s, throws exception: %s\n", m_regexPath.c_str(), iaException.what()); + return false; + } + + string regexString; + string timestampRegex = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*"; + regex expression; + vector regexList; + + for(long unsigned int i = 0; i < jsonList.size(); i++) { + RegexStruct rs = RegexStruct(); + vector eventParams; + try { + string eventRegex = jsonList[i]["regex"]; + regexString = timestampRegex + eventRegex; + string tag = jsonList[i]["tag"]; + vector params = jsonList[i]["params"]; + vector timestampParams = { "month", "day", "time" }; + params.insert(params.begin(), timestampParams.begin(), timestampParams.end()); + regex expr(regexString); + expression = expr; + parseParams(params, eventParams); + rs.params = eventParams; + rs.tag = tag; + rs.regexExpression = expression; + regexList.push_back(rs); + } catch (domain_error& deException) { + SWSS_LOG_ERROR("Missing required key, throws exception: %s\n", deException.what()); + return false; + } catch (regex_error& reException) { + SWSS_LOG_ERROR("Invalid regex, throws exception: %s\n", reException.what()); + return false; + } + } + + if(regexList.empty()) { + SWSS_LOG_ERROR("Empty list of regex expressions.\n"); + return false; + } + + m_parser->m_regexList = regexList; + + regexFile.close(); + return true; +} + +void RsyslogPlugin::run() { + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + while(true) { + string line; + getline(cin, line); + if(line.empty()) { + continue; + } + onMessage(line, luaState); + } + lua_close(luaState); +} + +int RsyslogPlugin::onInit() { + m_eventHandle = events_init_publisher(m_moduleName); + bool success = createRegexList(); + if(!success) { + return 1; // invalid regex error code + } else if(m_eventHandle == NULL) { + return 2; // event init publish error code + } + return 0; +} + +RsyslogPlugin::RsyslogPlugin(string moduleName, string regexPath) { + m_parser = unique_ptr(new SyslogParser()); + m_moduleName = moduleName; + m_regexPath = regexPath; +} diff --git a/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h b/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h new file mode 100644 index 000000000000..0811b5f3032f --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/rsyslog_plugin.h @@ -0,0 +1,40 @@ +#ifndef RSYSLOG_PLUGIN_H +#define RSYSLOG_PLUGIN_H + +extern "C" +{ + #include + #include + #include +} +#include +#include +#include "syslog_parser.h" +#include "events.h" +#include "logger.h" + +using namespace std; +using namespace swss; + +/** + * Rsyslog Plugin will utilize an instance of a syslog parser to read syslog messages from rsyslog.d and will continuously read from stdin + * A plugin instance is created for each container/host. + * + */ + +class RsyslogPlugin { +public: + int onInit(); + bool onMessage(string msg, lua_State* luaState); + void run(); + RsyslogPlugin(string moduleName, string regexPath); +private: + unique_ptr m_parser; + event_handle_t m_eventHandle; + string m_regexPath; + string m_moduleName; + bool createRegexList(); +}; + +#endif + diff --git a/src/sonic-eventd/rsyslog_plugin/subdir.mk b/src/sonic-eventd/rsyslog_plugin/subdir.mk new file mode 100644 index 000000000000..17df55c718a0 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/subdir.mk @@ -0,0 +1,13 @@ +CC := g++ + +RSYSLOG-PLUGIN-TEST_OBJS += ./rsyslog_plugin/rsyslog_plugin.o ./rsyslog_plugin/syslog_parser.o ./rsyslog_plugin/timestamp_formatter.o +RSYSLOG-PLUGIN_OBJS += ./rsyslog_plugin/rsyslog_plugin.o ./rsyslog_plugin/syslog_parser.o ./rsyslog_plugin/timestamp_formatter.o ./rsyslog_plugin/main.o + +C_DEPS += ./rsyslog_plugin/rsyslog_plugin.d ./rsyslog_plugin/syslog_parser.d ./rsyslog_plugin/timestamp_formatter.d ./rsyslog_plugin/main.d + +rsyslog_plugin/%.o: rsyslog_plugin/%.cpp + @echo 'Building file: $<' + @echo 'Invoking: GCC C++ Compiler' + $(CC) -D__FILENAME__="$(subst rsyslog_plugin/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$(@)" "$<" + @echo 'Finished building: $<' + @echo ' ' diff --git a/src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp b/src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp new file mode 100644 index 000000000000..ebf7c598d15a --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/syslog_parser.cpp @@ -0,0 +1,65 @@ +#include +#include +#include "syslog_parser.h" +#include "logger.h" + +/** + * Parses syslog message and returns structured event + * + * @param nessage us syslog message being fed in by rsyslog.d + * @return return structured event json for publishing + * +*/ + +bool SyslogParser::parseMessage(string message, string& eventTag, event_params_t& paramMap, lua_State* luaState) { + for(long unsigned int i = 0; i < m_regexList.size(); i++) { + smatch matchResults; + if(!regex_search(message, matchResults, m_regexList[i].regexExpression) || m_regexList[i].params.size() != matchResults.size() - 1 || matchResults.size() < 4) { + continue; + } + string formattedTimestamp; + if(!matchResults[1].str().empty() && !matchResults[2].str().empty() && !matchResults[3].str().empty()) { // found timestamp components + formattedTimestamp = m_timestampFormatter->changeTimestampFormat({ matchResults[1].str(), matchResults[2].str(), matchResults[3].str() }); + } + if(!formattedTimestamp.empty()) { + paramMap["timestamp"] = formattedTimestamp; + } else { + SWSS_LOG_INFO("Timestamp is invalid and is not able to be formatted"); + } + + // found matching regex + eventTag = m_regexList[i].tag; + // check params for lua code + for(long unsigned int j = 3; j < m_regexList[i].params.size(); j++) { + string resultValue = matchResults[j + 1].str(); + string paramName = m_regexList[i].params[j].paramName; + const char* luaCode = m_regexList[i].params[j].luaCode.c_str(); + + if(luaCode == NULL || *luaCode == 0) { + SWSS_LOG_INFO("Invalid lua code, empty or missing"); + paramMap[paramName] = resultValue; + continue; + } + + // execute lua code + lua_pushstring(luaState, resultValue.c_str()); + lua_setglobal(luaState, "arg"); + if(luaL_dostring(luaState, luaCode) == 0) { + lua_pop(luaState, lua_gettop(luaState)); + } else { // error in lua code + SWSS_LOG_ERROR("Invalid lua code, unable to do operation.\n"); + paramMap[paramName] = resultValue; + continue; + } + lua_getglobal(luaState, "ret"); + paramMap[paramName] = lua_tostring(luaState, -1); + lua_pop(luaState, 1); + } + return true; + } + return false; +} + +SyslogParser::SyslogParser() { + m_timestampFormatter = unique_ptr(new TimestampFormatter()); +} diff --git a/src/sonic-eventd/rsyslog_plugin/syslog_parser.h b/src/sonic-eventd/rsyslog_plugin/syslog_parser.h new file mode 100644 index 000000000000..6293eb3c4a34 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/syslog_parser.h @@ -0,0 +1,46 @@ +#ifndef SYSLOG_PARSER_H +#define SYSLOG_PARSER_H + +extern "C" +{ + #include + #include + #include +} + +#include +#include +#include +#include "json.hpp" +#include "events.h" +#include "timestamp_formatter.h" + +using namespace std; +using json = nlohmann::json; + +struct EventParam { + string paramName; + string luaCode; +}; + +struct RegexStruct { + regex regexExpression; + vector params; + string tag; +}; + +/** + * Syslog Parser is responsible for parsing log messages fed by rsyslog.d and returns + * matched result to rsyslog_plugin to use with events publish API + * + */ + +class SyslogParser { +public: + unique_ptr m_timestampFormatter; + vector m_regexList; + bool parseMessage(string message, string& tag, event_params_t& paramDict, lua_State* luaState); + SyslogParser(); +}; + +#endif diff --git a/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp b/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp new file mode 100644 index 000000000000..cc179adbbc75 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.cpp @@ -0,0 +1,74 @@ +#include +#include "timestamp_formatter.h" +#include "logger.h" +#include "events.h" + +using namespace std; + +/*** + * + * Formats given string into string needed by YANG model + * + * @param timestamp parsed from syslog message + * @return formatted timestamp that conforms to YANG model + * + */ + +static const unordered_map g_monthDict { + { "Jan", "01" }, + { "Feb", "02" }, + { "Mar", "03" }, + { "Apr", "04" }, + { "May", "05" }, + { "Jun", "06" }, + { "Jul", "07" }, + { "Aug", "08" }, + { "Sep", "09" }, + { "Oct", "10" }, + { "Nov", "11" }, + { "Dec", "12" } +}; + +string TimestampFormatter::getYear(string timestamp) { + if(!m_storedTimestamp.empty()) { + if(m_storedTimestamp.compare(timestamp) <= 0) { + m_storedTimestamp = timestamp; + return m_storedYear; + } + } + // no last timestamp or year change + time_t currentTime = time(nullptr); + tm* const localTime = localtime(¤tTime); + stringstream ss; + auto currentYear = 1900 + localTime->tm_year; + ss << currentYear; // get current year + string year = ss.str(); + m_storedTimestamp = timestamp; + m_storedYear = year; + return year; +} + +string TimestampFormatter::changeTimestampFormat(vector dateComponents) { + if(dateComponents.size() < 3) { + SWSS_LOG_ERROR("Timestamp formatter unable to format due to invalid input"); + return ""; + } + string formattedTimestamp; // need to change format of Mmm dd hh:mm:ss.SSSSSS to YYYY-mm-ddThh:mm:ss.SSSSSSZ + string month; + auto it = g_monthDict.find(dateComponents[0]); + if(it != g_monthDict.end()) { + month = it->second; + } else { + SWSS_LOG_ERROR("Timestamp month was given in wrong format.\n"); + return ""; + } + string day = dateComponents[1]; + if(day.size() == 1) { // convert 1 -> 01 + day.insert(day.begin(), '0'); + } + string time = dateComponents[2]; + string currentTimestamp = month + day + time; + string year = getYear(currentTimestamp); + formattedTimestamp = year + "-" + month + "-" + day + "T" + time + "Z"; + return formattedTimestamp; +} diff --git a/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h b/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h new file mode 100644 index 000000000000..ea99c4cfcb8c --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin/timestamp_formatter.h @@ -0,0 +1,27 @@ +#ifndef TIMESTAMP_FORMATTER_H +#define TIMESTAMP_FORMATTER_H + +#include +#include +#include +#include +#include + +using namespace std; + +/*** + * + * TimestampFormatter is responsible for formatting the timestamps received in syslog messages and to format them into the type needed by YANG model + * + */ + +class TimestampFormatter { +public: + string changeTimestampFormat(vector dateComponents); + string m_storedTimestamp; + string m_storedYear; +private: + string getYear(string timestamp); +}; + +#endif diff --git a/src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp b/src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp new file mode 100644 index 000000000000..be5a19ad5a5b --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/rsyslog_plugin_ut.cpp @@ -0,0 +1,274 @@ +extern "C" +{ + #include + #include + #include +} +#include +#include +#include +#include +#include "gtest/gtest.h" +#include "json.hpp" +#include "events.h" +#include "../rsyslog_plugin/rsyslog_plugin.h" +#include "../rsyslog_plugin/syslog_parser.h" +#include "../rsyslog_plugin/timestamp_formatter.h" + +using namespace std; +using namespace swss; +using json = nlohmann::json; + +vector createEventParams(vector params, vector luaCodes) { + vector eventParams; + for(long unsigned int i = 0; i < params.size(); i++) { + EventParam ep = EventParam(); + ep.paramName = params[i]; + ep.luaCode = luaCodes[i]; + eventParams.push_back(ep); + } + return eventParams; +} + +TEST(syslog_parser, matching_regex) { + json jList = json::array(); + vector regexList; + string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*message (.*) other_data (.*) even_more_data (.*)"; + vector params = { "month", "day", "time", "message", "other_data", "even_more_data" }; + vector luaCodes = { "", "", "", "", "", "" }; + regex expression(regexString); + + RegexStruct rs = RegexStruct(); + rs.tag = "test_tag"; + rs.regexExpression = expression; + rs.params = createEventParams(params, luaCodes); + regexList.push_back(rs); + + string tag; + event_params_t paramDict; + + event_params_t expectedDict; + expectedDict["message"] = "test_message"; + expectedDict["other_data"] = "test_data"; + expectedDict["even_more_data"] = "test_data"; + + unique_ptr parser(new SyslogParser()); + parser->m_regexList = regexList; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + + bool success = parser->parseMessage("message test_message other_data test_data even_more_data test_data", tag, paramDict, luaState); + EXPECT_EQ(true, success); + EXPECT_EQ("test_tag", tag); + EXPECT_EQ(expectedDict, paramDict); + + lua_close(luaState); +} + +TEST(syslog_parser, matching_regex_timestamp) { + json jList = json::array(); + vector regexList; + string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*message (.*) other_data (.*)"; + vector params = { "month", "day", "time", "message", "other_data" }; + vector luaCodes = { "", "", "", "", "" }; + regex expression(regexString); + + RegexStruct rs = RegexStruct(); + rs.tag = "test_tag"; + rs.regexExpression = expression; + rs.params = createEventParams(params, luaCodes); + regexList.push_back(rs); + + string tag; + event_params_t paramDict; + + event_params_t expectedDict; + expectedDict["message"] = "test_message"; + expectedDict["other_data"] = "test_data"; + expectedDict["timestamp"] = "2022-07-21T02:10:00.000000Z"; + + unique_ptr parser(new SyslogParser()); + parser->m_regexList = regexList; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + + bool success = parser->parseMessage("Jul 21 02:10:00.000000 message test_message other_data test_data", tag, paramDict, luaState); + EXPECT_EQ(true, success); + EXPECT_EQ("test_tag", tag); + EXPECT_EQ(expectedDict, paramDict); + + lua_close(luaState); +} + +TEST(syslog_parser, no_matching_regex) { + json jList = json::array(); + vector regexList; + string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*no match"; + vector params = { "month", "day", "time" }; + vector luaCodes = { "", "", "" }; + regex expression(regexString); + + RegexStruct rs = RegexStruct(); + rs.tag = "test_tag"; + rs.regexExpression = expression; + rs.params = createEventParams(params, luaCodes); + regexList.push_back(rs); + + string tag; + event_params_t paramDict; + + unique_ptr parser(new SyslogParser()); + parser->m_regexList = regexList; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + + bool success = parser->parseMessage("Test Message", tag, paramDict, luaState); + EXPECT_EQ(false, success); + + lua_close(luaState); +} + +TEST(syslog_parser, lua_code_valid_1) { + json jList = json::array(); + vector regexList; + string regexString = "^([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*.* (sent|received) (?:to|from) .* ([0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}) active ([1-9]{1,3})/([1-9]{1,3}) .*"; + vector params = { "month", "day", "time", "is-sent", "ip", "major-code", "minor-code" }; + vector luaCodes = { "", "", "", "ret=tostring(arg==\"sent\")", "", "", "" }; + regex expression(regexString); + + RegexStruct rs = RegexStruct(); + rs.tag = "test_tag"; + rs.regexExpression = expression; + rs.params = createEventParams(params, luaCodes); + regexList.push_back(rs); + + string tag; + event_params_t paramDict; + + event_params_t expectedDict; + expectedDict["is-sent"] = "true"; + expectedDict["ip"] = "100.95.147.229"; + expectedDict["major-code"] = "2"; + expectedDict["minor-code"] = "2"; + + unique_ptr parser(new SyslogParser()); + parser->m_regexList = regexList; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + + bool success = parser->parseMessage("NOTIFICATION: sent to neighbor 100.95.147.229 active 2/2 (peer in wrong AS) 2 bytes", tag, paramDict, luaState); + EXPECT_EQ(true, success); + EXPECT_EQ("test_tag", tag); + EXPECT_EQ(expectedDict, paramDict); + + lua_close(luaState); +} + +TEST(syslog_parser, lua_code_valid_2) { + json jList = json::array(); + vector regexList; + string regexString = "([a-zA-Z]{3})?\\s*([0-9]{1,2})?\\s*([0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{0,6})?\\s*.* (sent|received) (?:to|from) .* ([0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}.[0-9]{2,3}) active ([1-9]{1,3})/([1-9]{1,3}) .*"; + vector params = { "month", "day", "time", "is-sent", "ip", "major-code", "minor-code" }; + vector luaCodes = { "", "", "", "ret=tostring(arg==\"sent\")", "", "", "" }; + regex expression(regexString); + + RegexStruct rs = RegexStruct(); + rs.tag = "test_tag"; + rs.regexExpression = expression; + rs.params = createEventParams(params, luaCodes); + regexList.push_back(rs); + + string tag; + event_params_t paramDict; + + event_params_t expectedDict; + expectedDict["is-sent"] = "false"; + expectedDict["ip"] = "10.10.24.216"; + expectedDict["major-code"] = "6"; + expectedDict["minor-code"] = "2"; + expectedDict["timestamp"] = "2022-12-03T12:36:24.503424Z"; + + unique_ptr parser(new SyslogParser()); + parser->m_regexList = regexList; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + + bool success = parser->parseMessage("Dec 3 12:36:24.503424 NOTIFICATION: received from neighbor 10.10.24.216 active 6/2 (Administrative Shutdown) 0 bytes", tag, paramDict, luaState); + EXPECT_EQ(true, success); + EXPECT_EQ("test_tag", tag); + EXPECT_EQ(expectedDict, paramDict); + + lua_close(luaState); +} + +TEST(rsyslog_plugin, onInit_emptyJSON) { + unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_1.rc.json")); + EXPECT_NE(0, plugin->onInit()); +} + +TEST(rsyslog_plugin, onInit_missingRegex) { + unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_3.rc.json")); + EXPECT_NE(0, plugin->onInit()); +} + +TEST(rsyslog_plugin, onInit_invalidRegex) { + unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_4.rc.json")); + EXPECT_NE(0, plugin->onInit()); +} + +TEST(rsyslog_plugin, onMessage) { + unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_2.rc.json")); + EXPECT_EQ(0, plugin->onInit()); + ifstream infile("./rsyslog_plugin_tests/test_syslogs.txt"); + string logMessage; + bool parseResult; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + while(infile >> logMessage >> parseResult) { + EXPECT_EQ(parseResult, plugin->onMessage(logMessage, luaState)); + } + lua_close(luaState); + infile.close(); +} + +TEST(rsyslog_plugin, onMessage_noParams) { + unique_ptr plugin(new RsyslogPlugin("test_mod_name", "./rsyslog_plugin_tests/test_regex_5.rc.json")); + EXPECT_EQ(0, plugin->onInit()); + ifstream infile("./rsyslog_plugin_tests/test_syslogs_2.txt"); + string logMessage; + bool parseResult; + lua_State* luaState = luaL_newstate(); + luaL_openlibs(luaState); + while(infile >> logMessage >> parseResult) { + EXPECT_EQ(parseResult, plugin->onMessage(logMessage, luaState)); + } + lua_close(luaState); + infile.close(); +} + +TEST(timestampFormatter, changeTimestampFormat) { + unique_ptr formatter(new TimestampFormatter()); + + vector timestampOne = { "Jul", "20", "10:09:40.230874" }; + vector timestampTwo = { "Jan", "1", "00:00:00.000000" }; + vector timestampThree = { "Dec", "31", "23:59:59.000000" }; + + string formattedTimestampOne = formatter->changeTimestampFormat(timestampOne); + EXPECT_EQ("2022-07-20T10:09:40.230874Z", formattedTimestampOne); + + EXPECT_EQ("072010:09:40.230874", formatter->m_storedTimestamp); + + string formattedTimestampTwo = formatter->changeTimestampFormat(timestampTwo); + EXPECT_EQ("2022-01-01T00:00:00.000000Z", formattedTimestampTwo); + + formatter->m_storedTimestamp = "010100:00:00.000000"; + formatter->m_storedYear = "2025"; + + string formattedTimestampThree = formatter->changeTimestampFormat(timestampThree); + EXPECT_EQ("2025-12-31T23:59:59.000000Z", formattedTimestampThree); +} + +int main(int argc, char* argv[]) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/sonic-eventd/rsyslog_plugin_tests/subdir.mk b/src/sonic-eventd/rsyslog_plugin_tests/subdir.mk new file mode 100644 index 000000000000..6be7ef09786a --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/subdir.mk @@ -0,0 +1,12 @@ +CC := g++ + +RSYSLOG-PLUGIN-TEST_OBJS += ./rsyslog_plugin_tests/rsyslog_plugin_ut.o + +C_DEPS += ./rsyslog_plugin_tests/rsyslog_plugin_ut.d + +rsyslog_plugin_tests/%.o: rsyslog_plugin_tests/%.cpp + @echo 'Building file: $<' + @echo 'Invoking: GCC C++ Compiler' + $(CC) -D__FILENAME__="$(subst rsyslog_plugin_tests/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" + @echo 'Finished building: $<' + @echo ' ' diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_1.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_1.rc.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json new file mode 100644 index 000000000000..66788d326331 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_2.rc.json @@ -0,0 +1,7 @@ +[ + { + "tag": "bgp-state", + "regex": ".* %ADJCHANGE: neighbor (.*) (Up|Down) .*", + "params": ["neighbor_ip", "state" ] + } +] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json new file mode 100644 index 000000000000..2e67e88f8448 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_3.rc.json @@ -0,0 +1,6 @@ +[ + { + "tag": "TEST-TAG-NO-REGEX", + "param": [] + } +] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json new file mode 100644 index 000000000000..c3a875aded0f --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_4.rc.json @@ -0,0 +1,7 @@ +[ + { + "tag": "TEST-TAG-INVALID-REGEX", + "regex": "+++ ++++(", + "params": [] + } +] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json new file mode 100644 index 000000000000..ddaf37c931a8 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/test_regex_5.rc.json @@ -0,0 +1,7 @@ +[ + { + "tag": "test_tag", + "regex": ".*", + "params": [] + } +] diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt b/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt new file mode 100644 index 000000000000..78f89aec3d28 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs.txt @@ -0,0 +1,4 @@ +"Aug 17 02:39:21.286611 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %ADJCHANGE: neighbor 100.126.188.90 Down Neighbor deleted" true +"Aug 17 02:46:42.615668 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %ADJCHANGE: neighbor 100.126.188.90 Up" true +"Aug 17 04:46:51.290979 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %ADJCHANGE: neighbor 100.126.188.78 Down Neighbor deleted" true +"Aug 17 04:46:51.290979 SN6-0101-0114-02T0 INFO bgp#bgpd[62]: %NOEVENT: no event" false diff --git a/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt b/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt new file mode 100644 index 000000000000..d56615f61681 --- /dev/null +++ b/src/sonic-eventd/rsyslog_plugin_tests/test_syslogs_2.txt @@ -0,0 +1,3 @@ +testMessage true +another_test_message true + true diff --git a/src/sonic-eventd/src/eventd.cpp b/src/sonic-eventd/src/eventd.cpp new file mode 100644 index 000000000000..1ff9dd8be20b --- /dev/null +++ b/src/sonic-eventd/src/eventd.cpp @@ -0,0 +1,798 @@ +#include +#include "eventd.h" +#include "dbconnector.h" + +/* + * There are 5 threads, including the main + * + * (0) main thread -- Runs eventd service that accepts commands event_req_type_t + * This can be used to control caching events and a no-op echo service. + * + * (1) capture/cache service + * Saves all the events between cache start & stop. + * Update missed cached counter in memory. + * + * (2) Main proxy service that runs XSUB/XPUB ends + * + * (3) Get stats for total published counter in memory. This thread also sends + * heartbeat message. It accomplishes by counting upon receive missed due + * to event receive timeout. + * + * (4) Thread to update counters from memory to redis periodically. + * + */ + +using namespace std; +using namespace swss; + +#define MB(N) ((N) * 1024 * 1024) +#define EVT_SIZE_AVG 150 + +#define MAX_CACHE_SIZE (MB(100) / (EVT_SIZE_AVG)) + +/* Count of elements returned in each read */ +#define READ_SET_SIZE 100 + +#define VEC_SIZE(p) ((int)p.size()) + +/* Sock read timeout in milliseconds, to enable look for control signals */ +#define CAPTURE_SOCK_TIMEOUT 800 + +#define HEARTBEAT_INTERVAL_SECS 2 /* Default: 2 seconds */ + +/* Source & tag for heartbeat events */ +#define EVENTD_PUBLISHER_SOURCE "sonic-events-eventd" +#define EVENTD_HEARTBEAT_TAG "heartbeat" + + +const char *counter_keys[COUNTERS_EVENTS_TOTAL] = { + COUNTERS_EVENTS_PUBLISHED, + COUNTERS_EVENTS_MISSED_CACHE +}; + +static bool s_unit_testing = false; + +int +eventd_proxy::init() +{ + int ret = -1, rc = 0; + SWSS_LOG_INFO("Start xpub/xsub proxy"); + + m_frontend = zmq_socket(m_ctx, ZMQ_XSUB); + RET_ON_ERR(m_frontend != NULL, "failing to get ZMQ_XSUB socket"); + + rc = zmq_bind(m_frontend, get_config(string(XSUB_END_KEY)).c_str()); + RET_ON_ERR(rc == 0, "Failing to bind XSUB to %s", get_config(string(XSUB_END_KEY)).c_str()); + + m_backend = zmq_socket(m_ctx, ZMQ_XPUB); + RET_ON_ERR(m_backend != NULL, "failing to get ZMQ_XPUB socket"); + + rc = zmq_bind(m_backend, get_config(string(XPUB_END_KEY)).c_str()); + RET_ON_ERR(rc == 0, "Failing to bind XPUB to %s", get_config(string(XPUB_END_KEY)).c_str()); + + m_capture = zmq_socket(m_ctx, ZMQ_PUB); + RET_ON_ERR(m_capture != NULL, "failing to get ZMQ_PUB socket for capture"); + + rc = zmq_bind(m_capture, get_config(string(CAPTURE_END_KEY)).c_str()); + RET_ON_ERR(rc == 0, "Failing to bind capture PUB to %s", get_config(string(CAPTURE_END_KEY)).c_str()); + + m_thr = thread(&eventd_proxy::run, this); + ret = 0; +out: + return ret; +} + +void +eventd_proxy::run() +{ + SWSS_LOG_INFO("Running xpub/xsub proxy"); + + /* runs forever until zmq context is terminated */ + zmq_proxy(m_frontend, m_backend, m_capture); + + SWSS_LOG_INFO("Stopped xpub/xsub proxy"); +} + + +stats_collector::stats_collector() : + m_shutdown(false), m_pause_heartbeat(false), m_heartbeats_published(0), + m_heartbeats_interval_cnt(0) +{ + set_heartbeat_interval(HEARTBEAT_INTERVAL_SECS); + for (int i=0; i < COUNTERS_EVENTS_TOTAL; ++i) { + m_lst_counters[i] = 0; + } + m_updated = false; +} + + +void +stats_collector::set_heartbeat_interval(int val) +{ + if (val > 0) { + /* Round to highest possible multiples of MIN */ + m_heartbeats_interval_cnt = + (((val * 1000) + STATS_HEARTBEAT_MIN - 1) / STATS_HEARTBEAT_MIN); + } + else if (val == 0) { + /* Least possible */ + m_heartbeats_interval_cnt = 1; + } + else if (val == -1) { + /* Turn off heartbeat */ + m_heartbeats_interval_cnt = 0; + SWSS_LOG_INFO("Heartbeat turned OFF"); + } + /* Any other value is ignored as invalid */ + + SWSS_LOG_INFO("Set heartbeat: val=%d secs cnt=%d min=%d ms final=%d secs", + val, m_heartbeats_interval_cnt, STATS_HEARTBEAT_MIN, + (m_heartbeats_interval_cnt * STATS_HEARTBEAT_MIN / 1000)); +} + + +int +stats_collector::get_heartbeat_interval() +{ + return m_heartbeats_interval_cnt * STATS_HEARTBEAT_MIN / 1000; +} + +int +stats_collector::start() +{ + int rc = -1; + + if (!s_unit_testing) { + try { + m_counters_db = make_shared("COUNTERS_DB", 0, true); + } + catch (exception &e) + { + SWSS_LOG_ERROR("Unable to get DB Connector, e=(%s)\n", e.what()); + } + RET_ON_ERR(m_counters_db != NULL, "Failed to get COUNTERS_DB"); + + m_stats_table = make_shared( + m_counters_db.get(), COUNTERS_EVENTS_TABLE); + RET_ON_ERR(m_stats_table != NULL, "Failed to get events table"); + + m_thr_writer = thread(&stats_collector::run_writer, this); + } + m_thr_collector = thread(&stats_collector::run_collector, this); + rc = 0; +out: + return rc; +} + +void +stats_collector::run_writer() +{ + while (true) { + if (m_updated.exchange(false)) { + /* Update if there had been any update */ + + for (int i = 0; i < COUNTERS_EVENTS_TOTAL; ++i) { + vector fv; + + fv.emplace_back(EVENTS_STATS_FIELD_NAME, to_string(m_lst_counters[i])); + + m_stats_table->set(counter_keys[i], fv); + } + } + if (m_shutdown) { + break; + } + this_thread::sleep_for(chrono::milliseconds(10)); + /* + * After sleep always do an update if needed before checking + * shutdown flag, as any counters collected during sleep + * needs to be updated. + */ + } + + m_stats_table.reset(); + m_counters_db.reset(); +} + +void +stats_collector::run_collector() +{ + int hb_cntr = 0; + string hb_key = string(EVENTD_PUBLISHER_SOURCE) + ":" + EVENTD_HEARTBEAT_TAG; + event_handle_t pub_handle = NULL; + event_handle_t subs_handle = NULL; + + /* + * A subscriber is required to set a subscription. Else all published + * events will be dropped at the point of publishing itself. + */ + pub_handle = events_init_publisher(EVENTD_PUBLISHER_SOURCE); + RET_ON_ERR(pub_handle != NULL, + "failed to create publisher handle for heartbeats"); + + subs_handle = events_init_subscriber(false, STATS_HEARTBEAT_MIN); + RET_ON_ERR(subs_handle != NULL, "failed to subscribe to all"); + + /* + * Though we can count off of capture socket, then we need to duplicate + * code in event_receive which has the logic to count all missed per + * runtime id. It also has logic to retire closed runtime IDs. + * + * So use regular subscriber API w/o cache but timeout to enable + * exit, upon shutdown. + */ + /* + * The collector service runs until shutdown. + * The only task is to update total_published & total_missed_internal. + * The write of these counters into redis is done by another thread. + */ + + while(!m_shutdown) { + event_receive_op_t op; + int rc = 0; + + try { + rc = event_receive(subs_handle, op); + } + catch (exception& e) + { + rc = -1; + stringstream ss; + ss << e.what(); + SWSS_LOG_ERROR("Receive event failed with %s", ss.str().c_str()); + } + + if ((rc == 0) && (op.key != hb_key)) { + /* TODO: Discount EVENT_STR_CTRL_DEINIT messages too */ + increment_published(1+op.missed_cnt); + + /* reset counter on receive to restart. */ + hb_cntr = 0; + } + else { + if (rc < 0) { + SWSS_LOG_ERROR( + "event_receive failed with rc=%d; stats:published(%lu)", rc, + m_lst_counters[INDEX_COUNTERS_EVENTS_PUBLISHED]); + } + if (!m_pause_heartbeat && (m_heartbeats_interval_cnt > 0) && + ++hb_cntr >= m_heartbeats_interval_cnt) { + rc = event_publish(pub_handle, EVENTD_HEARTBEAT_TAG); + if (rc != 0) { + SWSS_LOG_ERROR("Failed to publish heartbeat rc=%d", rc); + } + hb_cntr = 0; + ++m_heartbeats_published; + } + } + } + +out: + /* + * NOTE: A shutdown could lose messages in cache. + * But consider, that eventd shutdown is a critical shutdown as it would + * bring down all other features. Hence done only at system level shutdown, + * hence losing few messages in flight is acceptable. Any more complex code + * to handle is unwanted. + */ + + events_deinit_subscriber(subs_handle); + events_deinit_publisher(pub_handle); + m_shutdown = true; +} + +capture_service::~capture_service() +{ + stop_capture(); +} + +void +capture_service::stop_capture() +{ + m_ctrl = STOP_CAPTURE; + + if (m_thr.joinable()) { + m_thr.join(); + } +} + +static bool +validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq) +{ + bool ret = false; + + internal_event_t::const_iterator itc_r, itc_s, itc_e; + itc_r = event.find(EVENT_RUNTIME_ID); + itc_s = event.find(EVENT_SEQUENCE); + itc_e = event.find(EVENT_STR_DATA); + + if ((itc_r != event.end()) && (itc_s != event.end()) && (itc_e != event.end())) { + ret = true; + rid = itc_r->second; + seq = str_to_seq(itc_s->second); + } + else { + SWSS_LOG_ERROR("Invalid evt: %s", map_to_str(event).c_str()); + } + + return ret; +} + + +/* + * Initialize cache with set of events provided. + * Events read by cache service will be appended + */ +void +capture_service::init_capture_cache(const event_serialized_lst_t &lst) +{ + /* Cache given events as initial stock. + * Save runtime ID with last seen seq to avoid duplicates, while reading + * from capture socket. + * No check for max cache size here, as most likely not needed. + */ + for (event_serialized_lst_t::const_iterator itc = lst.begin(); itc != lst.end(); ++itc) { + internal_event_t event; + + if (deserialize(*itc, event) == 0) { + runtime_id_t rid; + sequence_t seq; + + if (validate_event(event, rid, seq)) { + m_pre_exist_id[rid] = seq; + m_events.push_back(*itc); + } + } + } +} + + +void +capture_service::do_capture() +{ + int rc; + int block_ms=CAPTURE_SOCK_TIMEOUT; + int init_cnt; + void *cap_sub_sock = NULL; + counters_t total_overflow = 0; + + typedef enum { + /* + * In this state every event read is compared with init cache given + * Only new events are saved. + */ + CAP_STATE_INIT = 0, + + /* In this state, all events read are cached until max limit */ + CAP_STATE_ACTIVE, + + /* Cache has hit max. Hence only save last event for each runime ID */ + CAP_STATE_LAST + } cap_state_t; + + cap_state_t cap_state = CAP_STATE_INIT; + + /* + * Need subscription for publishers to publish. + * The stats collector service already has active subscriber for all. + */ + + cap_sub_sock = zmq_socket(m_ctx, ZMQ_SUB); + RET_ON_ERR(cap_sub_sock != NULL, "failing to get ZMQ_SUB socket"); + + rc = zmq_connect(cap_sub_sock, get_config(string(CAPTURE_END_KEY)).c_str()); + RET_ON_ERR(rc == 0, "Failing to bind capture SUB to %s", get_config(string(CAPTURE_END_KEY)).c_str()); + + rc = zmq_setsockopt(cap_sub_sock, ZMQ_SUBSCRIBE, "", 0); + RET_ON_ERR(rc == 0, "Failing to ZMQ_SUBSCRIBE"); + + rc = zmq_setsockopt(cap_sub_sock, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)); + RET_ON_ERR(rc == 0, "Failed to ZMQ_RCVTIMEO to %d", block_ms); + + m_cap_run = true; + + while (m_ctrl != START_CAPTURE) { + /* Wait for capture start */ + this_thread::sleep_for(chrono::milliseconds(10)); + } + + /* + * The cache service connects but defers any reading until caller provides + * the startup cache. But all events that arrived since connect, though not read + * will be held by ZMQ in its local cache. + * + * When cache service starts reading, check against the initial stock for duplicates. + * m_pre_exist_id caches the last seq number in initial stock for each runtime id. + * So only allow sequence number greater than cached number. + * + * Theoretically all the events provided via initial stock could be duplicates. + * Hence until as many events as in initial stock or until the cached id map + * is empty, do this check. + */ + init_cnt = (int)m_events.size(); + + /* Read until STOP_CAPTURE */ + while(m_ctrl == START_CAPTURE) { + runtime_id_t rid; + sequence_t seq; + internal_event_t event; + string source, evt_str; + + if ((rc = zmq_message_read(cap_sub_sock, 0, source, event)) != 0) { + /* + * The capture socket captures SUBSCRIBE requests too. + * The messge could contain subscribe filter strings and binary code. + * Empty string with binary code will fail to deserialize. + * Else would fail event validation. + */ + RET_ON_ERR((rc == EAGAIN) || (rc == ERR_MESSAGE_INVALID), + "0:Failed to read from capture socket"); + continue; + } + if (!validate_event(event, rid, seq)) { + continue; + } + serialize(event, evt_str); + + switch(cap_state) { + case CAP_STATE_INIT: + /* + * In this state check against cache, if duplicate + * When duplicate or new one seen, remove the entry from pre-exist map + * Stay in this state, until the pre-exist cache is empty or as many + * messages as in cache are seen, as in worst case even if you see + * duplicate of each, it will end with first m_events.size() + */ + { + bool add = true; + init_cnt--; + pre_exist_id_t::iterator it = m_pre_exist_id.find(rid); + + if (it != m_pre_exist_id.end()) { + if (seq <= it->second) { + /* Duplicate; Later/same seq in cache. */ + add = false; + } + if (seq >= it->second) { + /* new one; This runtime ID need not be checked again */ + m_pre_exist_id.erase(it); + } + } + if (add) { + m_events.push_back(evt_str); + } + } + if(m_pre_exist_id.empty() || (init_cnt <= 0)) { + /* Init check is no more needed. */ + pre_exist_id_t().swap(m_pre_exist_id); + cap_state = CAP_STATE_ACTIVE; + } + break; + + case CAP_STATE_ACTIVE: + /* Save until max allowed */ + try + { + m_events.push_back(evt_str); + if (VEC_SIZE(m_events) >= m_cache_max) { + cap_state = CAP_STATE_LAST; + /* Clear the map, created to ensure memory space available */ + m_last_events.clear(); + m_last_events_init = true; + } + break; + } + catch (bad_alloc& e) + { + stringstream ss; + ss << e.what(); + SWSS_LOG_ERROR("Cache save event failed with %s events:size=%d", + ss.str().c_str(), VEC_SIZE(m_events)); + cap_state = CAP_STATE_LAST; + // fall through to save this event in last set. + } + + case CAP_STATE_LAST: + total_overflow++; + m_last_events[rid] = evt_str; + if (total_overflow > m_last_events.size()) { + m_total_missed_cache++; + m_stats_instance->increment_missed_cache(1); + } + break; + } + } + +out: + /* + * Capture stop will close the socket which fail the read + * and hence bail out. + */ + zmq_close(cap_sub_sock); + m_cap_run = false; + return; +} + + +int +capture_service::set_control(capture_control_t ctrl, event_serialized_lst_t *lst) +{ + int ret = -1; + + /* Can go in single step only. */ + RET_ON_ERR((ctrl - m_ctrl) == 1, "m_ctrl(%d)+1 < ctrl(%d)", m_ctrl, ctrl); + + switch(ctrl) { + case INIT_CAPTURE: + m_thr = thread(&capture_service::do_capture, this); + for(int i=0; !m_cap_run && (i < 100); ++i) { + /* Wait max a second for thread to init */ + this_thread::sleep_for(chrono::milliseconds(10)); + } + RET_ON_ERR(m_cap_run, "Failed to init capture"); + m_ctrl = ctrl; + ret = 0; + break; + + case START_CAPTURE: + + /* + * Reserve a MAX_PUBLISHERS_COUNT entries for last events, as we use it only + * upon m_events/vector overflow, which might block adding new entries in map + * if overall mem consumption is too high. Clearing the map just before use + * is likely to help. + */ + for (int i=0; iempty())) { + init_capture_cache(*lst); + } + m_ctrl = ctrl; + ret = 0; + break; + + + case STOP_CAPTURE: + /* + * Caller would have initiated SUBS channel. + * Read for CACHE_DRAIN_IN_MILLISECS to drain off cache + * before stopping. + */ + this_thread::sleep_for(chrono::milliseconds(CACHE_DRAIN_IN_MILLISECS)); + stop_capture(); + ret = 0; + break; + + default: + SWSS_LOG_ERROR("Unexpected code=%d", ctrl); + break; + } +out: + return ret; +} + +int +capture_service::read_cache(event_serialized_lst_t &lst_fifo, + last_events_t &lst_last, counters_t &overflow_cnt) +{ + lst_fifo.swap(m_events); + if (m_last_events_init) { + lst_last.swap(m_last_events); + } else { + last_events_t().swap(lst_last); + } + last_events_t().swap(m_last_events); + event_serialized_lst_t().swap(m_events); + overflow_cnt = m_total_missed_cache; + return 0; +} + +static int +process_options(stats_collector *stats, const event_serialized_lst_t &req_data, + event_serialized_lst_t &resp_data) +{ + int ret = -1; + if (!req_data.empty()) { + RET_ON_ERR(req_data.size() == 1, "Expect only one options string %d", + (int)req_data.size()); + const auto &data = nlohmann::json::parse(*(req_data.begin())); + RET_ON_ERR(data.size() == 1, "Only one supported option. Expect 1. size=%d", + (int)data.size()); + const auto it = data.find(GLOBAL_OPTION_HEARTBEAT); + RET_ON_ERR(it != data.end(), "Expect HEARTBEAT_INTERVAL; got %s", + data.begin().key().c_str()); + stats->set_heartbeat_interval(it.value()); + ret = 0; + } + else { + nlohmann::json msg = nlohmann::json::object(); + msg[GLOBAL_OPTION_HEARTBEAT] = stats->get_heartbeat_interval(); + resp_data.push_back(msg.dump()); + ret = 0; + } +out: + return ret; +} + + +void +run_eventd_service() +{ + int code = 0; + int cache_max; + event_service service; + stats_collector stats_instance; + eventd_proxy *proxy = NULL; + capture_service *capture = NULL; + + event_serialized_lst_t capture_fifo_events; + last_events_t capture_last_events; + + SWSS_LOG_INFO("Eventd service starting\n"); + + void *zctx = zmq_ctx_new(); + RET_ON_ERR(zctx != NULL, "Failed to get zmq ctx"); + + cache_max = get_config_data(string(CACHE_MAX_CNT), (int)MAX_CACHE_SIZE); + RET_ON_ERR(cache_max > 0, "Failed to get CACHE_MAX_CNT"); + + proxy = new eventd_proxy(zctx); + RET_ON_ERR(proxy != NULL, "Failed to create proxy"); + + RET_ON_ERR(proxy->init() == 0, "Failed to init proxy"); + + RET_ON_ERR(service.init_server(zctx) == 0, "Failed to init service"); + + RET_ON_ERR(stats_instance.start() == 0, "Failed to start stats collector"); + + /* Pause heartbeat during caching */ + stats_instance.heartbeat_ctrl(true); + + /* + * Start cache service, right upon eventd starts so as not to lose + * events until telemetry starts. + * Telemetry will send a stop & collect cache upon startup + */ + capture = new capture_service(zctx, cache_max, &stats_instance); + RET_ON_ERR(capture->set_control(INIT_CAPTURE) == 0, "Failed to init capture"); + RET_ON_ERR(capture->set_control(START_CAPTURE) == 0, "Failed to start capture"); + + this_thread::sleep_for(chrono::milliseconds(200)); + RET_ON_ERR(stats_instance.is_running(), "Failed to start stats instance"); + + while(code != EVENT_EXIT) { + int resp = -1; + event_serialized_lst_t req_data, resp_data; + + RET_ON_ERR(service.channel_read(code, req_data) == 0, + "Failed to read request"); + + switch(code) { + case EVENT_CACHE_INIT: + /* connect only*/ + if (capture != NULL) { + delete capture; + } + event_serialized_lst_t().swap(capture_fifo_events); + last_events_t().swap(capture_last_events); + + capture = new capture_service(zctx, cache_max, &stats_instance); + if (capture != NULL) { + resp = capture->set_control(INIT_CAPTURE); + } + break; + + + case EVENT_CACHE_START: + if (capture == NULL) { + SWSS_LOG_ERROR("Cache is not initialized to start"); + resp = -1; + break; + } + /* Pause heartbeat during caching */ + stats_instance.heartbeat_ctrl(true); + + resp = capture->set_control(START_CAPTURE, &req_data); + break; + + + case EVENT_CACHE_STOP: + if (capture == NULL) { + SWSS_LOG_ERROR("Cache is not initialized to stop"); + resp = -1; + break; + } + resp = capture->set_control(STOP_CAPTURE); + if (resp == 0) { + counters_t overflow; + resp = capture->read_cache(capture_fifo_events, capture_last_events, + overflow); + } + delete capture; + capture = NULL; + + /* Unpause heartbeat upon stop caching */ + stats_instance.heartbeat_ctrl(); + break; + + + case EVENT_CACHE_READ: + if (capture != NULL) { + SWSS_LOG_ERROR("Cache is not stopped yet."); + resp = -1; + break; + } + resp = 0; + + if (capture_fifo_events.empty()) { + for (last_events_t::iterator it = capture_last_events.begin(); + it != capture_last_events.end(); ++it) { + capture_fifo_events.push_back(it->second); + } + last_events_t().swap(capture_last_events); + } + + { + int sz = VEC_SIZE(capture_fifo_events) < READ_SET_SIZE ? + VEC_SIZE(capture_fifo_events) : READ_SET_SIZE; + + if (sz != 0) { + auto it = std::next(capture_fifo_events.begin(), sz); + move(capture_fifo_events.begin(), capture_fifo_events.end(), + back_inserter(resp_data)); + + if (sz == VEC_SIZE(capture_fifo_events)) { + event_serialized_lst_t().swap(capture_fifo_events); + } else { + capture_fifo_events.erase(capture_fifo_events.begin(), it); + } + } + } + break; + + + case EVENT_ECHO: + resp = 0; + resp_data.swap(req_data); + break; + + case EVENT_OPTIONS: + resp = process_options(&stats_instance, req_data, resp_data); + break; + + case EVENT_EXIT: + resp = 0; + break; + + default: + SWSS_LOG_ERROR("Unexpected request: %d", code); + assert(false); + break; + } + RET_ON_ERR(service.channel_write(resp, resp_data) == 0, + "Failed to write response back"); + } +out: + service.close_service(); + stats_instance.stop(); + + if (proxy != NULL) { + delete proxy; + } + if (capture != NULL) { + delete capture; + } + if (zctx != NULL) { + zmq_ctx_term(zctx); + } + SWSS_LOG_ERROR("Eventd service exiting\n"); +} + +void set_unit_testing(bool b) +{ + s_unit_testing = b; +} + + diff --git a/src/sonic-eventd/src/eventd.h b/src/sonic-eventd/src/eventd.h new file mode 100644 index 000000000000..8411223b35be --- /dev/null +++ b/src/sonic-eventd/src/eventd.h @@ -0,0 +1,268 @@ +/* + * Header file for eventd daemon + */ +#include "table.h" +#include "events_service.h" +#include "events.h" +#include "events_wrap.h" + +#define ARRAY_SIZE(l) (sizeof(l)/sizeof((l)[0])) + +typedef map last_events_t; + +/* stat counters */ +typedef uint64_t counters_t; + +typedef enum { + INDEX_COUNTERS_EVENTS_PUBLISHED, + INDEX_COUNTERS_EVENTS_MISSED_CACHE, + COUNTERS_EVENTS_TOTAL +} stats_counter_index_t; + +#define EVENTS_STATS_FIELD_NAME "value" +#define STATS_HEARTBEAT_MIN 300 + +/* + * Started by eventd_service. + * Creates XPUB & XSUB end points. + * Bind the same + * Create a PUB socket end point for capture and bind. + * Call run_proxy method with sockets in a dedicated thread. + * Thread runs forever until the zmq context is terminated. + */ +class eventd_proxy +{ + public: + eventd_proxy(void *ctx) : m_ctx(ctx), m_frontend(NULL), m_backend(NULL), + m_capture(NULL) {}; + + ~eventd_proxy() { + zmq_close(m_frontend); + zmq_close(m_backend); + zmq_close(m_capture); + + if (m_thr.joinable()) + m_thr.join(); + } + + int init(); + + private: + void run(); + + void *m_ctx; + void *m_frontend; + void *m_backend; + void *m_capture; + thread m_thr; +}; + + +class stats_collector +{ + public: + stats_collector(); + + ~stats_collector() { stop(); } + + int start(); + + void stop() { + + m_shutdown = true; + + if (m_thr_collector.joinable()) { + m_thr_collector.join(); + } + + if (m_thr_writer.joinable()) { + m_thr_writer.join(); + } + } + + void increment_published(counters_t val) { + _update_stats(INDEX_COUNTERS_EVENTS_PUBLISHED, val); + } + + void increment_missed_cache(counters_t val) { + _update_stats(INDEX_COUNTERS_EVENTS_MISSED_CACHE, val); + } + + counters_t read_counter(stats_counter_index_t index) { + if (index != COUNTERS_EVENTS_TOTAL) { + return m_lst_counters[index]; + } + else { + return 0; + } + } + + /* Sets heartbeat interval in milliseconds */ + void set_heartbeat_interval(int val_in_ms); + + /* + * Get heartbeat interval in milliseconds + * NOTE: Set & get value may not match as the value is rounded + * to a multiple of smallest possible interval. + */ + int get_heartbeat_interval(); + + /* A way to pause heartbeat */ + void heartbeat_ctrl(bool pause = false) { + m_pause_heartbeat = pause; + SWSS_LOG_INFO("Set heartbeat_ctrl pause=%d", pause); + } + + uint64_t heartbeats_published() const { + return m_heartbeats_published; + } + + bool is_running() + { + return !m_shutdown; + } + + private: + void _update_stats(stats_counter_index_t index, counters_t val) { + if (index != COUNTERS_EVENTS_TOTAL) { + m_lst_counters[index] += val; + m_updated = true; + } + else { + SWSS_LOG_ERROR("Internal code error. Invalid index=%d", index); + } + } + + void run_collector(); + + void run_writer(); + + atomic m_updated; + + counters_t m_lst_counters[COUNTERS_EVENTS_TOTAL]; + + bool m_shutdown; + + thread m_thr_collector; + thread m_thr_writer; + + shared_ptr m_counters_db; + shared_ptr m_stats_table; + + bool m_pause_heartbeat; + + uint64_t m_heartbeats_published; + + int m_heartbeats_interval_cnt; +}; + +/* + * Capture/Cache service + * + * The service started in a dedicted thread upon demand. + * It is controlled by the caller. + * On cache init, the thread is created. + * Upon create, it creates a SUB socket to PUB end point of capture. + * PUB end point is maintained by zproxy service. + * + * On Cache start, the thread is signalled to start reading. + * + * On cache stop, it is signalled to stop reading and exit. Caller waits + * for thread to exit, before starting to read cached data, to ensure + * that the data is not handled by two threads concurrently. + * + * This thread maintains its own copy of cache. Reader, does a swap + * after thread exits. + * This thread ensures the cache is empty at the init. + * + * Upon cache start, the thread is blocked in receive call with timeout. + * Only upon receive/timeout, it would notice stop signal. Hence stop + * is not synchronous. The caller may wait for thread to terminate + * via thread.join(). + * + * Each event is 2 parts. It drops the first part, which is + * more for filtering events. It creates string from second part + * and saves it. + * + * The string is the serialized version of internal_event_ref + * + * It keeps two sets of data + * 1) List of all events received in vector in same order as received + * 2) Map of last event from each runtime id upon list overflow max size. + * + * We add to the vector as much as allowed by vector and max limit, + * whichever comes first. + * + * The sequence number in internal event will help assess the missed count + * by the consumer of the cache data. + * + */ +typedef enum { + NEED_INIT = 0, + INIT_CAPTURE, + START_CAPTURE, + STOP_CAPTURE +} capture_control_t; + + +class capture_service +{ + public: + capture_service(void *ctx, int cache_max, stats_collector *stats) : + m_ctx(ctx), m_stats_instance(stats), m_cap_run(false), + m_ctrl(NEED_INIT), m_cache_max(cache_max), + m_last_events_init(false), m_total_missed_cache(0) + {} + + ~capture_service(); + + int set_control(capture_control_t ctrl, event_serialized_lst_t *p=NULL); + + int read_cache(event_serialized_lst_t &lst_fifo, + last_events_t &lst_last, counters_t &overflow_cnt); + + private: + void init_capture_cache(const event_serialized_lst_t &lst); + void do_capture(); + + void stop_capture(); + + void *m_ctx; + stats_collector *m_stats_instance; + + bool m_cap_run; + capture_control_t m_ctrl; + thread m_thr; + + int m_cache_max; + + event_serialized_lst_t m_events; + + last_events_t m_last_events; + bool m_last_events_init; + + typedef map pre_exist_id_t; + pre_exist_id_t m_pre_exist_id; + + counters_t m_total_missed_cache; + +}; + + +/* + * Main server, that starts the zproxy service and honor + * eventd service requests event_req_type_t + * + * For echo, it just echoes + * + * FOr cache start, create the SUB end of capture and kick off + * capture_events thread. Upon cache stop command, close the handle + * which will stop the caching thread with read failure. + * + * for cache read, returns the collected events in chunks. + * + */ +void run_eventd_service(); + +/* To help skip redis access during unit testing */ +void set_unit_testing(bool b); diff --git a/src/sonic-eventd/src/main.cpp b/src/sonic-eventd/src/main.cpp new file mode 100644 index 000000000000..7a20497f0986 --- /dev/null +++ b/src/sonic-eventd/src/main.cpp @@ -0,0 +1,18 @@ +#include "logger.h" +#include "eventd.h" + +void run_eventd_service(); + +int main() +{ + swss::Logger::setMinPrio(swss::Logger::SWSS_DEBUG); + SWSS_LOG_INFO("The eventd service started"); + SWSS_LOG_ERROR("ERR:The eventd service started"); + + run_eventd_service(); + + SWSS_LOG_INFO("The eventd service exited"); + + return 0; +} + diff --git a/src/sonic-eventd/src/subdir.mk b/src/sonic-eventd/src/subdir.mk new file mode 100644 index 000000000000..a1e2b55f8d13 --- /dev/null +++ b/src/sonic-eventd/src/subdir.mk @@ -0,0 +1,13 @@ +CC := g++ + +TEST_OBJS += ./src/eventd.o +OBJS += ./src/eventd.o ./src/main.o + +C_DEPS += ./src/eventd.d ./src/main.d + +src/%.o: src/%.cpp + @echo 'Building file: $<' + @echo 'Invoking: GCC C++ Compiler' + $(CC) -D__FILENAME__="$(subst src/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" + @echo 'Finished building: $<' + @echo ' ' diff --git a/src/sonic-eventd/tests/eventd_ut.cpp b/src/sonic-eventd/tests/eventd_ut.cpp new file mode 100644 index 000000000000..399255edb2b8 --- /dev/null +++ b/src/sonic-eventd/tests/eventd_ut.cpp @@ -0,0 +1,915 @@ +#include +#include +#include +#include +#include +#include +#include +#include "gtest/gtest.h" +#include "events_common.h" +#include "events.h" +#include "../src/eventd.h" + +using namespace std; +using namespace swss; + +extern bool g_is_redis_available; +extern const char *counter_keys[]; + +typedef struct { + int id; + string source; + string tag; + string rid; + string seq; + event_params_t params; + int missed_cnt; +} test_data_t; + +internal_event_t create_ev(const test_data_t &data) +{ + internal_event_t event_data; + + event_data[EVENT_STR_DATA] = convert_to_json( + data.source + ":" + data.tag, data.params); + event_data[EVENT_RUNTIME_ID] = data.rid; + event_data[EVENT_SEQUENCE] = data.seq; + + return event_data; +} + +/* Mock test data with event parameters and expected missed count */ +static const test_data_t ldata[] = { + { + 0, + "source0", + "tag0", + "guid-0", + "1", + {{"ip", "10.10.10.10"}, {"state", "up"}}, + 0 + }, + { + 1, + "source0", + "tag1", + "guid-1", + "100", + {{"ip", "10.10.27.10"}, {"state", "down"}}, + 0 + }, + { + 2, + "source1", + "tag2", + "guid-2", + "101", + {{"ip", "10.10.24.10"}, {"state", "down"}}, + 0 + }, + { + 3, + "source0", + "tag3", + "guid-1", + "105", + {{"ip", "10.10.10.10"}, {"state", "up"}}, + 4 + }, + { + 4, + "source0", + "tag4", + "guid-0", + "2", + {{"ip", "10.10.20.10"}, {"state", "down"}}, + 0 + }, + { + 5, + "source1", + "tag5", + "guid-2", + "110", + {{"ip", "10.10.24.10"}, {"state", "down"}}, + 8 + }, + { + 6, + "source0", + "tag0", + "guid-0", + "5", + {{"ip", "10.10.10.10"}, {"state", "up"}}, + 2 + }, + { + 7, + "source0", + "tag1", + "guid-1", + "106", + {{"ip", "10.10.27.10"}, {"state", "down"}}, + 0 + }, + { + 8, + "source1", + "tag2", + "guid-2", + "111", + {{"ip", "10.10.24.10"}, {"state", "down"}}, + 0 + }, + { + 9, + "source0", + "tag3", + "guid-1", + "109", + {{"ip", "10.10.10.10"}, {"state", "up"}}, + 2 + }, + { + 10, + "source0", + "tag4", + "guid-0", + "6", + {{"ip", "10.10.20.10"}, {"state", "down"}}, + 0 + }, + { + 11, + "source1", + "tag5", + "guid-2", + "119", + {{"ip", "10.10.24.10"}, {"state", "down"}}, + 7 + }, +}; + + +void run_cap(void *zctx, bool &term, string &read_source, + int &cnt) +{ + void *mock_cap = zmq_socket (zctx, ZMQ_SUB); + string source; + internal_event_t ev_int; + int block_ms = 200; + int i=0; + + EXPECT_TRUE(NULL != mock_cap); + EXPECT_EQ(0, zmq_connect(mock_cap, get_config(CAPTURE_END_KEY).c_str())); + EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0)); + EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); + + while(!term) { + string source; + internal_event_t ev_int; + + if (0 == zmq_message_read(mock_cap, 0, source, ev_int)) { + cnt = ++i; + } + } + zmq_close(mock_cap); +} + +void run_sub(void *zctx, bool &term, string &read_source, internal_events_lst_t &lst, + int &cnt) +{ + void *mock_sub = zmq_socket (zctx, ZMQ_SUB); + string source; + internal_event_t ev_int; + int block_ms = 200; + + EXPECT_TRUE(NULL != mock_sub); + EXPECT_EQ(0, zmq_connect(mock_sub, get_config(XPUB_END_KEY).c_str())); + EXPECT_EQ(0, zmq_setsockopt(mock_sub, ZMQ_SUBSCRIBE, "", 0)); + EXPECT_EQ(0, zmq_setsockopt(mock_sub, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms))); + + while(!term) { + if (0 == zmq_message_read(mock_sub, 0, source, ev_int)) { + lst.push_back(ev_int); + read_source.swap(source); + cnt = (int)lst.size(); + } + } + + zmq_close(mock_sub); +} + +void *init_pub(void *zctx) +{ + void *mock_pub = zmq_socket (zctx, ZMQ_PUB); + EXPECT_TRUE(NULL != mock_pub); + EXPECT_EQ(0, zmq_connect(mock_pub, get_config(XSUB_END_KEY).c_str())); + + /* Provide time for async connect to complete */ + this_thread::sleep_for(chrono::milliseconds(200)); + + return mock_pub; +} + +void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst) +{ + for(internal_events_lst_t::const_iterator itc = lst.begin(); itc != lst.end(); ++itc) { + EXPECT_EQ(0, zmq_message_send(mock_pub, wr_source, *itc)); + } +} + + +TEST(eventd, proxy) +{ + printf("Proxy TEST started\n"); + bool term_sub = false; + bool term_cap = false; + string rd_csource, rd_source, wr_source("hello"); + internal_events_lst_t rd_evts, wr_evts; + int rd_evts_sz = 0, rd_cevts_sz = 0; + int wr_sz; + + void *zctx = zmq_ctx_new(); + EXPECT_TRUE(NULL != zctx); + + eventd_proxy *pxy = new eventd_proxy(zctx); + EXPECT_TRUE(NULL != pxy); + + /* Starting proxy */ + EXPECT_EQ(0, pxy->init()); + + /* subscriber in a thread */ + thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz)); + + /* capture in a thread */ + thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz)); + + /* Init pub connection */ + void *mock_pub = init_pub(zctx); + + EXPECT_TRUE(5 < ARRAY_SIZE(ldata)); + + for(int i=0; i<5; ++i) { + wr_evts.push_back(create_ev(ldata[i])); + } + + EXPECT_TRUE(rd_evts.empty()); + EXPECT_TRUE(rd_source.empty()); + + /* Publish events. */ + run_pub(mock_pub, wr_source, wr_evts); + + wr_sz = (int)wr_evts.size(); + for(int i=0; (wr_sz != rd_evts_sz) && (i < 100); ++i) { + /* Loop & wait for atmost a second */ + this_thread::sleep_for(chrono::milliseconds(10)); + } + this_thread::sleep_for(chrono::milliseconds(1000)); + + delete pxy; + pxy = NULL; + + term_sub = true; + term_cap = true; + + thr.join(); + thrc.join(); + EXPECT_EQ(rd_evts.size(), wr_evts.size()); + EXPECT_EQ(rd_cevts_sz, wr_evts.size()); + + zmq_close(mock_pub); + zmq_ctx_term(zctx); + + /* Provide time for async proxy removal to complete */ + this_thread::sleep_for(chrono::milliseconds(200)); + + printf("eventd_proxy is tested GOOD\n"); +} + + +TEST(eventd, capture) +{ + printf("Capture TEST started\n"); + + bool term_sub = false; + string sub_source; + int sub_evts_sz = 0; + internal_events_lst_t sub_evts; + stats_collector stats_instance; + + /* run_pub details */ + string wr_source("hello"); + internal_events_lst_t wr_evts; + + /* capture related */ + int init_cache = 3; /* provided along with start capture */ + int cache_max = init_cache + 3; /* capture service cache max */ + + /* startup strings; expected list & read list from capture */ + event_serialized_lst_t evts_start, evts_expect, evts_read; + last_events_t last_evts_exp, last_evts_read; + counters_t overflow, overflow_exp = 0; + + void *zctx = zmq_ctx_new(); + EXPECT_TRUE(NULL != zctx); + + /* Run the proxy; Capture service reads from proxy */ + eventd_proxy *pxy = new eventd_proxy(zctx); + EXPECT_TRUE(NULL != pxy); + + /* Starting proxy */ + EXPECT_EQ(0, pxy->init()); + + /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ + thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); + + /* Create capture service */ + capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); + + /* Expect START_CAPTURE */ + EXPECT_EQ(-1, pcap->set_control(STOP_CAPTURE)); + + /* Initialize the capture */ + EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); + + EXPECT_TRUE(init_cache > 1); + EXPECT_TRUE((cache_max+3) < (int)ARRAY_SIZE(ldata)); + + /* Collect few serailized strings of events for startup cache */ + for(int i=0; i < init_cache; ++i) { + internal_event_t ev(create_ev(ldata[i])); + string evt_str; + serialize(ev, evt_str); + evts_start.push_back(evt_str); + evts_expect.push_back(evt_str); + } + + /* + * Collect events to publish for capture to cache + * re-publishing some events sent in cache. + * Hence i=1, when first init_cache events are already + * in crash. + */ + for(int i=1; i < (int)ARRAY_SIZE(ldata); ++i) { + internal_event_t ev(create_ev(ldata[i])); + string evt_str; + + serialize(ev, evt_str); + + wr_evts.push_back(ev); + + if (i < cache_max) { + if (i >= init_cache) { + /* for i < init_cache, evts_expect is already populated */ + evts_expect.push_back(evt_str); + } + } else { + /* collect last entries for overflow */ + last_evts_exp[ldata[i].rid] = evt_str; + overflow_exp++; + } + } + overflow_exp -= (int)last_evts_exp.size(); + + EXPECT_EQ(0, pcap->set_control(START_CAPTURE, &evts_start)); + + /* Init pub connection */ + void *mock_pub = init_pub(zctx); + + /* Publish events from 1 to all. */ + run_pub(mock_pub, wr_source, wr_evts); + + /* Provide time for async message receive. */ + this_thread::sleep_for(chrono::milliseconds(200)); + + /* Stop capture, closes socket & terminates the thread */ + EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE)); + + /* terminate subs thread */ + term_sub = true; + + /* Read the cache */ + EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read, overflow)); + +#ifdef DEBUG_TEST + if ((evts_read.size() != evts_expect.size()) || + (last_evts_read.size() != last_evts_exp.size())) { + printf("size: sub_evts_sz=%d sub_evts=%d\n", sub_evts_sz, (int)sub_evts.size()); + printf("init_cache=%d cache_max=%d\n", init_cache, cache_max); + printf("overflow=%ul overflow_exp=%ul\n", overflow, overflow_exp); + printf("evts_start=%d evts_expect=%d evts_read=%d\n", + (int)evts_start.size(), (int)evts_expect.size(), (int)evts_read.size()); + printf("last_evts_exp=%d last_evts_read=%d\n", (int)last_evts_exp.size(), + (int)last_evts_read.size()); + } +#endif + + EXPECT_EQ(evts_read.size(), evts_expect.size()); + EXPECT_EQ(evts_read, evts_expect); + EXPECT_EQ(last_evts_read.size(), last_evts_exp.size()); + EXPECT_EQ(last_evts_read, last_evts_exp); + EXPECT_EQ(overflow, overflow_exp); + + delete pxy; + pxy = NULL; + + delete pcap; + pcap = NULL; + + thr_sub.join(); + + zmq_close(mock_pub); + zmq_ctx_term(zctx); + + /* Provide time for async proxy removal to complete */ + this_thread::sleep_for(chrono::milliseconds(200)); + + printf("Capture TEST completed\n"); +} + +TEST(eventd, captureCacheMax) +{ + printf("Capture TEST with matchinhg cache-max started\n"); + + /* + * Need to run subscriber; Else publisher would skip publishing + * in the absence of any subscriber. + */ + bool term_sub = false; + string sub_source; + int sub_evts_sz = 0; + internal_events_lst_t sub_evts; + stats_collector stats_instance; + + /* run_pub details */ + string wr_source("hello"); + internal_events_lst_t wr_evts; + + /* capture related */ + int init_cache = 4; /* provided along with start capture */ + int cache_max = ARRAY_SIZE(ldata); /* capture service cache max */ + + /* startup strings; expected list & read list from capture */ + event_serialized_lst_t evts_start, evts_expect, evts_read; + last_events_t last_evts_read; + counters_t overflow; + + void *zctx = zmq_ctx_new(); + EXPECT_TRUE(NULL != zctx); + + /* Run the proxy; Capture service reads from proxy */ + eventd_proxy *pxy = new eventd_proxy(zctx); + EXPECT_TRUE(NULL != pxy); + + /* Starting proxy */ + EXPECT_EQ(0, pxy->init()); + + /* Run subscriber; Else publisher will drop events on floor, with no subscriber. */ + thread thr_sub(&run_sub, zctx, ref(term_sub), ref(sub_source), ref(sub_evts), ref(sub_evts_sz)); + + /* Create capture service */ + capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); + + /* Expect START_CAPTURE */ + EXPECT_EQ(-1, pcap->set_control(STOP_CAPTURE)); + + EXPECT_TRUE(init_cache > 1); + + /* Collect few serailized strings of events for startup cache */ + for(int i=0; i < init_cache; ++i) { + internal_event_t ev(create_ev(ldata[i])); + string evt_str; + serialize(ev, evt_str); + evts_start.push_back(evt_str); + evts_expect.push_back(evt_str); + } + + /* + * Collect events to publish for capture to cache + * re-publishing some events sent in cache. + */ + for(int i=1; i < (int)ARRAY_SIZE(ldata); ++i) { + internal_event_t ev(create_ev(ldata[i])); + string evt_str; + + serialize(ev, evt_str); + + wr_evts.push_back(ev); + + if (i >= init_cache) { + /* for i < init_cache, evts_expect is already populated */ + evts_expect.push_back(evt_str); + } + } + + EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); + EXPECT_EQ(0, pcap->set_control(START_CAPTURE, &evts_start)); + + /* Init pub connection */ + void *mock_pub = init_pub(zctx); + + /* Publish events from 1 to all. */ + run_pub(mock_pub, wr_source, wr_evts); + + /* Provide time for async message receive. */ + this_thread::sleep_for(chrono::milliseconds(100)); + + /* Stop capture, closes socket & terminates the thread */ + EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE)); + + /* terminate subs thread */ + term_sub = true; + + /* Read the cache */ + EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read, overflow)); + +#ifdef DEBUG_TEST + if ((evts_read.size() != evts_expect.size()) || + !last_evts_read.empty()) { + printf("size: sub_evts_sz=%d sub_evts=%d\n", sub_evts_sz, (int)sub_evts.size()); + printf("init_cache=%d cache_max=%d\n", init_cache, cache_max); + printf("evts_start=%d evts_expect=%d evts_read=%d\n", + (int)evts_start.size(), (int)evts_expect.size(), (int)evts_read.size()); + printf("last_evts_read=%d\n", (int)last_evts_read.size()); + printf("overflow=%ul overflow_exp=%ul\n", overflow, overflow_exp); + } +#endif + + EXPECT_EQ(evts_read, evts_expect); + EXPECT_TRUE(last_evts_read.empty()); + EXPECT_EQ(overflow, 0); + + delete pxy; + pxy = NULL; + + delete pcap; + pcap = NULL; + + thr_sub.join(); + + zmq_close(mock_pub); + zmq_ctx_term(zctx); + + /* Provide time for async proxy removal to complete */ + this_thread::sleep_for(chrono::milliseconds(200)); + + printf("Capture TEST with matchinhg cache-max completed\n"); +} + +TEST(eventd, service) +{ + /* + * Don't PUB/SUB events as main run_eventd_service itself + * is using zmq_message_read. Any PUB/SUB will cause + * eventd's do_capture running in another thread to call + * zmq_message_read, which will crash as boost:archive is + * not thread safe. + * TEST(eventd, capture) has already tested caching. + */ + printf("Service TEST started\n"); + + /* startup strings; expected list & read list from capture */ + event_service service; + + void *zctx = zmq_ctx_new(); + EXPECT_TRUE(NULL != zctx); + + /* + * Start the eventd server side service + * It runs proxy & capture service + * It uses its own zmq context + * It starts to capture too. + */ + + if (!g_is_redis_available) { + set_unit_testing(true); + } + + thread thread_service(&run_eventd_service); + + /* Need client side service to interact with server side */ + EXPECT_EQ(0, service.init_client(zctx)); + + { + /* eventd_service starts cache too; Test this caching */ + /* Init pub connection */ + void *mock_pub = init_pub(zctx); + EXPECT_TRUE(NULL != mock_pub); + + internal_events_lst_t wr_evts; + int wr_sz = 2; + string wr_source("hello"); + + /* Test service startup caching */ + event_serialized_lst_t evts_start, evts_read; + + for(int i=0; i evts_start_int; + + EXPECT_TRUE(init_cache > 1); + + /* Collect few serailized strings of events for startup cache */ + for(int i=0; i < init_cache; ++i) { + internal_event_t ev(create_ev(ldata[i])); + string evt_str; + serialize(ev, evt_str); + evts_start.push_back(evt_str); + evts_start_int.push_back(ev); + } + + + EXPECT_EQ(0, service.cache_init()); + EXPECT_EQ(0, service.cache_start(evts_start)); + + this_thread::sleep_for(chrono::milliseconds(200)); + + /* Stop capture, closes socket & terminates the thread */ + EXPECT_EQ(0, service.cache_stop()); + + /* Read the cache */ + EXPECT_EQ(0, service.cache_read(evts_read)); + + if (evts_read != evts_start) { + vector evts_read_int; + + for (event_serialized_lst_t::const_iterator itc = evts_read.begin(); + itc != evts_read.end(); ++itc) { + internal_event_t event; + + if (deserialize(*itc, event) == 0) { + evts_read_int.push_back(event); + } + } + EXPECT_EQ(evts_read_int, evts_start_int); + } + } + + { + string set_opt_bad("{\"HEARTBEAT_INTERVAL\": 2000, \"OFFLINE_CACHE_SIZE\": 500}"); + string set_opt_good("{\"HEARTBEAT_INTERVAL\":5}"); + char buff[100]; + buff[0] = 0; + + EXPECT_EQ(-1, service.global_options_set(set_opt_bad.c_str())); + EXPECT_EQ(0, service.global_options_set(set_opt_good.c_str())); + EXPECT_LT(0, service.global_options_get(buff, sizeof(buff))); + + EXPECT_EQ(set_opt_good, string(buff)); + } + + EXPECT_EQ(0, service.send_recv(EVENT_EXIT)); + + service.close_service(); + + thread_service.join(); + + zmq_ctx_term(zctx); + printf("Service TEST completed\n"); +} + + +void +wait_for_heartbeat(stats_collector &stats_instance, long unsigned int cnt, + int wait_ms = 3000) +{ + int diff = 0; + + auto st = duration_cast(system_clock::now().time_since_epoch()).count(); + while (stats_instance.heartbeats_published() == cnt) { + auto en = duration_cast(system_clock::now().time_since_epoch()).count(); + diff = en - st; + if (diff > wait_ms) { + EXPECT_LE(diff, wait_ms); + EXPECT_EQ(cnt, stats_instance.heartbeats_published()); + break; + } + else { + stringstream ss; + ss << (en -st); + } + this_thread::sleep_for(chrono::milliseconds(300)); + } +} + +TEST(eventd, heartbeat) +{ + printf("heartbeat TEST started\n"); + + int rc; + long unsigned int cnt; + stats_collector stats_instance; + + if (!g_is_redis_available) { + set_unit_testing(true); + } + + void *zctx = zmq_ctx_new(); + EXPECT_TRUE(NULL != zctx); + + eventd_proxy *pxy = new eventd_proxy(zctx); + EXPECT_TRUE(NULL != pxy); + + /* Starting proxy */ + EXPECT_EQ(0, pxy->init()); + + rc = stats_instance.start(); + EXPECT_EQ(rc, 0); + + /* Wait for any non-zero heartbeat */ + wait_for_heartbeat(stats_instance, 0); + + /* Pause heartbeat */ + stats_instance.heartbeat_ctrl(true); + + /* Sleep to ensure the other thread noticed the pause request. */ + this_thread::sleep_for(chrono::milliseconds(200)); + + /* Get current count */ + cnt = stats_instance.heartbeats_published(); + + /* Wait for 3 seconds with no new neartbeat */ + this_thread::sleep_for(chrono::seconds(3)); + + EXPECT_EQ(stats_instance.heartbeats_published(), cnt); + + /* Set interval as 1 second */ + stats_instance.set_heartbeat_interval(1); + + /* Turn on heartbeat */ + stats_instance.heartbeat_ctrl(); + + /* Wait for heartbeat count to change from last count */ + wait_for_heartbeat(stats_instance, cnt, 2000); + + stats_instance.stop(); + + delete pxy; + + zmq_ctx_term(zctx); + + printf("heartbeat TEST completed\n"); +} + + +TEST(eventd, testDB) +{ + printf("DB TEST started\n"); + + /* consts used */ + const int pub_count = 7; + const int cache_max = 3; + + stats_collector stats_instance; + event_handle_t pub_handle; + event_serialized_lst_t evts_read; + last_events_t last_evts_read; + counters_t overflow; + string tag; + + if (!g_is_redis_available) { + printf("redis not available; Hence DB TEST skipped\n"); + return; + } + + EXPECT_LT(cache_max, pub_count); + DBConnector db("COUNTERS_DB", 0, true); + + + /* Not testing heartbeat; Hence set high val as 10 seconds */ + stats_instance.set_heartbeat_interval(10000); + + /* Start instance to capture published count & as well writes to DB */ + EXPECT_EQ(0, stats_instance.start()); + + void *zctx = zmq_ctx_new(); + EXPECT_TRUE(NULL != zctx); + + /* Run proxy to enable receive as capture test needs to receive */ + eventd_proxy *pxy = new eventd_proxy(zctx); + EXPECT_TRUE(NULL != pxy); + + /* Starting proxy */ + EXPECT_EQ(0, pxy->init()); + + /* Create capture service */ + capture_service *pcap = new capture_service(zctx, cache_max, &stats_instance); + + /* Initialize the capture */ + EXPECT_EQ(0, pcap->set_control(INIT_CAPTURE)); + + /* Kick off capture */ + EXPECT_EQ(0, pcap->set_control(START_CAPTURE)); + + pub_handle = events_init_publisher("test_db"); + + for(int i=0; i < pub_count; ++i) { + tag = string("test_db_tag_") + to_string(i); + event_publish(pub_handle, tag); + } + + /* Pause to ensure all publisghed events did reach capture service */ + this_thread::sleep_for(chrono::milliseconds(200)); + + EXPECT_EQ(0, pcap->set_control(STOP_CAPTURE)); + + /* Read the cache */ + EXPECT_EQ(0, pcap->read_cache(evts_read, last_evts_read, overflow)); + + /* + * Sent pub_count messages of different tags. + * Upon cache max, only event per sender/runtime-id is saved. Hence + * expected last_evts_read is one. + * expected overflow = pub_count - cache_max - 1 + */ + + EXPECT_EQ(cache_max, (int)evts_read.size()); + EXPECT_EQ(1, (int)last_evts_read.size()); + EXPECT_EQ((pub_count - cache_max - 1), overflow); + + EXPECT_EQ(pub_count, stats_instance.read_counter( + INDEX_COUNTERS_EVENTS_PUBLISHED)); + EXPECT_EQ((pub_count - cache_max - 1), stats_instance.read_counter( + INDEX_COUNTERS_EVENTS_MISSED_CACHE)); + + events_deinit_publisher(pub_handle); + + for (int i=0; i < COUNTERS_EVENTS_TOTAL; ++i) { + string key = string("COUNTERS_EVENTS:") + counter_keys[i]; + unordered_map m; + bool key_found = false, val_found=false, val_match=false; + + if (db.exists(key)) { + try { + m = db.hgetall(key); + unordered_map::const_iterator itc = + m.find(string(EVENTS_STATS_FIELD_NAME)); + if (itc != m.end()) { + int expect = (counter_keys[i] == string(COUNTERS_EVENTS_PUBLISHED) ? + pub_count : (pub_count - cache_max - 1)); + val_match = (expect == stoi(itc->second) ? true : false); + val_found = true; + } + } + catch (exception &e) + { + printf("Failed to get key=(%s) err=(%s)", key.c_str(), e.what()); + EXPECT_TRUE(false); + } + key_found = true; + } + + if (!val_match) { + printf("key=%s key_found=%d val_found=%d fields=%d", + key.c_str(), key_found, val_found, (int)m.size()); + + printf("hgetall BEGIN key=%s", key.c_str()); + for(unordered_map::const_iterator itc = m.begin(); + itc != m.end(); ++itc) { + printf("val[%s] = (%s)", itc->first.c_str(), itc->second.c_str()); + } + printf("hgetall END\n"); + EXPECT_TRUE(false); + } + } + + stats_instance.stop(); + + delete pxy; + delete pcap; + + zmq_ctx_term(zctx); + + printf("DB TEST completed\n"); +} + + +// TODO -- Add unit tests for stats diff --git a/src/sonic-eventd/tests/main.cpp b/src/sonic-eventd/tests/main.cpp new file mode 100644 index 000000000000..4b869e8c3004 --- /dev/null +++ b/src/sonic-eventd/tests/main.cpp @@ -0,0 +1,97 @@ +#include "gtest/gtest.h" +#include "dbconnector.h" +#include + +using namespace std; +using namespace swss; + +string existing_file = "./tests/redis_multi_db_ut_config/database_config.json"; +string nonexisting_file = "./tests/redis_multi_db_ut_config/database_config_nonexisting.json"; +string global_existing_file = "./tests/redis_multi_db_ut_config/database_global.json"; + +#define TEST_DB "APPL_DB" +#define TEST_NAMESPACE "asic0" +#define INVALID_NAMESPACE "invalid" + +bool g_is_redis_available = false; + +class SwsscommonEnvironment : public ::testing::Environment { +public: + // Override this to define how to set up the environment + void SetUp() override { + // by default , init should be false + cout<<"Default : isInit = "<:tag\n") + return sourceTag + +def getFVMFromParams(params): + param_dict = FieldValueMap() + for key, value in params.items(): + key = str(key) + value = str(value) + param_dict[key] = value + return param_dict + +def publishEvents(line, publisher_handle): + try: + json_dict = json.loads(line) + except Exception as ex: + logging.error("JSON string not able to be parsed\n") + return + if not json_dict or len(json_dict) != 1: + logging.error("JSON string not able to be parsed\n") + return + sourceTag = list(json_dict)[0] + params = list(json_dict.values())[0] + tag = getTag(sourceTag) + param_dict = getFVMFromParams(params) + if param_dict: + event_publish(publisher_handle, tag, param_dict) + +def publishEventsFromFile(publisher_handle, infile, count, pause): + try: + with open(infile, 'r') as f: + for line in f.readlines(): + line.rstrip() + publishEvents(line, publisher_handle) + time.sleep(pause) + except Exception as ex: + logging.error("Unable to open file from given path or has incorrect json format, gives exception {}\n".format(ex)) + logging.info("Switching to default bgp state publish events\n") + publishBGPEvents(publisher_handle, count, pause) + +def publishBGPEvents(publisher_handle, count, pause): + ip_addresses = [] + param_dict = FieldValueMap() + + for _ in range(count): + ip = str(ipaddress.IPv4Address(random.randint(0, 2 ** 32))) + ip_addresses.append(ip) + + # publish down events + for ip in ip_addresses: + param_dict["ip"] = ip + param_dict["status"] = "down" + event_publish(publisher_handle, "bgp-state", param_dict) + time.sleep(pause) + + # publish up events + for ip in ip_addresses: + param_dict["ip"] = ip + event_publish(publisher_handle, "bgp-state", param_dict) + time.sleep(pause) + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-s", "--source", nargs='?', const='test-event-source', default='test-event-source', help="Source of event, default us test-event-source") + parser.add_argument("-f", "--file", nargs='?', const='', default='', help="File containing json event strings, must be in format \'{\":foo\": {\"aaa\": \"AAA\", \"bbb\": \"BBB\"}}\'") + parser.add_argument("-c", "--count", nargs='?', type=int, const=10, default=10, help="Count of default bgp events to be generated") + parser.add_argument("-p", "--pause", nargs='?', type=float, const=0.0, default=0.0, help="Pause time wanted between each event, default is 0") + args = parser.parse_args() + publisher_handle = events_init_publisher(args.source) + if args.file == '': + publishBGPEvents(publisher_handle, args.count, args.pause) + else: + publishEventsFromFile(publisher_handle, args.file, args.count, args.pause) + +if __name__ == "__main__": + main() diff --git a/src/sonic-eventd/tools/events_tool.cpp b/src/sonic-eventd/tools/events_tool.cpp new file mode 100644 index 000000000000..97b17c1d7566 --- /dev/null +++ b/src/sonic-eventd/tools/events_tool.cpp @@ -0,0 +1,328 @@ +#include +#include +#include "events.h" +#include "events_common.h" + +/* + * Sample i/p file contents for send + * + * {"src_0:key-0": {"foo": "bar", "hello": "world" }} + * {"src_0:key-1": {"foo": "barXX", "hello": "world" }} + * + * Repeat the above line to increase entries. + * Each line is parsed independently, so no "," expected at the end. + */ + +#define ASSERT(res, m, ...) \ + if (!(res)) {\ + int _e = errno; \ + printf("Failed here %s:%d errno:%d zerrno:%d ", __FUNCTION__, __LINE__, _e, zmq_errno()); \ + printf(m, ##__VA_ARGS__); \ + printf("\n"); \ + exit(-1); } + + +typedef enum { + OP_INIT=0, + OP_SEND=1, + OP_RECV=2, + OP_SEND_RECV=3 //SEND|RECV +} op_t; + + +#define PRINT_CHUNK_SZ 2 + +/* + * Usage: + */ + +const char *s_usage = "\ +-s - To Send\n\ +-r - To receive\n\ +Note:\n\ + when both -s & -r are given:\n\ + it uses main thread to publish and fork a dedicated thread to receive.\n\ + The rest of the parameters except -w is used for send\n\ +\n\ +-n - Count of messages to send/receive. When both given, it is used as count to send\n\ + Default: 1 \n\ + A value of 0 implies unlimited\n\ +\n\ +-p - Count of milliseconds to pause between sends or receives. In send-recv mode, it only affects send.\n\ + Default: 0 implying no pause\n\ +\n\ + -i - List of JSON messages to send in a file, with each event/message\n\ + declared in a single line. When n is more than size of list, the list\n\ + is rotated upon completion.\n\ + e.g. '[ \n\ + { \"sonic-bgp:bgp-state\": { \"ip\": \"10.101.01.10\", \"ts\": \"2022-10-11T01:02:30.45567\", \"state\": \"up\" }}\n\ + { \"abc-xxx:yyy-zz\": { \"foo\": \"bar\", \"hello\":\"world\", \"ts\": \"2022-10-11T01:02:30.45567\"}}\n\ + { \"some-mod:some-tag\": {}}\n\ + ]\n\ + Default: \n\ +\n\ +-c - Use offline cache in receive mode\n\ +-o - O/p file to write received events\n\ + Default: STDOUT\n"; + + +bool term_receive = false; + +template +string +t_map_to_str(const Map &m) +{ + stringstream _ss; + string sep; + + _ss << "{"; + for (const auto elem: m) { + _ss << sep << "{" << elem.first << "," << elem.second << "}"; + if (sep.empty()) { + sep = ", "; + } + } + _ss << "}"; + return _ss.str(); +} + +void +do_receive(const event_subscribe_sources_t filter, const string outfile, int cnt, int pause, bool use_cache) +{ + int index=0, total_missed = 0; + ostream* fp = &cout; + ofstream fout; + + if (!outfile.empty()) { + fout.open(outfile); + if (!fout.fail()) { + fp = &fout; + printf("outfile=%s set\n", outfile.c_str()); + } + } + event_handle_t h = events_init_subscriber(use_cache, 2000, filter.empty() ? NULL : &filter); + printf("Subscribed with use_cache=%d timeout=2000 filter %s\n", + use_cache, filter.empty() ? "empty" : "non-empty"); + ASSERT(h != NULL, "Failed to get subscriber handle"); + + while(!term_receive) { + event_receive_op_t evt; + map_str_str_t evtOp; + + int rc = event_receive(h, evt); + if (rc != 0) { + ASSERT(rc == EAGAIN, "Failed to receive rc=%d index=%d\n", + rc, index); + continue; + } + ASSERT(!evt.key.empty(), "received EMPTY key"); + ASSERT(evt.missed_cnt >= 0, "Missed count uninitialized"); + ASSERT(evt.publish_epoch_ms > 0, "publish_epoch_ms uninitialized"); + + total_missed += evt.missed_cnt; + + evtOp[evt.key] = t_map_to_str(evt.params); + (*fp) << t_map_to_str(evtOp) << "\n"; + fp->flush(); + + if ((++index % PRINT_CHUNK_SZ) == 0) { + printf("Received index %d\n", index); + } + + if (cnt > 0) { + if (--cnt <= 0) { + break; + } + } + } + + events_deinit_subscriber(h); + printf("Total received = %d missed = %dfile:%s\n", index, total_missed, + outfile.empty() ? "STDOUT" : outfile.c_str()); +} + + +int +do_send(const string infile, int cnt, int pause) +{ + typedef struct { + string tag; + event_params_t params; + } evt_t; + + typedef vector lst_t; + + lst_t lst; + string source; + event_handle_t h; + int index = 0; + + if (!infile.empty()) { + ifstream input(infile); + + /* Read infile into list of events, that are ready for send */ + for( string line; getline( input, line ); ) + { + evt_t evt; + string str_params; + + const auto &data = nlohmann::json::parse(line); + ASSERT(data.is_object(), "Parsed data is not object"); + ASSERT((int)data.size() == 1, "string parse size = %d", (int)data.size()); + + string key(data.begin().key()); + if (source.empty()) { + source = key.substr(0, key.find(":")); + } else { + ASSERT(source == key.substr(0, key.find(":")), "source:%s read=%s", + source.c_str(), key.substr(0, key.find(":")).c_str()); + } + evt.tag = key.substr(key.find(":")+1); + + const auto &val = data.begin().value(); + ASSERT(val.is_object(), "Parsed params is not object"); + ASSERT((int)val.size() >= 1, "Expect non empty params"); + + for(auto par_it = val.begin(); par_it != val.end(); par_it++) { + evt.params[string(par_it.key())] = string(par_it.value()); + } + lst.push_back(evt); + } + } + + if (lst.empty()) { + evt_t evt = { + "test-tag", + { + { "param1", "foo"}, + {"param2", "bar"} + } + }; + lst.push_back(evt); + } + + h = events_init_publisher(source); + ASSERT(h != NULL, "failed to init publisher"); + + /* cnt = 0 as i/p implies forever */ + + while(cnt >= 0) { + /* Keep resending the list until count is exhausted */ + for(lst_t::const_iterator itc = lst.begin(); (cnt >= 0) && (itc != lst.end()); ++itc) { + const evt_t &evt = *itc; + + if ((++index % PRINT_CHUNK_SZ) == 0) { + printf("Sending index %d\n", index); + } + + int rc = event_publish(h, evt.tag, evt.params.empty() ? NULL : &evt.params); + ASSERT(rc == 0, "Failed to publish index=%d rc=%d", index, rc); + + if ((cnt > 0) && (--cnt == 0)) { + /* set to termninate */ + cnt = -1; + } + else if (pause) { + /* Pause between two sends */ + this_thread::sleep_for(chrono::milliseconds(pause)); + } + } + } + + events_deinit_publisher(h); + printf("Sent %d events\n", index); + return 0; +} + +void usage() +{ + printf("%s", s_usage); + exit(-1); +} + +int main(int argc, char **argv) +{ + bool use_cache = false; + int op = OP_INIT; + int cnt=0, pause=0; + string json_str_msg, outfile("STDOUT"), infile; + event_subscribe_sources_t filter; + + for(;;) + { + switch(getopt(argc, argv, "srn:p:i:o:f:c")) // note the colon (:) to indicate that 'b' has a parameter and is not a switch + { + case 'c': + use_cache = true; + continue; + + case 's': + op |= OP_SEND; + continue; + + case 'r': + op |= OP_RECV; + continue; + + case 'n': + cnt = stoi(optarg); + continue; + + case 'p': + pause = stoi(optarg); + continue; + + case 'i': + infile = optarg; + continue; + + case 'o': + outfile = optarg; + continue; + + case 'f': + { + stringstream ss(optarg); //create string stream from the string + while(ss.good()) { + string substr; + getline(ss, substr, ','); + filter.push_back(substr); + } + } + continue; + + case -1: + break; + + case '?': + case 'h': + default : + usage(); + break; + + } + break; + } + + + printf("op=%d n=%d pause=%d i=%s o=%s\n", + op, cnt, pause, infile.c_str(), outfile.c_str()); + + if (op == OP_SEND_RECV) { + thread thr(&do_receive, filter, outfile, 0, 0, use_cache); + do_send(infile, cnt, pause); + } + else if (op == OP_SEND) { + do_send(infile, cnt, pause); + } + else if (op == OP_RECV) { + do_receive(filter, outfile, cnt, pause, use_cache); + } + else { + ASSERT(false, "Elect -s for send or -r receive or both; Bailing out with no action\n"); + } + + printf("--------- END: Good run -----------------\n"); + return 0; +} + diff --git a/src/sonic-eventd/tools/events_volume_test.py b/src/sonic-eventd/tools/events_volume_test.py new file mode 100644 index 000000000000..73143d483cd8 --- /dev/null +++ b/src/sonic-eventd/tools/events_volume_test.py @@ -0,0 +1,68 @@ +import sys +import subprocess +import time +import logging +import argparse + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + handlers = [ + logging.FileHandler("debug.log"), + logging.StreamHandler(sys.stdout) + ] +) + +def read_events_from_file(file, count): + logging.info("Reading from file generated by events_tool") + lines = 0 + with open(file, 'r') as infile: + lines = infile.readlines() + logging.info("Should receive {} events and got {} events\n".format(count, len(lines))) + assert len(lines) == count + +def start_tool(file): + logging.info("Starting events_tool\n") + proc = subprocess.Popen(["./events_tool", "-r", "-o", file]) + return proc + +def run_test(process, file, count, duplicate): + # log messages to see if events have been received + tool_proc = start_tool(file) + + time.sleep(2) # buffer for events_tool to startup + logging.info("Generating logger messages\n") + for i in range(count): + line = "" + state = "up" + if duplicate: + line = "{} test message testmessage state up".format(process) + else: + if i % 2 != 1: + state = "down" + line = "{} test message testmessage{} state {}".format(process, i, state) + command = "logger -p local0.notice -t {}".format(line) + subprocess.run(command, shell=True, stdout=subprocess.PIPE) + + time.sleep(2) # some buffer for all events to be published to file + read_events_from_file(file, count) + tool_proc.terminate() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--process", nargs='?', const ='', default='', help="Process that is spitting out log") + parser.add_argument("-f", "--file", nargs='?', const='', default='', help="File used by events_tool to read events from") + parser.add_argument("-c", "--count", type=int, nargs='?', const=1000, default=1000, help="Count of times log message needs to be published down/up, default is 1000") + args = parser.parse_args() + if(args.process == '' or args.file == ''): + logging.error("Invalid process or logfile\n") + return + logging.info("Starting volume test\n") + logging.info("Generating {} unique messages for rsyslog plugin\n".format(args.count)) + run_test(args.process, args.file, args.count, False) + time.sleep(2) + logging.info("Restarting volume test but for duplicate log messages\n") + run_test(args.process, args.file, args.count, True) + +if __name__ == "__main__": + main() diff --git a/src/sonic-eventd/tools/sample_ip.json b/src/sonic-eventd/tools/sample_ip.json new file mode 100644 index 000000000000..acb8726cf253 --- /dev/null +++ b/src/sonic-eventd/tools/sample_ip.json @@ -0,0 +1 @@ +{"src_0:key-0": {"foo": "bar", "hello": "world" }} diff --git a/src/sonic-eventd/tools/subdir.mk b/src/sonic-eventd/tools/subdir.mk new file mode 100644 index 000000000000..5f13043dd612 --- /dev/null +++ b/src/sonic-eventd/tools/subdir.mk @@ -0,0 +1,12 @@ +CC := g++ + +TOOL_OBJS = ./tools/events_tool.o + +C_DEPS += ./tools/events_tool.d + +tools/%.o: tools/%.cpp + @echo 'Building file: $<' + @echo 'Invoking: GCC C++ Compiler' + $(CC) -D__FILENAME__="$(subst tools/,,$<)" $(CFLAGS) -c -fmessage-length=0 -MMD -MP -MF"$(@:%.o=%.d)" -MT"$(@)" -o "$@" "$<" + @echo 'Finished building: $<' + @echo ' '