New Container: Loki #733
I had to create it by replicating the Java code:

from typing import Optional

from testcontainers.core.config import testcontainers_config
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs


class LgtmStackContainer(DockerContainer):
    """
    Grafana LGTM Stack container.

    Exposed ports:
    - Grafana: 3000
    - Tempo: 3200
    - OTel gRPC: 4317
    - OTel HTTP: 4318
    - Prometheus: 9090
    - Loki: 3100
    """

    DEFAULT_IMAGE_NAME = "grafana/otel-lgtm:0.11.1"
    GRAFANA_PORT = 3000
    OTLP_GRPC_PORT = 4317
    OTLP_HTTP_PORT = 4318
    LOKI_PORT = 3100
    TEMPO_PORT = 3200
    PROMETHEUS_PORT = 9090

    def __init__(
        self,
        image: str = DEFAULT_IMAGE_NAME,
        **kwargs,
    ) -> None:
        # Note: this disables the Ryuk reaper globally, not only for this container.
        testcontainers_config.ryuk_disabled = True
        super().__init__(image, **kwargs)
        self.with_exposed_ports(
            self.GRAFANA_PORT,
            self.TEMPO_PORT,
            self.LOKI_PORT,
            self.OTLP_GRPC_PORT,
            self.OTLP_HTTP_PORT,
            self.PROMETHEUS_PORT,
        )
        self.url: Optional[str] = None

    def get_otlp_grpc_url(self) -> str:
        return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.OTLP_GRPC_PORT)}"

    def get_tempo_url(self) -> str:
        return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.TEMPO_PORT)}"

    def get_loki_url(self) -> str:
        return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.LOKI_PORT)}"

    def get_otlp_http_url(self) -> str:
        return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.OTLP_HTTP_PORT)}"

    def get_prometheus_http_url(self) -> str:
        return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.PROMETHEUS_PORT)}"

    def get_grafana_http_url(self) -> str:
        return f"http://{self.get_container_host_ip()}:{self.get_exposed_port(self.GRAFANA_PORT)}"

    def start(self, timeout: int = 60) -> "LgtmStackContainer":  # Increased timeout
        super().start()
        # Block until the image logs its "stack is ready" message.
        wait_for_logs(
            self,
            r".*The OpenTelemetry collector and the Grafana LGTM stack are up and running.*",  # Removed extra \s
            timeout=timeout,
        )
        print(f"Grafana URL: {self.get_grafana_http_url()}")
        print(f"Loki URL: {self.get_loki_url()}")
        print(f"OTLP metrics URL: {self.get_otlp_http_url()}/v1/metrics")
        return self
Ah, so the Java version has LgtmStackContainer, i.e. Loki, Grafana, Tempo, Mimir. That would make sense in Python as well.
What is the new container you'd like to have?
I'd like to contribute a Loki container that I'm using for testing. Its configuration arguments are here.
Why not just use a generic container for this?
The wait logic can be a little tedious to write, especially when Loki runs in cluster mode.
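To illustrate the kind of wait logic a generic container leaves to the caller, here is a minimal sketch that polls Loki's `/ready` HTTP endpoint until it answers with 200. The helper names `http_ok` and `wait_until_ready`, the default timeout and interval, and the injectable `probe` parameter are all assumptions made for this example; none of them come from testcontainers.

```python
import http.client
import time
import urllib.error
import urllib.request
from typing import Callable


def http_ok(url: str, timeout: float = 2.0) -> bool:
    """Return True if a GET to `url` answers with HTTP 200 (hypothetical helper)."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, http.client.HTTPException, OSError):
        # Connection refused, 503 while starting up, dropped connection, etc.
        return False


def wait_until_ready(
    url: str,
    timeout: float = 60.0,
    interval: float = 1.0,
    probe: Callable[[str], bool] = http_ok,
) -> None:
    """Poll `url` with `probe` until it succeeds or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if probe(url):
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{url} not ready after {timeout} seconds")
        time.sleep(interval)
```

With a generic container this would be called after startup with something like `wait_until_ready(f"http://{host}:{port}/ready")`, where `host` and `port` come from the container's mapped address. In cluster mode each component would presumably need its own readiness check, which is where a dedicated container class could save effort.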
Other references: