Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): set lastObserved in sdk when unset #11071

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import time
import typing
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple, Type, Union
Expand Down Expand Up @@ -403,6 +404,8 @@ def ensure_has_system_metadata(
if event.systemMetadata is None:
event.systemMetadata = SystemMetadataClass()
metadata = event.systemMetadata
if metadata.lastObserved == 0:
metadata.lastObserved = int(time.time() * 1000)
darnaut marked this conversation as resolved.
Show resolved Hide resolved
if metadata.properties is None:
metadata.properties = {}
props = metadata.properties
Expand Down
12 changes: 8 additions & 4 deletions metadata-ingestion/tests/unit/test_rest_sink.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import json
from datetime import datetime, timezone

import pytest
import requests
from freezegun import freeze_time

import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter

MOCK_GMS_ENDPOINT = "http://fakegmshost:8080"

FROZEN_TIME = 1618987484580
basicAuditStamp = models.AuditStampClass(
time=1618987484580,
actor="urn:li:corpuser:datahub",
Expand Down Expand Up @@ -76,7 +79,7 @@
}
},
"systemMetadata": {
"lastObserved": 0,
"lastObserved": FROZEN_TIME,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
Expand Down Expand Up @@ -134,7 +137,7 @@
}
},
"systemMetadata": {
"lastObserved": 0,
"lastObserved": FROZEN_TIME,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
Expand Down Expand Up @@ -178,7 +181,7 @@
}
},
"systemMetadata": {
"lastObserved": 0,
"lastObserved": FROZEN_TIME,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
Expand Down Expand Up @@ -263,7 +266,7 @@
"contentType": "application/json",
},
"systemMetadata": {
"lastObserved": 0,
"lastObserved": FROZEN_TIME,
"lastRunId": "no-run-id-provided",
"properties": {
"clientId": "acryl-datahub",
Expand All @@ -276,6 +279,7 @@
),
],
)
@freeze_time(datetime.fromtimestamp(FROZEN_TIME / 1000, tz=timezone.utc))
def test_datahub_rest_emitter(requests_mock, record, path, snapshot):
def match_request_text(request: requests.Request) -> bool:
requested_snapshot = request.json()
Expand Down
Loading