Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes on key parsing, typing and additional tests #76

Merged
merged 7 commits into from
Jan 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions covert/pubkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import struct
from base64 import b64decode
from contextlib import suppress
from typing import Optional
from urllib.parse import quote
from urllib.request import urlopen

Expand All @@ -11,12 +12,6 @@
from covert.elliptic import egcreate, egreveal


def derive_symkey(nonce, local, remote):
assert local.sk, f"Missing secret key for {local=}"
shared = sodium.crypto_scalarmult(local.sk, remote.pk)
return sodium.crypto_hash_sha512(bytes(nonce) + shared)[:32]


class Key:

def __init__(self, *, keystr="", comment="", sk=None, pk=None, edsk=None, edpk=None, pkhash=None):
Expand Down Expand Up @@ -106,7 +101,13 @@ def _validate(self):
sodium.crypto_box_open(ciphertext, nonce, self.pk, self.sk)


def read_pk_file(keystr):
def derive_symkey(nonce, local: Key, remote: Key) -> bytes:
assert local.sk, f"Missing secret key for {local=}"
shared = sodium.crypto_scalarmult(local.sk, remote.pk)
return sodium.crypto_hash_sha512(bytes(nonce) + shared)[:32]


def read_pk_file(keystr: str) -> list[Key]:
ghuser = None
if keystr.startswith("github:"):
ghuser = keystr[7:]
Expand Down Expand Up @@ -135,13 +136,13 @@ def read_pk_file(keystr):
return keys


def read_sk_any(keystr):
def read_sk_any(keystr: str) -> list[Key]:
with suppress(ValueError):
return decode_sk(keystr)
return [decode_sk(keystr)]
return read_sk_file(keystr)


def read_sk_file(keystr):
def read_sk_file(keystr: str) -> list[Key]:
if not os.path.isfile(keystr):
raise ValueError(f"Secret key file {keystr} not found")
with open(keystr, "rb") as f:
Expand All @@ -163,16 +164,16 @@ def read_sk_file(keystr):
return keys


def decode_pk(keystr):
def decode_pk(keystr: str) -> Key:
# Age keys use Bech32 encoding
if keystr.startswith("age1"):
return decode_age_pk(keystr)
# Try Base64 encoded formats
try:
token, comment = keystr, ''
if keystr.startswith('ssh-ed25519 '):
t, token, *comment = keystr.split(' ', 2)
comment = comment[0] if comment else 'ssh'
t, token, *cmt = keystr.split(' ', 2)
comment = cmt[0] if cmt else 'ssh'
keybytes = b64decode(token, validate=True)
ssh = keybytes.startswith(b"\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 ")
minisign = len(keybytes) == 42 and keybytes.startswith(b'Ed')
Expand All @@ -188,7 +189,7 @@ def decode_pk(keystr):
raise ValueError(f"Unrecognized key {keystr}")


def decode_sk(keystr):
def decode_sk(keystr: str) -> Key:
# Age secret keys in Bech32 encoding
if keystr.lower().startswith("age-secret-key-"):
return decode_age_sk(keystr)
Expand All @@ -198,48 +199,49 @@ def decode_sk(keystr):
# Plain Curve25519 key (WireGuard)
try:
keybytes = b64decode(keystr, validate=True)
if len(keybytes) == 32:
return Key(sk=keybytes)
# Must be a clamped scalar
if len(keybytes) == 32 and keybytes[0] & 8 == 0 and keybytes[31] & 0xC0 == 0x40:
return Key(keystr=keystr, sk=keybytes, comment="wg")
except ValueError:
pass
raise ValueError(f"Unable to parse private key {keystr!r}")
raise ValueError(f"Unable to parse secret key {keystr!r}")


def decode_sk_minisign(keystr, pw=None):
def decode_sk_minisign(keystr: str, pw: Optional[bytes] = None) -> Key:
# None means try without password, then ask
if pw is None:
try:
return decode_sk_minisign(keystr, b'')
except ValueError:
pass
pw = util.encode(passphrase.ask('Minisign passkey')[0])
pw = passphrase.ask('Minisign passkey')[0]
return decode_sk_minisign(keystr, pw)
data = b64decode(keystr)
fmt, salt, ops, mem, token = struct.unpack('<6s32sQQ104s', data)
if fmt != b'EdScB2' or ops != 1 << 25 or mem != 1 << 30:
raise ValueError(f'Not a (supported) Minisign secret key {fmt=}')
out = sodium.crypto_pwhash_scryptsalsa208sha256_ll(pw, salt, n=1 << 20, r=8, p=1, maxmem=float('inf'), dklen=104)
out = sodium.crypto_pwhash_scryptsalsa208sha256_ll(pw, salt, n=1 << 20, r=8, p=1, maxmem=1 << 31, dklen=104)
token = util.xor(out, token)
keyid, edsk, edpk, csum = struct.unpack('8s32s32s32s', token)
b2state = sodium.crypto_generichash_blake2b_init()
sodium.crypto_generichash.generichash_blake2b_update(b2state, fmt[:2] + keyid + edsk + edpk)
csum2 = sodium.crypto_generichash.generichash_blake2b_final(b2state)
if csum != csum2:
raise ValueError('Unable to decrypt Minisign secret key')
return Key(edsk=edsk, edpk=edpk)
return Key(edsk=edsk, edpk=edpk, comment="ms")


