From 732f8f189cf075bdbb52d84f4baa372ed508a8ac Mon Sep 17 00:00:00 2001 From: Kostas Papadimitriou Date: Fri, 9 Nov 2012 14:52:58 +0200 Subject: [PATCH] Tests improvements - Verify exported zeus proofs - Verify voter/anonymous access at all stages - Test excluded user feature --- zeus/tests.py | 385 +++++++++++--------------------------------------- 1 file changed, 85 insertions(+), 300 deletions(-) diff --git a/zeus/tests.py b/zeus/tests.py index 0068c31da..db17b5c91 100644 --- a/zeus/tests.py +++ b/zeus/tests.py @@ -12,6 +12,9 @@ import string import json import md5 +import os +import zipfile +import cStringIO as StringIO from random import choice from datetime import timedelta @@ -27,15 +30,15 @@ from zeus.helios_election import * from zeus.models import * -from zeus.core import get_random_selection, encode_selection, \ - prove_encryption, to_absolute_answers, gamma_encode, to_relative_answers, \ - from_canonical, to_canonical - -TRUSTEES_COUNT = 3 -VOTERS_COUNT = 20 -VOTES_COUNT = 3 -MIXNETS_COUNT = 1 -REMOTE_MIXES_COUNT = 1 +from zeus.core import * + +import datetime + +TRUSTEES_COUNT = int(os.environ.get('ZEUS_TRUSTEES_COUNT', 3)) +VOTERS_COUNT = int(os.environ.get('ZEUS_VOTERS_COUNT', 50)) +VOTES_COUNT = int(os.environ.get('ZEUS_VOTES_COUNT', 5)) +MIXNETS_COUNT = int(os.environ.get('ZEUS_MIXNETS_COUNT', 1)) +REMOTE_MIXES_COUNT = int(os.environ.get('ZEUS_REMOTE_MIXNETS_COUNT', 2)) CAST_AUDITS = False CAST_WITH_AUDIT_SECRET = False DO_REMOTE_MIXES = True @@ -285,17 +288,60 @@ def add_remote_mixes(self, uuid, count=REMOTE_MIXES_COUNT): mix_url = election.get_mix_url() mix = mixer.get(mix_url) mix = from_canonical(mix.content) - new_mix = mix_ciphers(mix, nr_rounds=128, nr_parallel=4) + new_mix = mix_ciphers(mix, nr_rounds=128, nr_parallel=4, + teller=Teller()) mixer.post(mix_url, data=to_canonical(new_mix), content_type="application/binary") self.assertEqual(election.mixnets.count(), count+1) + def exclude_voter(self, voter_obj, election, client): + excluded = voter_obj + # admin cannot delete a voter who already cast a vote + r = client.get('/helios/elections/%s/voters/%s/delete' % (election.uuid, + excluded.uuid)) + # but he may exclude him + r = client.post('/helios/elections/%s/voters/%s/exclude' % (election.uuid, + excluded.uuid), + {'reason':'spammer','confirm':1}) + self.assertEqual(r.status_code, 302) + voter = Voter.objects.get(uuid=excluded.uuid) + self.assertTrue(voter.excluded_at) + self.assertEqual(voter.exclude_reason, u'spammer') + r = client.post('/helios/elections/%s/voters/%s/exclude' % (election.uuid, + excluded.uuid), + {'reason':'spammer','confirm':1}) + self.assertEqual(r.status_code, 403) + return excluded + + def assert_anonymous_cannot_visit(self, election): + c = self.get_client() + r = c.get('/helios/elections/%s/view' % election.uuid, follow=True) + self.assertRedirects(r, '/') + + def assert_voter_cannot_vote(self, election, voter=None): + v = voter or election.voter_set.filter()[choice(range(election.voter_set.count()))] + c = self.get_client() + c.get(v.get_quick_login_url()) + c.get('/helios/elections/%s/view' % election.uuid) + r = c.post('/helios/elections/%s/cast' % (election.uuid,), + {'csrf_token': c.session.get('csrf_token')}) + self.assertEqual(r.status_code, 403) + def test_complete(self): admin1, election = self.create_random_election(settings.TEST_ADMINS[0]) kps = self.prepare_trustees(election.uuid) self.freeze_election(admin1, election.uuid) + election = Election.objects.get(uuid=election.uuid) + election.voting_starts_at = datetime.datetime.now() + timedelta(hours=2) + election.save() + + # voter cannot vote before voting_starts_at + self.assert_voter_cannot_vote(election) + self.assert_anonymous_cannot_visit(election) + + # set proper voting starts at date and do random cast election = Election.objects.get(uuid=election.uuid) election.voting_starts_at = datetime.datetime.now() election.save() @@ -305,14 +351,13 @@ def test_complete(self): election.voting_ends_at = datetime.datetime.now() election.save() - # voter cannot vote after voting ends at - v = election.voter_set.filter()[choice(range(election.voter_set.count()))] - c = self.get_client() - c.get(v.get_quick_login_url()) - c.get('/helios/elections/%s/view' % election.uuid) - r = c.post('/helios/elections/%s/cast' % (election.uuid,), - {'csrf_token': c.session.get('csrf_token')}) - self.assertEqual(r.status_code, 403) + # voter cannot vote before voting_starts_at + self.assert_voter_cannot_vote(election) + self.assert_anonymous_cannot_visit(election) + + excluded = choice(votes) + self.exclude_voter(excluded['voter'], election, admin1) + votes.remove(excluded) self.mix_election(admin1, election.uuid) # check that no mix is stored in jsonfiled, this coz its deprecated @@ -322,6 +367,10 @@ def test_complete(self): assert mix.mix == None assert mix.parts.count() > 0 + # voter cannot vote while mixing + self.assert_voter_cannot_vote(election) + self.assert_anonymous_cannot_visit(election) + if election.mix_key: self.add_remote_mixes(election.uuid) self.finish_mixing(admin1, election.uuid) @@ -339,286 +388,22 @@ def test_complete(self): votes)) self.assertEqual(sorted(election.result[0]), sorted(vote_results)) - -class TestHeliosElection(TestCase): - - UUID = "test" - - def test_zeus_helios_election(self): - pass - - @property - def election(self): - return Election.objects.get(uuid=self.UUID) - - def test_election_workflow(self): - #settings.DEBUG = True - institution = Institution(name="test institution") - e = Election(name="election test", uuid=self.UUID) - e.short_name = "test" - e.departments = [ - "Φιλοσοφική Σχολή", - "Σχολή Θετικών Επιστημών", - "Ανεξάρτητο Παιδαγωγικό Τμήμα Δημοτικής Εκπαίδευσης" - ] - candidates = [ - { - 'department': u'Φιλοσοφική Σχολή', - 'father_name': u'Γεώργιος', - 'name': u'Ιωάννης', - 'surname': u'Παναγιωτόπουλος' - }, - { - 'department': u'Φιλοσοφική Σχολή', - 'father_name': u'Γεώργιος', - 'name': u'Κώνσταντίνος', - 'surname': u'Δημητρίου' - }, - { - 'department': u'Σχολή Θετικών Επιστημών', - 'father_name': u'Γεώργιος', - 'name': u'Κώνσταντίνος', - 'surname': u'Αναστόπουλος' - }, - { - 'department': u'Ανεξάρτητο Παιδαγωγικό Τμήμα Δημοτικής Εκπαίδευσης', - 'father_name': u'Γεώργιος', - 'name': u'Βαγγέλης', - 'surname': u'Παπαδημητρίου' - }, - { - 'department': u'Ανεξάρτητο Παιδαγωγικό Τμήμα Δημοτικής Εκπαίδευσης', - 'father_name': u'Ιωάννης', - 'name': u'Κώνσταντίνος', - 'surname': u'Τσουκαλάς' - }, - ] - - NUM_ANSWERS = choice(range(5, 15)) - cands = [dict(choice(candidates)) for i in range(NUM_ANSWERS)] - for i, cand in enumerate(cands): - cand['name'] += str(i) - - e.candidates = cands - e.questions = [{'choice_type': 'stv', 'result_type': - 'absolute', 'tally_type': 'stv'}] - e.update_answers() - e.save() - - self.assertEqual(self.election.zeus_election.do_get_stage(), "CREATING") - - # generate election zeus trustee - self.election.generate_trustee() - trustee = self.election.get_helios_trustee() - self.assertTrue(self.election.get_helios_trustee()) - self.assertTrue(self.election.get_helios_trustee().secret_key) - self.assertTrue(self.election.get_helios_trustee().public_key) - - # no additional internal trustees allowed - self.election.generate_trustee() - self.assertEqual(trustee, self.election.get_helios_trustee()) - - # generate user trustees - trustees_keys = {} - for i in range(TRUSTEES_COUNT): - t1 = Trustee.objects.create(election=self.election, name="t%d" % i, - email="t%d@test.com" % i) - - for t1 in self.election.trustee_set.filter(secret_key__isnull=True): - t1_kp = ELGAMAL_PARAMS.generate_keypair() - pk = algs.EGPublicKey.from_dict(dict(p=t1_kp.pk.p, q=t1_kp.pk.q, g=t1_kp.pk.g, - y=t1_kp.pk.y)) - pok = t1_kp.sk.prove_sk(DLog_challenge_generator) - self.election.add_trustee_pk(t1, pk, pok) - trustees_keys[t1.pk] = ([t1_kp, pok]) - - for pk in trustees_keys: - self.election.reprove_trustee(self.election.trustee_set.get(pk=pk)) - - self.assertEqual(self.election.trustee_set.count(), TRUSTEES_COUNT+1) - - for i in range(VOTERS_COUNT): - voter_uuid = str(uuid.uuid4()) - voter_id = email = name = "voter%d@testvoter.com" % i - voter_name = "Ψηφοφόρος %d" % i - voter = Voter(uuid= voter_uuid, user = None, voter_login_id = - voter_id,voter_surname=name, - voter_name = name, voter_email = email, election = self.election) - voter.init_audit_passwords() - voter.generate_password() - voter.save() - - self.election.zeus_election.validate_creating() - - e = self.election - e.frozen_at = datetime.datetime.now() - e.save() - self.assertEqual(self.election.zeus_election.do_get_stage(), "VOTING") - - VOTES_CASTED = 0 - SELECTIONS = {} - for voter in range(VOTERS_COUNT): - cast = choice(range(10)) > 1 - cast_with_audit_pass = choice(range(10)) > 5 - if voter == 0: - continue - - if not VOTES_CASTED and voter == VOTERS_COUNT-1: - cast = True - - if not cast: - continue - - cast_votes_count = choice(range(VOTES_COUNT)) + 1 - voter_obj = e.voter_set.get(election=e, - voter_email='voter%d@testvoter.com' % voter) - - print "VOTER", voter - for voter_vote_index in range(cast_votes_count): - audit_password = "" - cast_audit = choice(range(5)) > 3 - - cands_size = len(e.questions[0]['answers']) - vote_size = choice(range(cands_size)) - selection = [] - for s in range(vote_size): - vchoice = choice(range(cands_size)) - if not vchoice in selection: - selection.append(vchoice) - - rel_selection = to_relative_answers(selection, cands_size) - encoded = gamma_encode(rel_selection, cands_size, cands_size) - - print "ENCODED", encoded - plaintext = algs.EGPlaintext(encoded, e.public_key) - randomness = algs.Utils.random_mpz_lt(e.public_key.q) - cipher = e.public_key.encrypt_with_r(plaintext, randomness, True) - - modulus, generator, order = e.zeus_election.do_get_cryptosystem() - enc_proof = prove_encryption(modulus, generator, order, cipher.alpha, - randomness) - - ballot = { - 'election_hash': self.election.hash, - 'election_uuid': self.election.uuid, - 'answers': [{ - 'encryption_proof': enc_proof, - 'choices':[{'alpha': cipher.alpha, 'beta': cipher.beta}] - }] - } - - if cast_with_audit_pass: - audit_password = choice(voter_obj.get_audit_passwords()) - - session_enc_vote = None - enc_vote = datatypes.LDObject.fromDict(ballot, - type_hint='phoebus/EncryptedVote').wrapped_obj - - if cast_audit: - enc_vote.answers[0].answer = [selection] - enc_vote.answers[0].randomness = [randomness] - # mess with original audit password, to force auditing vote to be - # get cast - audit_password = choice(voter_obj.get_audit_passwords()) + \ - str(choice(range(10))) - session_enc_vote = datatypes.LDObject.fromDict(ballot, - type_hint='phoebus/EncryptedVote').wrapped_obj - - signature = self.election.cast_vote(voter_obj, session_enc_vote, - audit_password) - print "VOTER", voter, "AUDIT REQUEST", selection - - enc_vote = enc_vote.ld_object.includeRandomness().wrapped_obj - if choice([0,1]) == 0: - - signature = self.election.cast_vote(voter_obj, enc_vote, - audit_password) - print "VOTER", voter, "AUDIT BALLOT", selection - continue - - if cast_audit and range(10) > 9: - continue - - self.election.cast_vote(voter_obj, enc_vote, audit_password) - print "VOTER", voter, "CASTED BALLOT", selection - SELECTIONS[voter_obj.uuid] = selection - - self.election.zeus_election.validate_voting() - - for a in AuditedBallot.objects.filter(is_request=False): - assert a.vote.encrypted_answers[0].answer != None - assert a.vote.encrypted_answers[0].randomness != None - - e = self.election - e.workflow_type = 'mixnet' - e.save() - e.voting_ended_at = datetime.datetime.now() - e.save() - - for i in range(MIXNETS_COUNT): - e.generate_helios_mixnet() - - self.assertEqual(self.election.zeus_election.do_get_stage(), "MIXING") - - tasks.election_compute_tally(e.pk) - tasks.validate_mixing(e.pk) - - self.assertEqual(self.election.bad_mixnet(), None) - self.assertTrue(self.election.encrypted_tally) - self.assertEqual(self.election.encrypted_tally.num_tallied, len(SELECTIONS.values())) - - e = self.election - tasks.tally_helios_decrypt(e.pk) - for tpk, val in trustees_keys.iteritems(): - trustee = e.trustee_set.get(pk=tpk) - sk = val[0].sk - decryption_factors = [[]] - decryption_proofs = [[]] - for vote in e.encrypted_tally.tally[0]: - dec_factor, proof = sk.decryption_factor_and_proof(vote) - decryption_factors[0].append(dec_factor) - decryption_proofs[0].append(proof) - e.add_trustee_factors(trustee, decryption_factors, - decryption_proofs) - - e = self.election - tasks.tally_decrypt(e.pk) - self.assertTrue(self.election.result) - - self.election.zeus_election.validate_decrypting() - self.assertEqual(e.zeus_election.do_get_stage(), 'FINISHED') - - results = e.pretty_result['abs_selections'] - - from helios.counter import Counter - results = [",".join(map(str, r)) for r in results] - selections = [",".join(map(str, r)) for r in SELECTIONS.values()] - - assert Counter(results) == Counter(selections) - - - # validate ecounting results - ecounting_results = self.election.ecounting_dict()['ballots'] - assert Counter(results) == Counter(selections) - - for voter, selection in SELECTIONS.iteritems(): - vote = selection - print 20*"*" - if not len(vote): - print "EMPTY VOTE" - for i, v in enumerate(vote): - if v == 0: - continue - cand = self.election.candidates[v-1] - print i+1, cand['surname'], cand['name'], cand['father_name'] - print 20*"*" - - ecounting_ballots = [[int(s['candidateTmpId'])-1 for s in ballot['votes']] for \ - ballot in self.election.ecounting_dict()['ballots']] - - ecounting_selections = [",".join(map(str, r)) for r in ecounting_ballots] - assert Counter(results) == Counter(selections) == Counter(ecounting_selections) - #print 20*"=" - print json.dumps(self.election.ecounting_dict(), ensure_ascii=0) - #print 20*"=" + r = admin1.get('/helios/elections/%s/zeus-proofs.zip' % election.uuid) + self.assertEqual(r.status_code, 200) + self._extract_and_validate_zip_proofs(r.content) + + # voter cannot vote after election is finished + self.assert_voter_cannot_vote(election) + self.assert_anonymous_cannot_visit(election) + + def _extract_and_validate_zip_proofs(self, zip_data): + z = zipfile.ZipFile(StringIO(zip_data)) + fname = z.infolist()[0].filename + pth = '/tmp/zeus_testproofs' + z.extract(fname, path=pth) + contents = file(pth+'/'+fname).read() + finished = from_canonical(contents) + election = ZeusCoreElection.new_at_finished(finished, teller=Teller(), + nr_parallel=4) + self.assertEqual(election.validate(), True)