diff --git a/config/examples/file-input-glob-patterns-es-output.yml b/config/examples/file-input-glob-patterns-es-output.yml index 528d86b7..99dbfec3 100644 --- a/config/examples/file-input-glob-patterns-es-output.yml +++ b/config/examples/file-input-glob-patterns-es-output.yml @@ -10,15 +10,16 @@ input: # ship everything EXCEPT for kube*.log, storage*.log, etcd*.log, and coredns*.log - /var/log/containers/!(kube*.log|storage*.log|etcd*.log|coredns*.log) -inputFilter: - - module: grep - config: - matchSource: !!js/regexp /.*log/ # match log files - include: !!js/regexp /failed|error|exception/i # include errors - exclude: !!js/regexp /super noisy error messages/i # exclude noise +# inputFilter: +# - module: grep +# config: +# matchSource: !!js/regexp /.*log/ # match log files +# include: !!js/regexp /failed|error|exception/i # include errors +# exclude: !!js/regexp /super noisy error messages/i # exclude noise +# - module: input-filter-k8s-containerd # Only needed when running in K8s env with Containerd/CRI-O container runtime output: elasticsearch: module: elasticsearch url: https://logsene-receiver.sematext.com - index: YOUR_LOGS_APP_TOKEN + index: LOGS_TOKEN diff --git a/kubernetes/ibm-cloud-logagent-ds.yml b/kubernetes/ibm-cloud-logagent-ds.yml index c6c9d502..78020c4a 100644 --- a/kubernetes/ibm-cloud-logagent-ds.yml +++ b/kubernetes/ibm-cloud-logagent-ds.yml @@ -22,6 +22,8 @@ spec: env: - name: LOG_GLOB value: "/var/log/containers/*.log;/var/log/*.log" + - name: LOGAGENT_ARGS + value: "--k8sContainerd" - name: LOGS_TOKEN value: "YOUR_SEMATEXT_LOGS_TOKEN" - name: REGION diff --git a/lib/plugins/input-filter/kubernetesContainerd.js b/lib/plugins/input-filter/kubernetesContainerd.js index 35aa2c73..941a393c 100644 --- a/lib/plugins/input-filter/kubernetesContainerd.js +++ b/lib/plugins/input-filter/kubernetesContainerd.js @@ -1,4 +1,12 @@ -var containerdSplitRegexp = /^(.+[stdout|stderr] [F|P]) / +const containerdSplitRegexp = /^(.+[stdout|stderr] [F|P]) / + +// Dictionary to store sources and log lines +/** + * Key: sourceName + * Value (Object): { streamFlag, previousStreamFlag, logLines } + */ +const sources = {} + /** * sourceName - origin of the log, e.g. file name * config - properties from the config section for this plugin @@ -6,24 +14,48 @@ var containerdSplitRegexp = /^(.+[stdout|stderr] [F|P]) / * callback - callback function (err, data). */ function parseK8sFileName (sourceName) { + /** + * SAMPLE sourceName * + * ***************** * + * sourceName: /var/log/containers/app-77b4d5595b-hmjxs_default_app-80209fd578c6be842b5b8a2d6389227ccab0196b7b658bd245eed4767c9b843c.log + * fileName: app-77b4d5595b-hmjxs_default_app-80209fd578c6be842b5b8a2d6389227ccab0196b7b658bd245eed4767c9b843c.log + * meta: { + * 0: app-77b4d5595b-hmjxs, // pod + * 1: default // namespace + * 2: app-80209fd578c6be842b5b8a2d6389227ccab0196b7b658bd245eed4767c9b843c.log // container with .log suffix + * } + */ + // cut path from /var/log/containers/__-.log // Reference: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/kubelet-cri-logging.md - var index = sourceName.lastIndexOf('/') - var fileName = sourceName.substr(index + 1, sourceName.length) - var meta = fileName.split('_') - var info = {} + const index = sourceName.lastIndexOf('/') + const fileName = sourceName.substr(index + 1, sourceName.length) + const meta = fileName.split('_') + const info = {} if (meta.length === 3) { + const { 0: name, 1: namespace, 2: containerWithLogSuffix } = meta info.kubernetes = { - pod: { name: meta[0] }, - namespace: meta[1] + pod: { + name, + container: {} + }, + namespace + } + + const positionOfNameAndIdSeparator = containerWithLogSuffix.lastIndexOf('-') + const positionOfDotLog = containerWithLogSuffix.indexOf('.') + const containerId = containerWithLogSuffix.substring( + positionOfNameAndIdSeparator + 1, + positionOfDotLog + ) + const containerName = containerWithLogSuffix.substring(0, index) + + if (containerId) { + info.kubernetes.pod.container.id = containerId } - index = meta[2].lastIndexOf('-') - var endOfId = meta[2].indexOf('.') - var containerName = meta[2].substring(index + 1, endOfId) if (containerName) { - info.kubernetes.pod.container.name = meta[2].substring(0, index) - info.kubernetes.pod.container.id = containerName + info.kubernetes.pod.container.name = containerName } } return info @@ -31,36 +63,67 @@ function parseK8sFileName (sourceName) { module.exports = function (context, config, data, callback) { try { - var k8sInfo = null - var sections = data.split(containerdSplitRegexp) + const sections = data.split(containerdSplitRegexp) if (sections && sections.length === 3) { - k8sInfo = parseK8sFileName(context.sourceName) - var meta = sections[1].split(' ') + const k8sInfo = parseK8sFileName(context.sourceName) + const meta = sections[1].split(' ') + const logLine = sections[2] + if (meta.length === 3 && meta[0]) { k8sInfo['@timestamp'] = new Date(meta[0]) k8sInfo.streamName = meta[1] k8sInfo.streamFlag = meta[2] - // a special property in context object to propagate fields to - // the parsed object after parsing -> all logs will be enriched k8s metadata - context.enrichEvent = k8sInfo + + const sourceName = context.sourceName + if (sources[sourceName] === undefined) { + sources[sourceName] = {} + } + sources[sourceName].streamFlag = k8sInfo.streamFlag + + if (sources[sourceName].streamFlag === 'P') { + if (sources[sourceName].logLines === undefined) { + sources[sourceName].logLines = [] + } + + sources[sourceName].logLines.push(logLine) + sources[sourceName].previousStreamFlag = 'P' + + return callback(null, null) + } + + if ( + sources[sourceName].streamFlag === 'F' && + sources[sourceName].previousStreamFlag === 'P' + ) { + sources[sourceName].logLines.push(logLine) + const joinedLogLine = sources[sourceName].logLines.join(' ') + delete sources[sourceName] + + // a special property in context object to propagate fields to + // the parsed object after parsing -> all logs will be enriched k8s metadata + context.enrichEvent = k8sInfo + return callback(null, joinedLogLine) + } } - return callback(null, sections[2]) + + return callback(null, logLine) } return callback(null, data) } catch (err) { + console.error(err) return callback(null, data) } } // test function -if (require.main === module) { - module.exports( - { - sourceName: - '/var/log/containers/busybox2_default_busybox-5f03725b871fe3f2cbfdde7864100a12aed2708d759bea14f8d41656accba8f6.log' - }, - {}, - '2019-03-28T23:13:41.945317977Z stdout F Mar 28 23:13:41 kube-mil01-pa1934121294964badacb5ddd3753d504c-w1 local1.notice haproxy[8]: Proxy masteretcdfrontend started', - console.log - ) -} +// if (require.main === module) { +// module.exports( +// { +// sourceName: +// '/var/log/containers/busybox2_default_busybox-5f03725b871fe3f2cbfdde7864100a12aed2708d759bea14f8d41656accba8f6.log' +// }, +// {}, +// '2019-03-28T23:13:41.945317977Z stdout F Mar 28 23:13:41 kube-mil01-pa1934121294964badacb5ddd3753d504c-w1 local1.notice haproxy[8]: Proxy masteretcdfrontend started', +// console.log +// ) +// }