def decode_age_pk(keystr):
def decode_age_pk(keystr: str) -> Key:
return Key(keystr=keystr, comment="age", pk=bech.decode("age", keystr.lower()))


def encode_age_pk(key):
def encode_age_pk(key: Key) -> str:
return bech.encode("age", key.pk)


def decode_age_sk(keystr):
def decode_age_sk(keystr: str) -> Key:
return Key(keystr=keystr, comment="age", sk=bech.decode("age-secret-key-", keystr.lower()))


def encode_age_sk(key):
def encode_age_sk(key: Key) -> str:
return bech.encode("age-secret-key-", key.sk).upper()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"bcrypt>=3.0.0",
"colorama>=0.4",
"cryptography>=35",
"pynacl>=1.4",
"pynacl>=1.5",
"tqdm>=4.62",
"msgpack>=1.0",
"pyperclip>=1.8",
Expand Down
2 changes: 2 additions & 0 deletions tests/keys/minisign_password.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
untrusted comment: minisign encrypted secret key
RWRTY0IyhY/6g2XLCLS4D+DoqjxeOJDUR9gv+ilrmp3/B4KpIaIAAAACAAAAAAAAAEAAAAAAF0HHgdqvvXFEO3QEYm11JrGdnfjZFAWeO38bLT98FUoDxGdcgvCdeCmmj8ZiHCHeTpfablIfRrEWEvjho5yyTciuN/mr6j0YAKfd3Ew9SkUDRY/t8qvvQz1bxKHdYwZRk1RChapZ32U=
2 changes: 2 additions & 0 deletions tests/keys/minisign_password.pub
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
untrusted comment: minisign public key 8A1B111AFC8CA1A
RWQaysivEbGhCD/XmdIesSX8kAROXUUSTp5M4ochKA+Ia0Iou0KgWyhr
32 changes: 32 additions & 0 deletions tests/test_pubkey.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
AGE_SK = "AGE-SECRET-KEY-1GFPYYSJZGFPYYSJZGFPYYSJZGFPYYSJZGFPYYSJZGFPYYSJZGFPQ4EGAEX"
AGE_SK_BYTES = 32 * b"\x42"

# Generated with wg genkey and wg pubkey
WG_SK = "kLkIpWh5MYKwUA7JdQHnmbc6dEiW0py4VRvqmYyPLHc="
WG_PK = "ElMfFd2qVIROK4mRaXJouYWC2lxxMApMSe9KyAZcEBc="


def test_age_key_decoding():
pk = pubkey.decode_pk(AGE_PK)
Expand All @@ -36,6 +40,25 @@ def test_age_key_decoding_and_encoding():
assert pubkey.encode_age_sk(sk) == AGE_SK


def test_wireguard_keystr():
pk = pubkey.decode_pk(WG_PK)
sk = pubkey.decode_sk(WG_SK)
# Key comparison is by public keys
assert pk == sk
assert pk.keystr == WG_PK
assert sk.keystr == WG_SK
assert pk.comment == 'wg'
assert sk.comment == 'wg'
assert repr(pk).endswith(':PK]')
assert repr(sk).endswith(':SK]')

# Trying to decode a public key as secret key should usually fail
# (works with the test key but no guarantees with others)
with pytest.raises(ValueError) as exc:
pubkey.decode_sk(WG_PK)
assert "Unable to parse secret key" in str(exc.value)


def test_ssh_key_decoding():
pk, = pubkey.read_pk_file("tests/keys/ssh_ed25519.pub")
sk, = pubkey.read_sk_file("tests/keys/ssh_ed25519")
Expand All @@ -56,6 +79,15 @@ def test_ssh_wrong_password(mocker):
sk, = pubkey.read_sk_file("tests/keys/ssh_ed25519_password")


def test_minisign_keyfiles(mocker):
mocker.patch('covert.passphrase.ask', return_value=(b"password", True))
sk, = pubkey.read_sk_file("tests/keys/minisign_password.key")
pk, = pubkey.read_pk_file("tests/keys/minisign_password.pub")
assert sk.comment == 'ms'
assert pk.comment == 'ms'
assert sk == pk


def test_key_exchange():
# Alice sends a message to Bob
nonce = token_bytes(12)
Expand Down