Skip to content

Commit

Permalink
tests: make neon_fixtures a bit thinner by splitting out some pageser…
Browse files Browse the repository at this point in the history
…ver related helpers
  • Loading branch information
LizardWizzard committed Apr 6, 2023
1 parent 25601ac commit 625e269
Show file tree
Hide file tree
Showing 18 changed files with 220 additions and 231 deletions.
166 changes: 1 addition & 165 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
ATTACHMENT_NAME_REGEX,
Expand Down Expand Up @@ -2851,151 +2852,6 @@ def check_restored_datadir_content(
assert (mismatch, error) == ([], [])


def wait_until(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns successfully, without exception. Returns the
last return value from the function.
"""
last_exception = None
for i in range(number_of_iterations):
try:
res = func()
except Exception as e:
log.info("waiting for %s iteration %s failed", func, i + 1)
last_exception = e
time.sleep(interval)
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception


def wait_while(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns false, or throws an exception.
"""
for i in range(number_of_iterations):
try:
if not func():
return
log.info("waiting for %s iteration %s failed", func, i + 1)
time.sleep(interval)
continue
except Exception:
return
raise Exception("timed out while waiting for %s" % func)


def assert_tenant_status(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, expected_status: str
):
tenant_status = pageserver_http_client.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status


def tenant_exists(ps_http: PageserverHttpClient, tenant_id: TenantId):
tenants = ps_http.tenant_list()
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
assert len(matching) < 2
if len(matching) == 0:
return None
return matching[0]


def remote_consistent_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)

if detail["remote_consistent_lsn"] is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return Lsn(0)
else:
lsn_str = detail["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)


def wait_for_upload(
pageserver_http_client: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
):
"""waits for local timeline upload up to specified lsn"""
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http_client, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
log.info(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn, current_lsn
)
)


# Does not use `wait_until` for debugging purposes
def wait_until_tenant_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
expected_state: str,
iterations: int,
) -> bool:
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")

time.sleep(1)

raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")


def last_record_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)

lsn_str = detail["last_record_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)


def wait_for_last_record_lsn(
pageserver_http_client: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(10):
current_lsn = last_record_lsn(pageserver_http_client, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
log.info(
"waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
)


def wait_for_last_flush_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
Expand Down Expand Up @@ -3057,23 +2913,3 @@ def wait_for_sk_commit_lsn_to_reach_remote_storage(
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
return lsn


def wait_for_upload_queue_empty(
pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
):
ps_http = pageserver.http_client()
while True:
all_metrics = ps_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)
145 changes: 145 additions & 0 deletions test_runner/fixtures/pageserver/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import time

from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId


def assert_tenant_status(
pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
):
tenant_status = pageserver_http.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status


def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
tenants = pageserver_http.tenant_list()
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
assert len(matching) < 2
if len(matching) == 0:
return None
return matching[0]


def remote_consistent_lsn(
pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http.timeline_detail(tenant, timeline)

if detail["remote_consistent_lsn"] is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return Lsn(0)
else:
lsn_str = detail["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)


def wait_for_upload(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
):
"""waits for local timeline upload up to specified lsn"""
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
log.info(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn, current_lsn
)
)


def wait_until_tenant_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
expected_state: str,
iterations: int,
) -> bool:
"""
Does not use `wait_until` for debugging purposes
"""
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")

time.sleep(1)

raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")


def wait_until_tenant_active(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
):
wait_until_tenant_state(
pageserver_http, tenant_id, expected_state="Active", iterations=iterations
)


def last_record_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)

lsn_str = detail["last_record_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)


def wait_for_last_record_lsn(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(10):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
log.info(
"waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
)


def wait_for_upload_queue_empty(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
while True:
all_metrics = pageserver_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)
16 changes: 16 additions & 0 deletions test_runner/fixtures/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,19 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn):
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception


def wait_while(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns false, or throws an exception.
"""
for i in range(number_of_iterations):
try:
if not func():
return
log.info("waiting for %s iteration %s failed", func, i + 1)
time.sleep(interval)
continue
except Exception:
return
raise Exception("timed out while waiting for %s" % func)
2 changes: 1 addition & 1 deletion test_runner/performance/test_branch_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.neon_fixtures import wait_for_last_record_lsn
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.types import Lsn


Expand Down
3 changes: 1 addition & 2 deletions test_runner/regress/test_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
NeonEnvBuilder,
PgBin,
PortDistributor,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn
from pytest import FixtureRequest

Expand Down
Loading

0 comments on commit 625e269

Please sign in to comment.