Additional logging and limit full file creation #18592

Closed · wants to merge 3 commits
6 changes: 6 additions & 0 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -1369,6 +1369,7 @@ async def mock_http_download(
data_store=data_store,
store_id=store_id,
existing_generation=3,
target_generation=4,
root_hashes=[bytes32.random(seeded_random)],
server_info=sinfo,
client_foldername=tmp_path,
@@ -1392,6 +1393,7 @@ async def mock_http_download(
data_store=data_store,
store_id=store_id,
existing_generation=3,
target_generation=4,
root_hashes=[bytes32.random(seeded_random)],
server_info=sinfo,
client_foldername=tmp_path,
@@ -1908,6 +1910,7 @@ async def mock_http_download_2(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=6,
root_hashes=root_hashes,
server_info=sinfo,
client_foldername=tmp_path_1,
@@ -1929,6 +1932,7 @@ async def mock_http_download_2(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=6,
root_hashes=root_hashes,
server_info=sinfo,
client_foldername=tmp_path_1,
@@ -2032,6 +2036,7 @@ async def test_insert_from_delta_file_correct_file_exists(
data_store=data_store,
store_id=store_id,
existing_generation=0,
target_generation=6,
root_hashes=root_hashes,
server_info=sinfo,
client_foldername=tmp_path,
@@ -2094,6 +2099,7 @@ async def test_insert_from_delta_file_incorrect_file_exists(
data_store=data_store,
store_id=store_id,
existing_generation=1,
target_generation=6,
root_hashes=[incorrect_root_hash],
server_info=sinfo,
client_foldername=tmp_path,
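For orientation: the values used in these tests (existing_generation=3, target_generation=4, and the new default maximum_full_file_count=1 introduced in the download_data.py hunk below) keep full-file generation enabled, because the new gate only skips the full file when the remaining distance to the target exceeds the limit. A one-line sanity check of that arithmetic, as a sketch only:

# Sketch: the gate added in insert_from_delta_file writes the full tree file
# whenever target_generation - existing_generation <= maximum_full_file_count.
assert 4 - 3 <= 1  # test values above, with the new default maximum_full_file_count=1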
51 changes: 43 additions & 8 deletions chia/data_layer/data_layer.py
@@ -568,6 +568,7 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
servers_info = await self.data_store.get_available_servers_for_store(store_id, timestamp)
# TODO: maybe append a random object to the whole DataLayer class?
random.shuffle(servers_info)
success = False
for server_info in servers_info:
url = server_info.url

@@ -600,14 +601,16 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
self.data_store,
store_id,
root.generation,
[record.root for record in reversed(to_download)],
server_info,
self.server_files_location,
self.client_timeout,
self.log,
proxy_url,
await self.get_downloader(store_id, url),
self.group_files_by_store,
target_generation=singleton_record.generation,
root_hashes=[record.root for record in reversed(to_download)],
server_info=server_info,
client_foldername=self.server_files_location,
timeout=self.client_timeout,
log=self.log,
proxy_url=proxy_url,
downloader=await self.get_downloader(store_id, url),
group_files_by_store=self.group_files_by_store,
maximum_full_file_count=self.maximum_full_file_count,
)
if success:
self.log.info(
@@ -621,6 +624,29 @@ async def fetch_and_validate(self, store_id: bytes32) -> None:
except Exception as e:
self.log.warning(f"Exception while downloading files for {store_id}: {e} {traceback.format_exc()}.")

if not success:
root = await self.data_store.get_tree_root(store_id=store_id)
if root.node_hash is None:
return
filename_full_tree = get_full_tree_filename_path(
foldername=self.server_files_location,
store_id=store_id,
node_hash=root.node_hash,
generation=root.generation,
group_by_store=self.group_files_by_store,
)
# Had trouble with this generation, so generate full file for the generation we currently have
if not os.path.exists(filename_full_tree):
with open(filename_full_tree, "wb") as writer:
await self.data_store.write_tree_to_file(
root=root,
node_hash=root.node_hash,
store_id=store_id,
deltas_only=False,
writer=writer,
)
self.log.info(f"Successfully written full tree filename {filename_full_tree}.")

async def get_downloader(self, store_id: bytes32, url: str) -> Optional[PluginRemote]:
request_json = {"store_id": store_id.hex(), "url": url}
for d in self.downloaders:
@@ -879,11 +905,14 @@ async def get_kv_diff_paginated(

async def periodically_manage_data(self) -> None:
manage_data_interval = self.config.get("manage_data_interval", 60)
self.log.debug("DL: Starting manage data loop")
while not self._shut_down:
self.log.debug("DL: Sending all DL known subscriptions to wallet.")
async with self.subscription_lock:
try:
subscriptions = await self.data_store.get_subscriptions()
for subscription in subscriptions:
self.log.debug(f"DL: Asking wallet to track {subscription.store_id.hex()}")
await self.wallet_rpc.dl_track_new(subscription.store_id)
break
except aiohttp.client_exceptions.ClientConnectorError:
@@ -900,14 +929,17 @@ async def periodically_manage_data(self) -> None:
await asyncio.sleep(0.1)

while not self._shut_down:
self.log.debug("DL: Discover subscriptions and fetch data via worker pool.")
# Add existing subscriptions
async with self.subscription_lock:
subscriptions = await self.data_store.get_subscriptions()

self.log.debug(f"DL: Found {len(subscriptions)} subscriptions.")
# pseudo-subscribe to all unsubscribed owned stores
# Need this to make sure we process updates and generate DAT files
try:
owned_stores = await self.get_owned_stores()
self.log.debug(f"DL: Found {len(owned_stores)} owned stores.")
except ValueError:
# Sometimes the DL wallet isn't available, so we can't get the owned stores.
# We'll try again next time.
@@ -940,6 +972,7 @@ async def periodically_manage_data(self) -> None:
f"Can't subscribe to local store {local_id}: {type(e)} {e} {traceback.format_exc()}"
)

self.log.debug(f"DL: Managing data for {len(subscriptions)} subscriptions.")
work_queue: asyncio.Queue[Job[Subscription]] = asyncio.Queue()
async with QueuedAsyncPool.managed(
name="DataLayer subscription update pool",
@@ -959,6 +992,7 @@ async def periodically_manage_data(self) -> None:
for unsubscribe_data in self.unsubscribe_data_queue:
await self.process_unsubscribe(unsubscribe_data.store_id, unsubscribe_data.retain_data)
self.unsubscribe_data_queue.clear()
self.log.debug(f"DL: Finished managing data. Next run in {manage_data_interval}s.")
await asyncio.sleep(manage_data_interval)

async def update_subscription(
@@ -968,6 +1002,7 @@ async def update_subscription(
) -> None:
subscription = job.input

self.log.debug(f"DL: Updating subscription {subscription.store_id.hex()}")
try:
await self.update_subscriptions_from_wallet(subscription.store_id)
await self.fetch_and_validate(subscription.store_id)
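This hunk passes a new self.maximum_full_file_count attribute down to insert_from_delta_file, but its initialization is outside the diff. A plausible sketch of how it could be wired from the data layer config, with a hypothetical key name and default shown only for orientation:

# Hypothetical sketch, not part of this diff: assumes a config entry named
# "maximum_full_file_count" with a default of 1 (the default used by
# insert_from_delta_file below); the real key and default may differ.
self.maximum_full_file_count = self.config.get("maximum_full_file_count", 1)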
37 changes: 24 additions & 13 deletions chia/data_layer/download_data.py
Expand Up @@ -92,7 +92,8 @@ async def insert_into_data_store_from_file(
store_id: bytes32,
root_hash: Optional[bytes32],
filename: Path,
) -> None:
) -> int:
num_inserted = 0
with open(filename, "rb") as reader:
while True:
chunk = b""
@@ -119,8 +120,10 @@ async def insert_into_data_store_from_file(

node_type = NodeType.TERMINAL if serialized_node.is_terminal else NodeType.INTERNAL
await data_store.insert_node(node_type, serialized_node.value1, serialized_node.value2)
num_inserted += 1

await data_store.insert_root_with_ancestor_table(store_id=store_id, node_hash=root_hash, status=Status.COMMITTED)
return num_inserted


@dataclass
@@ -233,6 +236,7 @@ async def insert_from_delta_file(
data_store: DataStore,
store_id: bytes32,
existing_generation: int,
target_generation: int,
root_hashes: List[bytes32],
server_info: ServerInfo,
client_foldername: Path,
@@ -241,6 +245,7 @@ async def insert_from_delta_file(
proxy_url: str,
downloader: Optional[PluginRemote],
group_files_by_store: bool = False,
maximum_full_file_count: int = 1,
) -> bool:
if group_files_by_store:
client_foldername.joinpath(f"{store_id}").mkdir(parents=True, exist_ok=True)
@@ -283,21 +288,25 @@ async def insert_from_delta_file(
existing_generation,
group_files_by_store,
)
await insert_into_data_store_from_file(
num_inserted = await insert_into_data_store_from_file(
data_store,
store_id,
None if root_hash == bytes32([0] * 32) else root_hash,
target_filename_path,
)
log.info(
f"Successfully inserted hash {root_hash} from delta file. "
f"Generation: {existing_generation}. Store id: {store_id}."
f"Generation: {existing_generation}. Store id: {store_id}. Nodes inserted: {num_inserted}."
)

root = await data_store.get_tree_root(store_id=store_id)
with open(filename_full_tree, "wb") as writer:
await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
log.info(f"Successfully written full tree filename {filename_full_tree}.")
if target_generation - existing_generation <= maximum_full_file_count:
root = await data_store.get_tree_root(store_id=store_id)
with open(filename_full_tree, "wb") as writer:
await data_store.write_tree_to_file(root, root_hash, store_id, False, writer)
log.info(f"Successfully written full tree filename {filename_full_tree}.")
else:
log.info(f"Skipping full file generation for {existing_generation}")

await data_store.received_correct_file(store_id, server_info)
except Exception:
try:
@@ -367,14 +376,16 @@ async def http_download(
) as resp:
resp.raise_for_status()
size = int(resp.headers.get("content-length", 0))
log.debug(f"Downloading delta file {filename}. Size {size} bytes.")
log.info(f"Downloading delta file {filename}. Size {size} bytes.")
debug_enabled = log.isEnabledFor(logging.DEBUG)
progress_byte = 0
progress_percentage = f"{0:.0%}"
with target_filename_path.open(mode="wb") as f:
async for chunk, _ in resp.content.iter_chunks():
f.write(chunk)
progress_byte += len(chunk)
new_percentage = f"{progress_byte / size:.0%}"
if new_percentage != progress_percentage:
progress_percentage = new_percentage
log.info(f"Downloading delta file {filename}. {progress_percentage} of {size} bytes.")
if debug_enabled:
progress_byte += len(chunk)
new_percentage = f"{progress_byte / size:.0%}"
if new_percentage != progress_percentage:
progress_percentage = new_percentage
log.debug(f"Downloading delta file {filename}. {progress_percentage} of {size} bytes.")