fix flushing loggers (Lightning-AI#1459)
* flushing loggers

* flushing loggers

* flushing loggers

* flushing loggers

* changelog

* typo

* fix trains

* optimize imports

* add logger test all

* add logger test pickle

* flake8

* fix benchmark

* hanging loggers

* try

* del

* all

* cleaning
Borda authored and tullie committed May 6, 2020
1 parent 47f07d6 commit d6b0edf
Showing 21 changed files with 209 additions and 334 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-testing.yml
@@ -28,7 +28,7 @@ jobs:
requires: 'minimal'

# Timeout: https://stackoverflow.com/a/59076067/4521646
- timeout-minutes: 30
+ timeout-minutes: 15
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -34,7 +34,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Fixed

- -
+ - Fixed loggers - flushing last logged metrics even before continue, e.g. `trainer.test()` results ([#1459](https://github.com/PyTorchLightning/pytorch-lightning/pull/1459))

-

26 changes: 15 additions & 11 deletions pytorch_lightning/loggers/base.py
@@ -48,7 +48,7 @@ def __init__(
`LightningLoggerBase.agg_and_log_metrics` method.
"""
self._rank = 0
- self._prev_step = -1
+ self._prev_step: int = -1
self._metrics_to_agg: List[Dict[str, float]] = []
self._agg_key_funcs = agg_key_funcs if agg_key_funcs else {}
self._agg_default_func = agg_default_func
@@ -98,15 +98,15 @@ def _aggregate_metrics(
return step, None

# compute the metrics
- agg_step, agg_mets = self._finalize_agg_metrics()
+ agg_step, agg_mets = self._reduce_agg_metrics()

# as new step received reset accumulator
self._metrics_to_agg = [metrics]
self._prev_step = step
return agg_step, agg_mets

- def _finalize_agg_metrics(self):
-     """Aggregate accumulated metrics. This shall be called in close."""
+ def _reduce_agg_metrics(self):
+     """Aggregate accumulated metrics."""
# compute the metrics
if not self._metrics_to_agg:
agg_mets = None
@@ -116,6 +116,14 @@ def _finalize_agg_metrics(self):
agg_mets = merge_dicts(self._metrics_to_agg, self._agg_key_funcs, self._agg_default_func)
return self._prev_step, agg_mets

+ def _finalize_agg_metrics(self):
+     """This shall be called before save/close."""
+     agg_step, metrics_to_log = self._reduce_agg_metrics()
+     self._metrics_to_agg = []
+
+     if metrics_to_log is not None:
+         self.log_metrics(metrics=metrics_to_log, step=agg_step)

def agg_and_log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
"""Aggregates and records metrics.
This method doesn't log the passed metrics instantaneously, but instead
@@ -219,22 +227,19 @@ def log_hyperparams(self, params: argparse.Namespace):

def save(self) -> None:
"""Save log data."""
- pass
+ self._finalize_agg_metrics()

def finalize(self, status: str) -> None:
"""Do any processing that is necessary to finalize an experiment.
Args:
status: Status that the experiment finished with (e.g. success, failed, aborted)
"""
- pass
+ self.save()

def close(self) -> None:
"""Do any cleanup that is necessary to close an experiment."""
- agg_step, metrics_to_log = self._finalize_agg_metrics()
-
- if metrics_to_log is not None:
-     self.log_metrics(metrics=metrics_to_log, step=agg_step)
+ self.save()

@property
def rank(self) -> int:
@@ -292,7 +297,6 @@ def close(self) -> None:

@LightningLoggerBase.rank.setter
def rank(self, value: int) -> None:
- self._rank = value
for logger in self._logger_iterable:
logger.rank = value

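Taken together, the base-class changes above route every teardown path through one flush point: `save()` now drains the aggregation buffer via `_finalize_agg_metrics()`, and both `finalize()` and `close()` delegate to `save()`. Below is a minimal, runnable sketch of that flow; the class name, the print-based `log_metrics`, and the naive dict-merge aggregation are illustrative stand-ins, not the actual `LightningLoggerBase` implementation:

from typing import Dict, List, Optional

class SketchLogger:
    """Illustrative stand-in for LightningLoggerBase after this commit."""

    def __init__(self):
        self._prev_step: int = -1
        self._metrics_to_agg: List[Dict[str, float]] = []

    def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None):
        # A real logger would write to its backend here.
        print(f"step={step}: {metrics}")

    def _reduce_agg_metrics(self):
        """Aggregate accumulated metrics (here: a naive dict merge)."""
        if not self._metrics_to_agg:
            return self._prev_step, None
        agg_mets: Dict[str, float] = {}
        for mets in self._metrics_to_agg:
            agg_mets.update(mets)
        return self._prev_step, agg_mets

    def _finalize_agg_metrics(self):
        """Flush whatever is still buffered; called before save/close."""
        agg_step, metrics_to_log = self._reduce_agg_metrics()
        self._metrics_to_agg = []
        if metrics_to_log is not None:
            self.log_metrics(metrics=metrics_to_log, step=agg_step)

    def save(self):
        self._finalize_agg_metrics()  # the single flush point

    def finalize(self, status: str):
        self.save()  # finalizing implies saving

    def close(self):
        self.save()  # closing implies saving

logger = SketchLogger()
logger._metrics_to_agg.append({'test_loss': 0.1})
logger._prev_step = 7
logger.close()  # prints: step=7: {'test_loss': 0.1}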
16 changes: 11 additions & 5 deletions pytorch_lightning/loggers/comet.py
@@ -36,10 +36,15 @@ class CometLogger(LightningLoggerBase):
Log using `comet.ml <https://www.comet.ml>`_.
"""

- def __init__(self, api_key: Optional[str] = None, save_dir: Optional[str] = None,
-              workspace: Optional[str] = None, project_name: Optional[str] = None,
-              rest_api_key: Optional[str] = None, experiment_name: Optional[str] = None,
-              experiment_key: Optional[str] = None, **kwargs):
+ def __init__(self,
+              api_key: Optional[str] = None,
+              save_dir: Optional[str] = None,
+              workspace: Optional[str] = None,
+              project_name: Optional[str] = None,
+              rest_api_key: Optional[str] = None,
+              experiment_name: Optional[str] = None,
+              experiment_key: Optional[str] = None,
+              **kwargs):
r"""
Requires either an API Key (online mode) or a local directory path (offline mode)
@@ -118,6 +123,7 @@ def __init__(self, api_key: Optional[str] = None, save_dir: Optional[str] = None
self.name = experiment_name
except TypeError as e:
log.exception("Failed to set experiment name for comet.ml logger")
+ self._kwargs = kwargs

@property
def experiment(self) -> CometBaseExperiment:
@@ -197,7 +203,7 @@ def finalize(self, status: str) -> None:

@property
def name(self) -> str:
- return self.experiment.project_name
+ return str(self.experiment.project_name)

@name.setter
def name(self, value: str) -> None:
18 changes: 11 additions & 7 deletions pytorch_lightning/loggers/mlflow.py
@@ -23,6 +23,7 @@ def any_lightning_module_function_or_hook(...):
self.logger.experiment.whatever_ml_flow_supports(...)
"""
+ import os
from argparse import Namespace
from time import time
from typing import Optional, Dict, Any, Union
@@ -39,10 +40,14 @@ def any_lightning_module_function_or_hook(...):


class MLFlowLogger(LightningLoggerBase):
- def __init__(self, experiment_name: str, tracking_uri: Optional[str] = None,
-              tags: Dict[str, Any] = None):
-     r"""
+ """MLFLow logger"""
+
+ def __init__(self,
+              experiment_name: str = 'default',
+              tracking_uri: Optional[str] = None,
+              tags: Optional[Dict[str, Any]] = None,
+              save_dir: Optional[str] = None):
+     r"""
Logs using MLFlow
Args:
@@ -51,6 +56,8 @@ def __init__(self, experiment_name: str, tracking_uri: Optional[str] = None,
tags (dict): todo this param
"""
super().__init__()
+ if not tracking_uri and save_dir:
+     tracking_uri = f'file:{os.sep * 2}{save_dir}'
self._mlflow_client = MlflowClient(tracking_uri)
self.experiment_name = experiment_name
self._run_id = None
@@ -59,7 +66,6 @@ def __init__(self, experiment_name: str, tracking_uri: Optional[str] = None,
@property
def experiment(self) -> MlflowClient:
r"""
Actual mlflow object. To use mlflow features do the following.
Example::
@@ -102,11 +108,9 @@ def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) ->
continue
self.experiment.log_metric(self.run_id, k, v, timestamp_ms, step)

- def save(self):
-     pass

@rank_zero_only
def finalize(self, status: str = 'FINISHED') -> None:
+ super().finalize(status)
if status == 'success':
status = 'FINISHED'
self.experiment.set_terminated(self.run_id, status)
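The `save_dir` fallback added above builds a local file-based tracking URI when no `tracking_uri` is given. A small sketch of what that expression yields (the directory value is hypothetical):

import os

save_dir = './mlruns'  # hypothetical local directory
tracking_uri = f'file:{os.sep * 2}{save_dir}'
print(tracking_uri)  # on POSIX: file://./mlruns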
20 changes: 13 additions & 7 deletions pytorch_lightning/loggers/neptune.py
@@ -29,13 +29,18 @@ class NeptuneLogger(LightningLoggerBase):
To log experiment data in online mode, NeptuneLogger requires an API key:
"""

- def __init__(self, api_key: Optional[str] = None, project_name: Optional[str] = None,
-              close_after_fit: Optional[bool] = True, offline_mode: bool = False,
+ def __init__(self,
+              api_key: Optional[str] = None,
+              project_name: Optional[str] = None,
+              close_after_fit: Optional[bool] = True,
+              offline_mode: bool = True,
               experiment_name: Optional[str] = None,
-              upload_source_files: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None,
-              properties: Optional[Dict[str, Any]] = None, tags: Optional[List[str]] = None, **kwargs):
+              upload_source_files: Optional[List[str]] = None,
+              params: Optional[Dict[str, Any]] = None,
+              properties: Optional[Dict[str, Any]] = None,
+              tags: Optional[List[str]] = None,
+              **kwargs):
r"""
Initialize a neptune.ai logger.
.. note:: Requires either an API Key (online mode) or a local directory path (offline mode)
@@ -135,8 +140,8 @@ def any_lightning_module_function_or_hook(...):
"namespace/project_name" for example "tom/minst-classification".
If None, the value of NEPTUNE_PROJECT environment variable will be taken.
You need to create the project in https://neptune.ai first.
- offline_mode: Optional default False. If offline_mode=True no logs will be send
-     to neptune. Usually used for debug purposes.
+ offline_mode: Optional default True. If offline_mode=True no logs will be send
+     to neptune. Usually used for debug and test purposes.
close_after_fit: Optional default True. If close_after_fit=False the experiment
will not be closed after training and additional metrics,
images or artifacts can be logged. Also, remember to close the experiment explicitly
@@ -243,6 +248,7 @@ def log_metrics(

@rank_zero_only
def finalize(self, status: str) -> None:
+ super().finalize(status)
if self.close_after_fit:
self.experiment.stop()

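With `offline_mode` now defaulting to True, a bare `NeptuneLogger()` stays offline, which suits the new logger tests; pass `offline_mode=False` plus credentials to actually send logs. A usage sketch (the import path and argument values are assumptions, with the project name borrowed from the docstring above):

from pytorch_lightning.loggers import NeptuneLogger

offline_logger = NeptuneLogger()  # offline by default after this change
online_logger = NeptuneLogger(offline_mode=False,
                              api_key='<NEPTUNE_API_TOKEN>',
                              project_name='tom/minst-classification')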
15 changes: 8 additions & 7 deletions pytorch_lightning/loggers/tensorboard.py
@@ -14,7 +14,6 @@

class TensorBoardLogger(LightningLoggerBase):
r"""
Log to local file system in TensorBoard format
Implemented using :class:`torch.utils.tensorboard.SummaryWriter`. Logs are saved to
@@ -40,18 +39,19 @@ class TensorBoardLogger(LightningLoggerBase):
"""
NAME_CSV_TAGS = 'meta_tags.csv'

- def __init__(
-     self, save_dir: str, name: Optional[str] = "default",
-     version: Optional[Union[int, str]] = None, **kwargs
- ):
+ def __init__(self,
+              save_dir: str,
+              name: Optional[str] = "default",
+              version: Optional[Union[int, str]] = None,
+              **kwargs):
super().__init__()
self.save_dir = save_dir
self._name = name
self._version = version

self._experiment = None
self.tags = {}
- self.kwargs = kwargs
+ self._kwargs = kwargs

@property
def root_dir(self) -> str:
@@ -92,7 +92,7 @@ def experiment(self) -> SummaryWriter:
return self._experiment

os.makedirs(self.root_dir, exist_ok=True)
- self._experiment = SummaryWriter(log_dir=self.log_dir, **self.kwargs)
+ self._experiment = SummaryWriter(log_dir=self.log_dir, **self._kwargs)
return self._experiment

@rank_zero_only
@@ -127,6 +127,7 @@ def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) ->

@rank_zero_only
def save(self) -> None:
+ super().save()
try:
self.experiment.flush()
except AttributeError:
14 changes: 10 additions & 4 deletions pytorch_lightning/loggers/test_tube.py
@@ -18,10 +18,13 @@ class TestTubeLogger(LightningLoggerBase):

__test__ = False

- def __init__(
-     self, save_dir: str, name: str = "default", description: Optional[str] = None,
-     debug: bool = False, version: Optional[int] = None, create_git_tag: bool = False
- ):
+ def __init__(self,
+              save_dir: str,
+              name: str = "default",
+              description: Optional[str] = None,
+              debug: bool = False,
+              version: Optional[int] = None,
+              create_git_tag: bool = False):
r"""
Example
@@ -105,19 +108,22 @@ def log_metrics(self, metrics: Dict[str, float], step: Optional[int] = None) ->

@rank_zero_only
def save(self) -> None:
+ super().save()
# TODO: HACK figure out where this is being set to true
self.experiment.debug = self.debug
self.experiment.save()

@rank_zero_only
def finalize(self, status: str) -> None:
+ super().finalize(status)
# TODO: HACK figure out where this is being set to true
self.experiment.debug = self.debug
self.save()
self.close()

@rank_zero_only
def close(self) -> None:
+ super().save()
# TODO: HACK figure out where this is being set to true
self.experiment.debug = self.debug
if not self.debug:
4 changes: 1 addition & 3 deletions pytorch_lightning/loggers/trains.py
@@ -295,11 +295,9 @@ def log_artifact(
delete_after_upload=delete_after_upload
)

- def save(self) -> None:
-     pass

@rank_zero_only
def finalize(self, status: str = None) -> None:
+ # super().finalize(status)
if self.bypass_mode() or not self._trains:
return

17 changes: 12 additions & 5 deletions pytorch_lightning/loggers/wandb.py
@@ -46,11 +46,18 @@ class WandbLogger(LightningLoggerBase):
trainer = Trainer(logger=wandb_logger)
"""

- def __init__(self, name: Optional[str] = None, save_dir: Optional[str] = None,
-              offline: bool = False, id: Optional[str] = None, anonymous: bool = False,
-              version: Optional[str] = None, project: Optional[str] = None,
-              tags: Optional[List[str]] = None, log_model: bool = False,
-              experiment=None, entity=None):
+ def __init__(self,
+              name: Optional[str] = None,
+              save_dir: Optional[str] = None,
+              offline: bool = False,
+              id: Optional[str] = None,
+              anonymous: bool = False,
+              version: Optional[str] = None,
+              project: Optional[str] = None,
+              tags: Optional[List[str]] = None,
+              log_model: bool = False,
+              experiment=None,
+              entity=None):
super().__init__()
self._name = name
self._save_dir = save_dir
5 changes: 2 additions & 3 deletions pytorch_lightning/trainer/evaluation_loop.py
@@ -370,14 +370,13 @@ def run_evaluation(self, test_mode: bool = False):

# run evaluation
eval_results = self._evaluate(self.model, dataloaders, max_batches, test_mode)
- _, prog_bar_metrics, log_metrics, callback_metrics, _ = self.process_output(
-     eval_results)
+ _, prog_bar_metrics, log_metrics, callback_metrics, _ = self.process_output(eval_results)

# add metrics to prog bar
self.add_tqdm_metrics(prog_bar_metrics)

# log results of test
- if test_mode and self.proc_rank == 0 and len(callback_metrics) > 0:
+ if test_mode and self.proc_rank == 0:
print('-' * 80)
print('TEST RESULTS')
pprint(callback_metrics)
3 changes: 1 addition & 2 deletions pytorch_lightning/trainer/trainer.py
@@ -293,8 +293,7 @@ def __init__(

# benchmarking
self.benchmark = benchmark
- if benchmark:
-     torch.backends.cudnn.benchmark = True
+ torch.backends.cudnn.benchmark = self.benchmark

# Transfer params
self.num_nodes = num_nodes
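Note the behavioral nuance in the one-line trainer.py change: the old code only ever set the cuDNN flag to True, while the new assignment also writes False explicitly when `benchmark=False`. A two-line sketch of the resulting semantics:

import torch

benchmark = False  # as passed to Trainer(benchmark=...)
torch.backends.cudnn.benchmark = benchmark  # now explicitly disabled rather than left untouched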