Remove unused code and imports #632

Merged (2 commits) on Nov 5, 2024
3 changes: 1 addition & 2 deletions hivemind/averaging/key_manager.py
@@ -11,7 +11,6 @@

GroupKey = str
GROUP_PATTERN = re.compile("^(([^.])+)[.]0b[01]*$") # e.g. bert_exp4_averaging.0b01001101
-DEFAULT_NUM_BUCKETS = 256
logger = get_logger(__name__)


@@ -92,7 +91,7 @@ async def get_averagers(self, group_key: GroupKey, only_active: bool) -> List[Tu
logger.warning(f"Could not parse group key {key} ({looking_for_group}, exc={e})")
return averagers

-async def update_key_on_group_assembled(self, group_info: GroupInfo, is_leader: bool = True):
+async def update_key_on_group_assembled(self, group_info: GroupInfo):
"""this function is triggered every time an averager finds an allreduce group"""
rng = random.Random(group_info.group_id)
index = group_info.peer_ids.index(self.peer_id)
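Side note on the context lines above: GROUP_PATTERN splits a group key into an experiment prefix and a binary bucket suffix. A minimal standalone sketch of how a key matching the pattern decomposes (the example key comes from the source comment; nothing below is part of this PR's diff):

```python
import re

# Same pattern as in key_manager.py: a dot-free prefix, then ".0b" plus binary digits
GROUP_PATTERN = re.compile("^(([^.])+)[.]0b[01]*$")

match = GROUP_PATTERN.fullmatch("bert_exp4_averaging.0b01001101")
assert match is not None
print(match.group(1))  # "bert_exp4_averaging", the experiment prefix
```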
4 changes: 2 additions & 2 deletions hivemind/averaging/matchmaking.py
@@ -21,7 +21,7 @@


class Matchmaking:
f"""
"""
An internal class that is used to form groups of averages for running allreduce
See DecentralizedAverager docstring for the detailed description of all parameters

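The one-character change above is more than cosmetic: CPython only stores a plain string literal as __doc__, so the previous f""" prefix silently left Matchmaking.__doc__ empty. A quick standalone illustration of the gotcha:

```python
class WithFStringDoc:
    f"""An f-string here is evaluated and discarded, not stored as the docstring."""

class WithPlainDoc:
    """A plain string literal here becomes the class docstring."""

assert WithFStringDoc.__doc__ is None      # the f-string was lost
assert WithPlainDoc.__doc__ is not None    # the plain literal survives
```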
@@ -383,7 +383,7 @@

logger.debug(f"{self.peer_id} - assembled group of {len(ordered_peer_ids)} peers")
group_info = GroupInfo(group_id, tuple(ordered_peer_ids), gathered)
-await self.group_key_manager.update_key_on_group_assembled(group_info, is_leader=True)
+await self.group_key_manager.update_key_on_group_assembled(group_info)
self.assembled_group.set_result(group_info)
return group_info

@@ -490,7 +490,7 @@
self.update_finished.clear()
continue
else:
raise asyncio.TimeoutError("pop_next_leader was invalidated: re-declared averager in background")

[Check failure on line 493 in hivemind/averaging/matchmaking.py, GitHub Actions / codespell: re-declared ==> redeclared]

del self.leader_queue[maybe_next_leader]
self.past_attempts.add((maybe_next_leader, entry.expiration_time))
5 changes: 3 additions & 2 deletions hivemind/dht/routing.py
@@ -13,7 +13,7 @@
from hivemind.utils import MSGPackSerializer, get_dht_time

DHTKey = Subkey = DHTValue = Any
-BinaryDHTID = BinaryDHTValue = bytes
+BinaryDHTValue = bytes


class RoutingTable:
@@ -251,7 +251,8 @@ def __repr__(self):
class DHTID(int):
HASH_FUNC = hashlib.sha1
HASH_NBYTES = 20 # SHA1 produces a 20-byte (aka 160bit) number
-RANGE = MIN, MAX = 0, 2 ** (HASH_NBYTES * 8)  # inclusive min, exclusive max
+MIN = 0
+MAX = 2 ** (HASH_NBYTES * 8)

def __new__(cls, value: int):
assert cls.MIN <= value < cls.MAX, f"DHTID must be in [{cls.MIN}, {cls.MAX}) but got {value}"
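The split into explicit MIN and MAX keeps the same bounds as the old RANGE one-liner: inclusive 0, exclusive 2**160 for SHA1-sized IDs. A standalone sketch of how an identifier lands in that range (id_from_key is an illustrative helper, not the library's exact API):

```python
import hashlib

HASH_NBYTES = 20              # SHA1 produces a 20-byte digest
MIN = 0                       # inclusive lower bound
MAX = 2 ** (HASH_NBYTES * 8)  # exclusive upper bound, i.e. 2**160

def id_from_key(key: bytes) -> int:
    # Interpret the SHA1 digest of the key as a big-endian integer
    value = int.from_bytes(hashlib.sha1(key).digest(), "big")
    assert MIN <= value < MAX  # mirrors the range assert in DHTID.__new__
    return value

print(hex(id_from_key(b"example-key")))
```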
1 change: 0 additions & 1 deletion hivemind/dht/schema.py
@@ -91,7 +91,6 @@ def validate(self, record: DHTRecord) -> bool:
return False
[field_name] = list(record.keys())

-n_outside_schema = 0
validation_errors = []
for schema in self._schemas:
try:
2 changes: 1 addition & 1 deletion hivemind/moe/server/task_pool.py
@@ -10,7 +10,7 @@
from collections import namedtuple
from concurrent.futures import Future
from queue import Empty
-from typing import Any, Dict, Generator, List, Tuple
+from typing import Any, Dict, List, Tuple

import torch

2 changes: 1 addition & 1 deletion hivemind/optim/__init__.py
@@ -1,3 +1,3 @@
-from hivemind.optim.grad_scaler import GradScaler, HivemindGradScaler
+from hivemind.optim.grad_scaler import GradScaler
from hivemind.optim.optimizer import Optimizer
from hivemind.optim.training_averager import TrainingAverager
6 changes: 0 additions & 6 deletions hivemind/optim/grad_scaler.py
@@ -126,9 +126,3 @@ def _unscale_grads_(
def are_grads_finite(self, optimizer: TorchOptimizer, use_cached: bool = False) -> bool:
opt_dict = self._found_inf_per_device(optimizer) if use_cached else self._check_inf_per_device(optimizer)
return not sum(v.item() for v in opt_dict.values())


-class HivemindGradScaler(GradScaler):
-    def __init__(self, *args, **kwargs):
-        logger.warning("HivemindGradScaler was renamed to hivemind.GradScaler, this reference will be removed in v1.1")
-        super().__init__(*args, **kwargs)
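With the deprecated alias gone, imports of HivemindGradScaler now raise ImportError instead of emitting a warning. Migration is a straight rename; a sketch assuming a working hivemind install (constructor arguments and the training loop are elided):

```python
# Before this PR (deprecated, logged a rename warning):
#   from hivemind.optim.grad_scaler import HivemindGradScaler
#   scaler = HivemindGradScaler()

# After this PR, only the new name exists:
from hivemind.optim.grad_scaler import GradScaler

scaler = GradScaler()  # same behavior the alias forwarded to
```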
35 changes: 1 addition & 34 deletions hivemind/p2p/p2p_daemon_bindings/datastructures.py
@@ -10,21 +10,16 @@
import base58
import multihash
from cryptography.hazmat.primitives import serialization
-from multiaddr import Multiaddr, protocols
+from multiaddr import Multiaddr

from hivemind.proto import crypto_pb2, p2pd_pb2


class PeerID:
def __init__(self, peer_id_bytes: bytes) -> None:
self._bytes = peer_id_bytes
-self._xor_id = int(sha256_digest(self._bytes).hex(), 16)
self._b58_str = base58.b58encode(self._bytes).decode()

-@property
-def xor_id(self) -> int:
-    return self._xor_id

def to_bytes(self) -> bytes:
return self._bytes

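The removed xor_id property derived an integer from the SHA-256 of the raw peer ID bytes, which is what Kademlia-style XOR distance is computed over. If external code still needs it, the same value can be rebuilt from PeerID.to_bytes(); a minimal sketch under that assumption:

```python
import hashlib

def xor_id(peer_id_bytes: bytes) -> int:
    # Same construction as the deleted property:
    # interpret sha256(peer_id_bytes) as a hex integer
    return int(hashlib.sha256(peer_id_bytes).hexdigest(), 16)

def xor_distance(a: bytes, b: bytes) -> int:
    # Kademlia-style distance between two peer IDs
    return xor_id(a) ^ xor_id(b)
```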
@@ -137,31 +132,3 @@ def __str__(self):

def __repr__(self):
return f"PeerInfo(peer_id={repr(self.peer_id)}, addrs={repr(self.addrs)})"


-class InvalidAddrError(ValueError):
-    pass
-
-
-def info_from_p2p_addr(addr: Multiaddr) -> PeerInfo:
-    if addr is None:
-        raise InvalidAddrError("`addr` should not be `None`")
-
-    parts = addr.split()
-    if not parts:
-        raise InvalidAddrError(f"`parts`={parts} should at least have a protocol `P_P2P`")
-
-    p2p_part = parts[-1]
-    last_protocol_code = p2p_part.protocols()[0].code
-    if last_protocol_code != protocols.P_P2P:
-        raise InvalidAddrError(f"The last protocol should be `P_P2P` instead of `{last_protocol_code}`")
-
-    # make sure the /p2p value parses as a peer.ID
-    peer_id_str: str = p2p_part.value_for_protocol(protocols.P_P2P)
-    peer_id = PeerID.from_base58(peer_id_str)
-
-    # we might have received just a /p2p part, which means there's no addr
-    if len(parts) > 1:
-        addr = Multiaddr.join(*parts[:-1])
-
-    return PeerInfo(peer_id, [addr])
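The deleted helper split a full multiaddr such as /ip4/127.0.0.1/tcp/4001/p2p/<peer-id> into a transport address and a peer ID. Callers that still need the split can use the same multiaddr calls the helper relied on; a condensed sketch (validation trimmed):

```python
from multiaddr import Multiaddr, protocols

def split_p2p_addr(addr: Multiaddr):
    # The trailing component must be /p2p/<peer-id>
    parts = addr.split()
    peer_id_b58 = parts[-1].value_for_protocol(protocols.P_P2P)
    # Everything before it is the transport address, if any
    transport = Multiaddr.join(*parts[:-1]) if len(parts) > 1 else None
    return peer_id_b58, transport
```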
1 change: 0 additions & 1 deletion hivemind/utils/asyncio.py
@@ -1,5 +1,4 @@
import asyncio
-import concurrent.futures
import multiprocessing as mp
import os
from concurrent.futures import ThreadPoolExecutor