Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop writing to '/var/log/app_engine/' and write logs to Stackdriver logging API #3410

Merged
merged 6 commits into from
May 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion logging/google/cloud/logging/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def get_default_handler(self):
"""
if (_APPENGINE_FLEXIBLE_ENV_VM in os.environ or
_APPENGINE_FLEXIBLE_ENV_FLEX in os.environ):
return AppEngineHandler()
return AppEngineHandler(self)
elif _CONTAINER_ENGINE_ENV in os.environ:
return ContainerEngineHandler()
else:
Expand Down
82 changes: 39 additions & 43 deletions logging/google/cloud/logging/handlers/app_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,60 +14,56 @@

"""Logging handler for App Engine Flexible

Logs to the well-known file that the fluentd sidecar container on App Engine
Flexible is configured to read from and send to Stackdriver Logging.

See the fluentd configuration here:

https://github.com/GoogleCloudPlatform/appengine-sidecars-docker/tree/master/fluentd_logger
Sends logs to the Stackdriver Logging API with the appropriate resource
and labels for App Engine logs.
"""

# This file is largely copied from:
# https://github.com/GoogleCloudPlatform/python-compat-runtime/blob/master
# /appengine-vmruntime/vmruntime/cloud_logging.py

import logging.handlers
import os

from google.cloud.logging.handlers._helpers import format_stackdriver_json
from google.cloud.logging.handlers.handlers import CloudLoggingHandler
from google.cloud.logging.handlers.transports import BackgroundThreadTransport
from google.cloud.logging.resource import Resource

_LOG_PATH_TEMPLATE = '/var/log/app_engine/app.{pid}.json'
_MAX_LOG_BYTES = 128 * 1024 * 1024
_LOG_FILE_COUNT = 3
_DEFAULT_GAE_LOGGER_NAME = 'app'

_GAE_PROJECT_ENV = 'GCLOUD_PROJECT'
_GAE_SERVICE_ENV = 'GAE_SERVICE'
_GAE_VERSION_ENV = 'GAE_VERSION'

class AppEngineHandler(logging.handlers.RotatingFileHandler):
"""A handler that writes to the App Engine fluentd Stackdriver log file.

Writes to the file that the fluentd agent on App Engine Flexible is
configured to discover logs and send them to Stackdriver Logging.
Log entries are wrapped in JSON and with appropriate metadata. The
process of converting the user's formatted logs into a JSON payload for
Stackdriver Logging consumption is implemented as part of the handler
itself, and not as a formatting step, so as not to interfere with
user-defined logging formats.
"""
class AppEngineHandler(CloudLoggingHandler):
"""A logging handler that sends App Engine-formatted logs to Stackdriver.

def __init__(self):
"""Construct the handler
:type client: :class:`~google.cloud.logging.client.Client`
:param client: The authenticated Google Cloud Logging client for this
handler to use.

Large log entries will get mangled if multiple workers write to the
same file simultaneously, so we'll use the worker's PID to pick a log
filename.
"""
self.filename = _LOG_PATH_TEMPLATE.format(pid=os.getpid())
super(AppEngineHandler, self).__init__(self.filename,
maxBytes=_MAX_LOG_BYTES,
backupCount=_LOG_FILE_COUNT)
:type transport: type
:param transport: The transport class. It should be a subclass
of :class:`.Transport`. If unspecified,
:class:`.BackgroundThreadTransport` will be used.
"""

def format(self, record):
"""Format the specified record into the expected JSON structure.
def __init__(self, client,
transport=BackgroundThreadTransport):
super(AppEngineHandler, self).__init__(
client,
name=_DEFAULT_GAE_LOGGER_NAME,
transport=transport,
resource=self.get_gae_resource())

:type record: :class:`~logging.LogRecord`
:param record: the log record
def get_gae_resource(self):
"""Return the GAE resource using the environment variables.

:rtype: str
:returns: JSON str to be written to the log file
:rtype: :class:`~google.cloud.logging.resource.Resource`
:returns: Monitored resource for GAE.
"""
message = super(AppEngineHandler, self).format(record)
return format_stackdriver_json(record, message)
gae_resource = Resource(
type='gae_app',
labels={
'project_id': os.environ.get(_GAE_PROJECT_ENV),
'module_id': os.environ.get(_GAE_SERVICE_ENV),
'version_id': os.environ.get(_GAE_VERSION_ENV),
},
)
return gae_resource
11 changes: 9 additions & 2 deletions logging/google/cloud/logging/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import logging

from google.cloud.logging.handlers.transports import BackgroundThreadTransport
from google.cloud.logging.logger import _GLOBAL_RESOURCE

DEFAULT_LOGGER_NAME = 'python'

