Skip to content

feature: kavach shared key implemented and tested #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ idna==3.4
requests==2.31.0
urllib3==2.0.2
web3
eth-accounts
eth-accounts
py_ecc=8.0.0
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ charset-normalizer==3.1.0
idna==3.4
requests==2.31.0
urllib3==2.0.2
eth-account==0.13.7
eth-account==0.13.7
py_ecc=8.0.0
22 changes: 22 additions & 0 deletions src/lighthouseweb3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import os
import io
from typing import Any, Dict

from .functions.encryption import shared_key as sharedKey
from .functions import (
upload as d,
deal_status,
Expand Down Expand Up @@ -224,3 +227,22 @@ def getTagged(self, tag: str):
except Exception as e:
raise e


class Kavach:
@staticmethod
def sharedKey(key: str, threshold: int = 3, key_count: int = 5) -> Dict[str, Any]:
"""
Splits a secret key into shards using Shamir's Secret Sharing on BLS12-381 curve.

:param key: Hex string of the master secret key
:param threshold: Minimum number of shards required to reconstruct the key
:param key_count: Total number of shards to generate

:return: Dict containing isShardable flag and list of key shards with their indices
"""

try:
return sharedKey.shard_key(key, threshold, key_count)
except Exception as e:
raise e

50 changes: 50 additions & 0 deletions src/lighthouseweb3/functions/encryption/shared_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from py_ecc.bls import G2ProofOfPossession as bls
from py_ecc.optimized_bls12_381.optimized_curve import curve_order
from secrets import randbits
from typing import List, Tuple, Dict, Any

def shard_key(key: str, threshold: int = 3, key_count: int = 5) -> Dict[str, Any]:

try:
# Initialize master secret key list
msk = []
id_vec = []
sec_vec = []

# Convert input hex key to integer
master_key = int(key, 16)
msk.append(master_key)

# Generate additional random coefficients for polynomial
for _ in range(threshold - 1):
# Generate random number for polynomial coefficient
sk = randbits(256) # Using 256 bits for randomness
msk.append(sk)

# Perform key sharing
for i in range(key_count):
# Create random ID (x-coordinate for polynomial evaluation)
id_val = randbits(256)
id_vec.append(id_val)

# Evaluate polynomial at id_val to create shard
# Using Shamir's secret sharing polynomial evaluation
sk = 0
for j, coef in enumerate(msk):
sk += coef * pow(id_val, j, curve_order)
sk %= curve_order
sec_vec.append(sk)

# Convert to hex format for output
return {
"isShardable": True,
"keyShards": [
{
"key": hex(sk)[2:].zfill(64), # Remove '0x' and pad to 64 chars
"index": hex(id_vec[i])[2:].zfill(64)
}
for i, sk in enumerate(sec_vec)
]
}
except Exception as e:
return {"isShardable": False, "keyShards": []}
96 changes: 96 additions & 0 deletions tests/test_encryption/test_encryption_shared_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#!/usr/bin/env python3
import unittest
from src.lighthouseweb3 import Kavach
from py_ecc.bls import G2ProofOfPossession as bls
from typing import List, Dict
from py_ecc.optimized_bls12_381.optimized_curve import curve_order

def recover_key(shards: List[Dict[str, str]]) -> Dict[str, str]:
"""
Recovers the master key from a list of key shards using Lagrange interpolation.

:param shards: List of dictionaries containing 'key' and 'index' as hex strings
:return: Dictionary containing the recovered master key
"""
try:
# Convert hex strings to integers
x_coords = [int(shard['index'], 16) for shard in shards]
y_coords = [int(shard['key'], 16) for shard in shards]

# Lagrange interpolation to recover the constant term (secret)
def lagrange_interpolate(x: int, x_s: List[int], y_s: List[int], p: int) -> int:
total = 0
for i in range(len(x_s)):
numerator = denominator = 1
for j in range(len(x_s)):
if i != j:
numerator = (numerator * (x - x_s[j])) % p
denominator = (denominator * (x_s[i] - x_s[j])) % p
l = (y_s[i] * numerator * pow(denominator, -1, p)) % p
total = (total + l) % p
return total

# Recover the master key at x=0
master_key = lagrange_interpolate(0, x_coords, y_coords, curve_order)

return {"masterKey": hex(master_key)[2:].zfill(64)}
except Exception as e:
print(e)
return {"masterKey": ""}

class TestKavachSharedKey(unittest.TestCase):

def test_key_shardable_false(self):
"""Test if key is not shardable with invalid input"""
result = Kavach.sharedKey("invalid_hex")
self.assertFalse(result["isShardable"], "Key should not be shardable")

def test_key_shardable_true(self):
"""Test if key is shardable with valid input and correct number of shards"""
result = Kavach.sharedKey("0b2ccf87909bd1215858e5c0ec99359cadfcf918a4fac53e3e88e9ec28bec678")
self.assertTrue(result["isShardable"], "Key should be shardable")
self.assertEqual(len(result["keyShards"]), 5, "Should generate 5 shards")

def test_master_key_recovery_4_of_5(self):
"""Test master key recovery with 4 out of 5 shards"""
known_key = "0a16088df55283663f7fea6c5f315bde968024b7ac5d715af0325a5507700e5e"
result = Kavach.sharedKey(known_key, 3, 5)
self.assertTrue(result["isShardable"], "Key should be shardable")

# Recover using 4 shards
recovered = recover_key([
result["keyShards"][2],
result["keyShards"][0],
result["keyShards"][1],
result["keyShards"][4]
])
self.assertEqual(recovered["masterKey"], known_key, "Recovered key should match original")

def test_master_key_recovery_5_of_5(self):
"""Test master key recovery with all 5 shards"""
known_key = "0a16088df55283663f7fea6c5f315bde968024b7ac5d715af0325a5507700e5e"
result = Kavach.sharedKey(known_key, 3, 5)
self.assertTrue(result["isShardable"], "Key should be shardable")

# Recover using all 5 shards
recovered = recover_key([
result["keyShards"][2],
result["keyShards"][0],
result["keyShards"][1],
result["keyShards"][4],
result["keyShards"][3]
])
self.assertEqual(recovered["masterKey"], known_key, "Recovered key should match original")

def test_master_key_recovery_2_of_5(self):
"""Test master key recovery fails with only 2 out of 5 shards"""
known_key = "0a16088df55283663f7fea6c5f315bde968024b7ac5d715af0325a5507700e5e"
result = Kavach.sharedKey(known_key, 3, 5)
self.assertTrue(result["isShardable"], "Key should be shardable")

# Try to recover with only 2 shards (below threshold)
recovered = recover_key([
result["keyShards"][0],
result["keyShards"][1]
])
self.assertNotEqual(recovered["masterKey"], known_key, "Recovered key should not match with insufficient shards")