Expand Down Expand Up @@ -52,6 +53,10 @@ class CloudLoggingHandler(logging.StreamHandler):
:class:`.BackgroundThreadTransport`. The other
option is :class:`.SyncTransport`.

:type resource: :class:`~google.cloud.logging.resource.Resource`
:param resource: (Optional) Monitored resource of the entry, defaults
to the global resource type.

Example:

.. code-block:: python
Expand All @@ -73,11 +78,13 @@ class CloudLoggingHandler(logging.StreamHandler):

def __init__(self, client,
name=DEFAULT_LOGGER_NAME,
transport=BackgroundThreadTransport):
transport=BackgroundThreadTransport,
resource=_GLOBAL_RESOURCE):
super(CloudLoggingHandler, self).__init__()
self.name = name
self.client = client
self.transport = transport(client, name)
self.resource = resource

def emit(self, record):
"""Actually log the specified logging record.
Expand All @@ -90,7 +97,7 @@ def emit(self, record):
:param record: The record to be logged.
"""
message = super(CloudLoggingHandler, self).format(record)
self.transport.send(record, message)
self.transport.send(record, message, resource=self.resource)


def setup_logging(handler, excluded_loggers=EXCLUDED_LOGGER_DEFAULTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def _main_thread_terminated(self):
else:
print('Failed to send %d pending logs.' % (self._queue.qsize(),))

def enqueue(self, record, message):
def enqueue(self, record, message, resource=None):
"""Queues a log entry to be written by the background thread.

:type record: :class:`logging.LogRecord`
Expand All @@ -212,13 +212,17 @@ def enqueue(self, record, message):
:type message: str
:param message: The message from the ``LogRecord`` after being
formatted by the associated log formatters.

:type resource: :class:`~google.cloud.logging.resource.Resource`
:param resource: (Optional) Monitored resource of the entry
"""
self._queue.put_nowait({
'info': {
'message': message,
'python_logger': record.name,
},
'severity': record.levelname,
'resource': resource,
})

def flush(self):
Expand Down Expand Up @@ -253,7 +257,7 @@ def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
self.worker = _Worker(logger)
self.worker.start()

def send(self, record, message):
def send(self, record, message, resource=None):
"""Overrides Transport.send().

:type record: :class:`logging.LogRecord`
Expand All @@ -262,8 +266,11 @@ def send(self, record, message):
:type message: str
:param message: The message from the ``LogRecord`` after being
formatted by the associated log formatters.

:type resource: :class:`~google.cloud.logging.resource.Resource`
:param resource: (Optional) Monitored resource of the entry.
"""
self.worker.enqueue(record, message)
self.worker.enqueue(record, message, resource=resource)

def flush(self):
"""Submit any pending log records."""
Expand Down
5 changes: 4 additions & 1 deletion logging/google/cloud/logging/handlers/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Transport(object):
client and name object, and must override :meth:`send`.
"""

def send(self, record, message):
def send(self, record, message, resource=None):
"""Transport send to be implemented by subclasses.

:type record: :class:`logging.LogRecord`
Expand All @@ -31,6 +31,9 @@ def send(self, record, message):
:type message: str
:param message: The message from the ``LogRecord`` after being
formatted by the associated log formatters.

:type resource: :class:`~google.cloud.logging.resource.Resource`
:param resource: (Optional) Monitored resource of the entry.
"""
raise NotImplementedError

Expand Down
6 changes: 4 additions & 2 deletions logging/google/cloud/logging/handlers/transports/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SyncTransport(Transport):
def __init__(self, client, name):
self.logger = client.logger(name)

def send(self, record, message):
def send(self, record, message, resource=None):
"""Overrides transport.send().

:type record: :class:`logging.LogRecord`
Expand All @@ -40,4 +40,6 @@ def send(self, record, message):
formatted by the associated log formatters.
"""
info = {'message': message, 'python_logger': record.name}
self.logger.log_struct(info, severity=record.levelname)
self.logger.log_struct(info,
severity=record.levelname,
resource=resource)
66 changes: 40 additions & 26 deletions logging/tests/unit/handlers/test_app_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import unittest


Expand All @@ -24,34 +25,47 @@ def _get_target_class(self):
return AppEngineHandler

def _make_one(self, *args, **kw):
import tempfile
return self._get_target_class()(*args, **kw)

from google.cloud._testing import _Monkey
from google.cloud.logging.handlers import app_engine as _MUT
def test_constructor(self):
import mock

This comment was marked as spam.

This comment was marked as spam.

from google.cloud.logging.handlers.app_engine import _GAE_PROJECT_ENV
from google.cloud.logging.handlers.app_engine import _GAE_SERVICE_ENV
from google.cloud.logging.handlers.app_engine import _GAE_VERSION_ENV

tmpdir = tempfile.mktemp()
with _Monkey(_MUT, _LOG_PATH_TEMPLATE=tmpdir):
return self._get_target_class()(*args, **kw)
client = mock.Mock(project=self.PROJECT, spec=['project'])
with mock.patch('os.environ', new={_GAE_PROJECT_ENV: 'test_project',
_GAE_SERVICE_ENV: 'test_service',
_GAE_VERSION_ENV: 'test_version'}):
handler = self._make_one(client, transport=_Transport)
self.assertIs(handler.client, client)
self.assertEqual(handler.resource.type, 'gae_app')
self.assertEqual(handler.resource.labels['project_id'], 'test_project')
self.assertEqual(handler.resource.labels['module_id'], 'test_service')
self.assertEqual(handler.resource.labels['version_id'], 'test_version')

def test_format(self):
import json
import logging
def test_emit(self):
import mock

handler = self._make_one()
logname = 'loggername'
client = mock.Mock(project=self.PROJECT, spec=['project'])
handler = self._make_one(client, transport=_Transport)
gae_resource = handler.get_gae_resource()
logname = 'app'
message = 'hello world'
record = logging.LogRecord(logname, logging.INFO, None,
None, message, None, None)
record.created = 5.03
expected_payload = {
'message': message,
'timestamp': {
'seconds': 5,
'nanos': int(.03 * 1e9),
},
'thread': record.thread,
'severity': record.levelname,
}
payload = handler.format(record)

self.assertEqual(payload, json.dumps(expected_payload))
record = logging.LogRecord(logname, logging, None, None, message,
None, None)
handler.emit(record)

self.assertIs(handler.transport.client, client)
self.assertEqual(handler.transport.name, logname)
self.assertEqual(handler.transport.send_called_with, (record, message, gae_resource))


class _Transport(object):

def __init__(self, client, name):
self.client = client
self.name = name

def send(self, record, message, resource):
self.send_called_with = (record, message, resource)
10 changes: 6 additions & 4 deletions logging/tests/unit/handlers/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ def test_ctor(self):
self.assertEqual(handler.client, client)

def test_emit(self):
from google.cloud.logging.logger import _GLOBAL_RESOURCE

client = _Client(self.PROJECT)
handler = self._make_one(client, transport=_Transport)
handler = self._make_one(client, transport=_Transport, resource=_GLOBAL_RESOURCE)
logname = 'loggername'
message = 'hello world'
record = logging.LogRecord(logname, logging, None, None, message,
None, None)
handler.emit(record)

self.assertEqual(handler.transport.send_called_with, (record, message))
self.assertEqual(handler.transport.send_called_with, (record, message, _GLOBAL_RESOURCE))


class TestSetupLogging(unittest.TestCase):
Expand Down Expand Up @@ -108,5 +110,5 @@ class _Transport(object):
def __init__(self, client, name):
pass

def send(self, record, message):
self.send_called_with = (record, message)
def send(self, record, message, resource):
self.send_called_with = (record, message, resource)
16 changes: 12 additions & 4 deletions logging/tests/unit/handlers/transports/test_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,23 @@ def test_constructor(self):
self.assertEqual(logger.name, name)

def test_send(self):
from google.cloud.logging.logger import _GLOBAL_RESOURCE

client = _Client(self.PROJECT)
name = 'python_logger'

transport, _ = self._make_one(client, name)

python_logger_name = 'mylogger'
message = 'hello world'

record = logging.LogRecord(
python_logger_name, logging.INFO,
None, None, message, None, None)

transport.send(record, message)
transport.send(record, message, _GLOBAL_RESOURCE)

transport.worker.enqueue.assert_called_once_with(record, message)
transport.worker.enqueue.assert_called_once_with(record, message, _GLOBAL_RESOURCE)

def test_flush(self):
client = _Client(self.PROJECT)
Expand Down Expand Up @@ -284,8 +287,13 @@ def __init__(self):
self.commit_called = False
self.commit_count = None

def log_struct(self, info, severity=logging.INFO):
self.log_struct_called_with = (info, severity)
def log_struct(self, info, severity=logging.INFO, resource=None):
from google.cloud.logging.logger import _GLOBAL_RESOURCE

if resource is None:
resource = _GLOBAL_RESOURCE

self.log_struct_called_with = (info, severity, resource)
self.entries.append(info)

def commit(self):
Expand Down
2 changes: 1 addition & 1 deletion logging/tests/unit/handlers/transports/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def _make_one(self, *args, **kw):
def test_send_is_abstract(self):
target = self._make_one()
with self.assertRaises(NotImplementedError):
target.send(None, None)
target.send(None, None, None)

def test_flush_is_abstract_and_optional(self):
target = self._make_one()
Expand Down
Loading