# payShield test utility by Marco S. Zuppone - msz@msz.eu
# This utility is released under AGPL 3.0 license.
# Please refer to the LICENSE file for more information about licensing
# and to README.md file for more information about the usage of it.
import socket
import ssl
import binascii
import string
import sys
from struct import *
import argparse
from pathlib import Path
from typing import Tuple, Dict
from types import FunctionType
# Version of this utility.
VERSION = "1.3.1"
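class PayConnector:
    """It represents the connection with the payShield host port. It supports tcp, udp and tls.

    Attributes
    ----------
    ssl_sock : ssl.SSLSocket
        The TLS-wrapped socket when the protocol is tls, otherwise None.
    connection : socket.socket
        The underlying socket. It should not be accessed directly.
    host : str
        The host ip or hostname.
    port : int
        The tcp/udp port to connect with.
    protocol : str
        The protocol used to reach the host. Can only be tcp, tls or udp.
    connected : bool
        When True, the connection has been established already and is reused.
        When False, the next command opens a new connection.
    keyfile : str
        In case of tls protocol this is the full path of the client key file.
    crtfile : str
        In case of tls protocol this is the full path of the client certificate file.
    cafile : str
        Optional CA bundle used to verify the HSM certificate over tls. When None,
        the HSM certificate is not verified at all (the historical behaviour of this tool).
    context : ssl.SSLContext
        The SSLContext object, created when the first tls connection is opened.
    """

    # Maximum number of bytes read from the host in a single recv() call.
    BUFFER_SIZE = 4096
    # Seconds to wait for a UDP reply before giving up.
    UDP_TIMEOUT = 5

    def __init__(self, host: str, port: int, protocol: str, keyfile: str = None, crtfile: str = None,
                 cafile: str = None):
        """
        Constructor for the PayConnector class. It validates and stores the connection parameters.
        Parameters
        ----------
        host : str
            The host ip or hostname.
        port : int
            The tcp/udp port to connect with.
        protocol : str
            The protocol to use to connect to the host. Can only be tcp, tls or udp.
        keyfile : str, optional
            In case of tls protocol this is the full path of the client key file.
        crtfile : str, optional
            In case of tls protocol this is the full path of the client certificate file.
        cafile : str, optional
            In case of tls protocol, the CA bundle used to verify the HSM certificate and hostname.
            When omitted the HSM certificate is accepted without verification.
        Raises
        ------
        ValueError
            If the protocol is not 'tcp', 'tls', or 'udp'.
        ValueError
            If the protocol is 'tls' but keyfile or crtfile is not provided.
        """
        if protocol not in ('udp', 'tcp', 'tls'):
            raise ValueError("protocol must be udp, tcp or tls")
        if protocol == 'tls' and (keyfile is None or crtfile is None):
            raise ValueError("keyfile and crtfile parameters are both required")
        self.keyfile = keyfile
        self.crtfile = crtfile
        self.cafile = cafile
        self.ssl_sock = None
        self.connection = None
        self.context = None
        self.host = host
        self.port = port
        self.protocol = protocol
        self.connected = False

    def send_command(self, host_command: str) -> bytes:
        """
        Sends the command specified in the parameter to the payShield and returns the response.
        It establishes the connection if it is not established yet, otherwise reuses the open one.
        The command is framed with the 2-byte big-endian length prefix expected by the host port.
        Parameters
        ----------
        host_command : str
            The command to send to the payshield host port (header included).
        Returns
        -------
        bytes
            The response from the host, length prefix included. None when a connection or
            certificate problem was reported on stdout instead of being raised.
        Raises
        ------
        ssl.SSLError
            When the TLS handshake or the TLS transfer fails.
        """
        size = pack('>h', len(host_command))
        message = size + host_command.encode()
        try:
            if self.protocol == 'udp':
                return self._exchange_udp(message)
            return self._exchange_stream(message)
        except (ConnectionError, TimeoutError, socket.timeout) as e:
            print("Connection issue: ", e)
            self._drop_connection()
        except FileNotFoundError as e:
            print("The client certificate file or the client key file cannot be found or accessed.\n" +
                  "Check value passed to the parameters --keyfile and --crtfile", e)
            self._drop_connection()
        except ssl.SSLError as e:
            # release the half-open socket before propagating the failure
            self._drop_connection()
            raise ssl.SSLError("TLS connection error: ", e) from e
        except Exception as e:
            print("Unexpected issue: ", e)
            self._drop_connection()
        return None

    def _exchange_stream(self, message: bytes) -> bytes:
        """Send one framed message over tcp/tls (opening the socket if needed) and read one framed reply."""
        if not self.connected:
            self._open_stream()
        sock = self.ssl_sock if self.protocol == 'tls' else self.connection
        # sendall() retries until the whole message is on the wire; send() may write only part of it
        sock.sendall(message)
        return self._recv_framed(sock)

    def _open_stream(self) -> None:
        """Create the tcp socket and, for tls, wrap it with the client certificate, then connect."""
        # a previous failed attempt must not leak its socket
        self._drop_connection()
        if self.protocol == 'tls':
            self.context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            # load the client credentials first, so a missing file does not leave a socket behind
            self.context.load_cert_chain(certfile=self.crtfile, keyfile=self.keyfile)
            if self.cafile is None:
                # Historical behaviour: the HSM certificate is not checked at all. Acceptable for a
                # lab pressure test; pass cafile to authenticate the HSM in any other environment.
                self.context.check_hostname = False
                self.context.verify_mode = ssl.CERT_NONE
                server_hostname = None
            else:
                self.context.load_verify_locations(cafile=self.cafile)
                server_hostname = self.host
            self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.ssl_sock = self.context.wrap_socket(self.connection, server_side=False,
                                                     server_hostname=server_hostname)
            self.ssl_sock.connect((self.host, self.port))
        else:
            self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connection.connect((self.host, self.port))
        self.connected = True

    def _recv_framed(self, sock: socket.socket) -> bytes:
        """
        Read one length-prefixed reply from a stream socket.
        The first two bytes carry the payload length (big-endian). A single recv() may return only
        part of the message, so reading continues until the announced length has arrived or the
        peer closes the connection. Whatever was read is returned, so a truncated reply can still
        be reported by the caller.
        """
        data = sock.recv(self.BUFFER_SIZE)
        if len(data) < 2:
            return data
        expected = 2 + int.from_bytes(data[:2], byteorder='big', signed=False)
        chunks = [data]
        received = len(data)
        while received < expected:
            chunk = sock.recv(min(self.BUFFER_SIZE, expected - received))
            if not chunk:
                # peer closed the connection before the whole message arrived
                break
            chunks.append(chunk)
            received += len(chunk)
        return b''.join(chunks)

    def _exchange_udp(self, message: bytes) -> bytes:
        """Send one datagram to the host (creating the socket if needed) and return the reply datagram."""
        if not self.connected:
            self.connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # UDP gives no connection error; the timeout is the only way to notice a silent host
            self.connection.settimeout(self.UDP_TIMEOUT)
            self.connected = True
        self.connection.sendto(message, (self.host, self.port))
        data, _peer = self.connection.recvfrom(self.BUFFER_SIZE)
        return data

    def _drop_connection(self) -> None:
        """Close every socket object held by the connector and mark it as disconnected."""
        # for tls the SSLSocket owns the file descriptor, so it has to be closed explicitly;
        # closing the detached plain socket alone would leave the TLS session open
        for sock in (self.ssl_sock, self.connection):
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass  # already closed or never opened: nothing left to release
        self.ssl_sock = None
        self.connection = None
        self.context = None
        self.connected = False

    def close(self):
        """
        Closes the connection with the host, if any. The next command opens a new one.
        """
        self._drop_connection()

    def __del__(self):
        """
        Destructor for the PayConnector class.
        It closes the connection if one is still open.
        """
        # getattr guards against a constructor that raised before the attributes were set
        if getattr(self, 'connection', None) is not None or getattr(self, 'ssl_sock', None) is not None:
            self.close()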
# End Class
def decode_n0(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the N0 command (random number generation) and prints its meaning
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    text, _msg_len, pos = common_parser(response_to_decode, head_len)
    error_code = text[pos:pos + 2]
    if error_code == '01':
        print("Invalid Random Value Length")
    elif error_code == '00':
        # the payload follows 2 (length) + 2 (verb) + 2 (error code) bytes plus the header
        print("Random payload:(HEX)", bytes.hex(response_to_decode[6 + head_len:]))
def decode_no(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the NO command and prints its meaning
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    buffer_sizes: Dict[str, str] = {'0': '2K bytes', '1': '8K bytes', '2': '16K bytes', '3': '32K bytes'}
    net_protocols: Dict[str, str] = {'0': 'UDP', '1': 'TCP'}
    text, _msg_len, pos = common_parser(response_to_decode, head_len)

    def take(width: int) -> str:
        # consume `width` characters starting at the current position
        nonlocal pos
        field = text[pos:pos + width]
        pos += width
        return field

    if take(2) != '00':
        return
    # 24 = 2 for the response len, 2 for the error code and the sum of the field lengths
    # listed in the Core Host Command Manual for mode 00
    if len(text) >= 24 + head_len:  # Mode 00
        print("I/O buffer size: ", buffer_sizes.get(take(1), "Unknown"))
        print("Type of connection: ", net_protocols.get(take(1), "Unknown"))
        # from FW 1.8a the number of TCP sockets is 4 characters long instead of 2
        socket_field_len = 4 if len(text) > 24 + head_len else 2
        print("Number of TCP sockets: ", take(socket_field_len))
        print("Firmware number: ", take(9))
        print("Reserved: ", take(1))
        print("Reserved: ", take(4))
    else:  # Mode 01
        compliance = take(1)
        if compliance == '0':
            print(
                "Some of the security settings relevant to PCI HSM compliance have non-compliant values.\n"
                "\"The Enforce key type 002 separation for PCI HSM compliance\" setting is one of these.")
        elif compliance == '1':
            print("All security settings relevant to PCI HSM compliance have compliant values.")
        elif compliance == '2':
            print(
                "Some of the security settings relevant to PCI HSM compliance have non-compliant values.\n"
                "\"The Enforce key type 002 separation for PCI HSM compliance\" setting is not one of these.")
def decode_ni(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the NI command (network statistics) and prints its meaning
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    net_protocols: Dict[str, str] = {'0': 'TCP', '1': 'UDP'}
    specific_errors: Dict[str, str] = {'01': 'Failed to execute NETSTAT',
                                       '82': 'Invalid Ethernet Statistics value'}
    connection_status: Dict[str, str] = {'0': 'ESTABLISHED', '1': 'CLOSED'}
    # 8-hex-character counters printed as integers, in the order they appear in each record
    counter_labels = (
        "Total Unicast Packets Sent: ",
        "Total Unicast Packets Received: ",
        "Total Non-unicast packets Sent: ",
        "Total Non-unicast packets Received: ",
        "Total Packets Discarded During Send: ",
        "Total Packets Discarded During Receive: ",
        "Total Errors During Send: ",
        "Total Errors During Receive: ",
        "Total Unknown Packets: ",
    )
    text, _msg_len, pos = common_parser(response_to_decode, head_len)

    def take(width: int) -> str:
        # consume `width` characters starting at the current position
        nonlocal pos
        field = text[pos:pos + width]
        pos += width
        return field

    error_code = text[pos:pos + 2]
    if error_code != '00':
        message = specific_errors.get(error_code)
        if message is not None:
            print("Command specific error: ", message)
        return
    pos += 2
    records_field = take(4)
    print("Records to follow: ", records_field)
    for _record in range(int(records_field)):
        print("Protocol: ", net_protocols.get(take(1), "Unknown"))
        print("Local port number: ", take(4))
        print("IP Address: ", hex2ip(take(8)))
        print("Remote port number: ", take(4))
        print("Connection Status: ", connection_status.get(take(1), 'Reserved'))
        print("Duration: ", take(8))
        print("Total Bytes Sent: ", int(take(16), 16))
        print("Total Bytes Received: ", int(take(16), 16))
        for label in counter_labels:
            print(label, int(take(8), 16))
def decode_nc(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the NC command (diagnostics) and prints its meaning
    The message trailer is not considered
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    text, _msg_len, pos = common_parser(response_to_decode, head_len)
    if text[pos:pos + 2] != '00':
        return
    pos += 2
    lmk_crc, firmware = text[pos:pos + 16], text[pos + 16:pos + 25]
    print("LMK CRC:", lmk_crc)
    print("Firmware number:", firmware)
def decode_j8(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the J8 command and prints its meaning
    The message trailer is not considered
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    # (label, field width) in the order the fields appear after the error code
    fields = (
        ("Serial Number: ", 12),
        ("Start Date: ", 6),
        ("Start Time: ", 6),
        ("End Date: ", 6),
        ("End Time: ", 6),
        ("Current Date: ", 6),
        ("Current Time: ", 6),
        ("Reboots: ", 10),
        ("Tampers: ", 10),
        ("Pin verifies/minute: ", 7),
        ("Pin verifies/hour: ", 5),
        ("Pin attacks: ", 8),
    )
    text, _msg_len, pos = common_parser(response_to_decode, head_len)
    if text[pos:pos + 2] != '00':
        return
    pos += 2
    for label, width in fields:
        print(label, text[pos:pos + width])
        pos += width
def decode_b2(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the B2 command (echo) and prints its meaning
    The message trailer is not considered
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    text, _msg_len, pos = common_parser(response_to_decode, head_len)
    error_code, payload = text[pos:pos + 2], text[pos + 2:]
    if error_code == '00':  # no errors
        print("Payload echoed: ", payload)
def decode_j2(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the J2 command and prints its meaning
    The message trailer is not considered
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    # (label, field width) for the fixed part that follows the error code
    fixed_fields = (
        ("Serial Number: ", 12),
        ("Start Date: ", 6),
        ("Start Time: ", 6),
        ("End Date: ", 6),
        ("End Time: ", 6),
        ("Current Date: ", 6),
        ("Current Time: ", 6),
        ("Seconds: ", 10),
    )
    # (label, field width) for each repeated load record
    record_fields = (
        ("Starting percentage: ", 3),
        ("Ending percentage: ", 3),
        ("Number Times Periods: ", 10),
        ("Delimiter: ", 1),
    )
    text, msg_len, pos = common_parser(response_to_decode, head_len)
    if text[pos:pos + 2] != '00':
        return
    pos += 2
    for label, width in fixed_fields:
        print(label, text[pos:pos + width])
        pos += width
    # stop when the remaining text cannot hold another record
    while pos + 15 <= msg_len:
        for label, width in record_fields:
            print(label, text[pos:pos + width])
            pos += width
        print("")
def decode_j4(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the J4 command and prints its meaning
    The message trailer is not considered
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    # (label, field width) for the fixed part that follows the error code
    fixed_fields = (
        ("Serial Number: ", 12),
        ("Start Date: ", 6),
        ("Start Time: ", 6),
        ("End Date: ", 6),
        ("End Time: ", 6),
        ("Current Date: ", 6),
        ("Current Time: ", 6),
        ("Seconds: ", 10),
    )
    # (label, field width) for each repeated per-command counter
    record_fields = (
        ("Command Code: ", 2),
        ("Transactions: ", 12),
    )
    text, msg_len, pos = common_parser(response_to_decode, head_len)
    if text[pos:pos + 2] != '00':
        return
    pos += 2
    for label, width in fixed_fields:
        print(label, text[pos:pos + width])
        pos += width
    # stop when the remaining text cannot hold another record
    while pos + 12 <= msg_len:
        for label, width in record_fields:
            print(label, text[pos:pos + width])
            pos += width
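def decode_jk(response_to_decode: bytes, head_len: int):
    """
    It decodes the result of the command JK and prints the meaning of the returned output
    The message trailer is not considered. A missing or truncated LMK/fraud section is reported
    with '?' instead of aborting the decoding.
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    # structures to decode the result
    # We can use CONSOLE_STATUS_CODE to check the status of the payShield Manager as well.
    CONSOLE_STATUS_CODE = {
        '0': 'unknown',
        '1': 'running',
        '2': 'not running',
        '3': 'console disabled by GUI'}
    TAMPER_STATUS_CODE = {
        '0': 'Unknown',
        '1': 'Not Tampered',
        '2': 'Tampered'}
    HOST_STATUS_CODE = {
        '0': 'unknown',
        '1': 'running',
        '2': 'not running',
        '3': 'not configured'
    }
    TAMPER_CAUSE_CODE = {
        '00': 'unknown',
        '01': 'temp out of range',
        '02': 'battery low',
        '03': 'erase button pressed',
        '04': 'security processor watchdog',
        '05': 'power too high',
        '06': 'security processor restart',
        '07': 'motion detected',
        '08': 'case tampered',
        '09': 'TSPP Module',
        '10': 'General'
    }
    LMK_ALGORITHM_CODE = {
        '0': '3DES2Key',
        '1': '3DES3Key',
        '2': 'AES 256-bit'
    }
    LMK_SCHEME_CODE = {
        'V': 'Variant',
        'K': 'Keyblock'
    }
    LMK_STATUS_CODE = {
        'L': 'Live',
        'T': 'Test'
    }
    LMK_AUTH_CODE = {
        '0': 'Not authorized',
        '1': 'Authorized'
    }
    FRAUD_CODE = {
        '0': 'not exceeded (or not enabled)',
        '1': 'exceeded'
    }
    text, _msg_len, pos = common_parser(response_to_decode, head_len)

    def take(width: int) -> str:
        # consume `width` characters starting at the current position
        nonlocal pos
        field = text[pos:pos + width]
        pos += width
        return field

    if take(2) != '00':
        return
    print("Serial Number: ", take(12))
    print("System Date: ", take(6))
    print("System Time: ", take(6))
    print("Console State: ", CONSOLE_STATUS_CODE.get(take(1), '?'))
    print("payShield Manager State: ", CONSOLE_STATUS_CODE.get(take(1), '?'))
    print("HOST 1 State: ", HOST_STATUS_CODE.get(take(1), '?'))
    print("HOST 2 State: ", HOST_STATUS_CODE.get(take(1), '?'))
    print("Reserved: ", take(1))
    print("Reserved: ", take(1))
    tamper_state = take(1)
    print("Tamper State: ", TAMPER_STATUS_CODE.get(tamper_state, '?'))
    if tamper_state == '2':
        # cause, date and time are only present when the unit reports a tamper
        print("Tamper Cause: ", TAMPER_CAUSE_CODE.get(take(2), '?'))
        print("Tamper Date: ", take(6))
        print("Tamper Time: ", take(6))
    lmk_loaded = take(2)
    print("Number of LMK Loaded: ", lmk_loaded)
    print("Number of Test LMK: ", take(2))
    print("Number of Old LMK: ", take(2))
    print("There are ", lmk_loaded, " LMK(s) loaded")
    try:
        lmks_loaded_num = int(lmk_loaded)
    except ValueError:
        lmks_loaded_num = -1
    # the remainder is "<LMK records separated by \x14>\x15<fraud flags>"; a truncated response may
    # lack the \x15 separator, in which case the fraud flags are reported as unknown ('?')
    sections = text[pos:].split('\x15')
    lmks_string = sections[0]
    fraud_detection = sections[1] if len(sections) > 1 else ''
    if lmks_loaded_num > 0:
        for lmk in lmks_string.split('\x14'):
            if not lmk:
                continue
            print("LMK ID: ", lmk[0:2])
            print("Authorised: ", LMK_AUTH_CODE.get(lmk[2:3], '?'))
            print("Num Authorised Activities: ", lmk[3:5])
            print("LMK Scheme: ", LMK_SCHEME_CODE.get(lmk[5:6], '?'))
            print("Algorithm: ", LMK_ALGORITHM_CODE.get(lmk[6:7], '?'))
            print("Status: ", LMK_STATUS_CODE.get(lmk[7:8], '?'))
            print("Comments: ", lmk[8:])
            print("")
    # slices (not indexing) so a short fraud field yields '?' rather than an IndexError
    print("Fraud detection Exceeded: ", FRAUD_CODE.get(fraud_detection[0:1], '?'))
    print("PIN attacks exceeded: ", FRAUD_CODE.get(fraud_detection[1:2], '?'))
    print("")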
def decode_ecc(response_to_decode: bytes, head_len: int):
    """
    Decodes the reply to the FY command (ECC key pair generation) and prints its meaning
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    nothing
    """
    text, _msg_len, pos = common_parser(response_to_decode, head_len)
    if text[pos:pos + 2] != '00':
        return
    pos += 2
    key_len = int(text[pos:pos + 4])
    print("ECC Public Key Length: ", key_len)
    pos += 4
    # the key material is taken from the raw bytes, not from the decoded text
    key_end = pos + key_len
    print("ECC Public Key", bytes.hex(response_to_decode[pos:key_end]))
    separator = response_to_decode[key_end:key_end + 1].decode('ascii', 'ignore')
    print("Public/private separator: ", separator)
    print("ECC Private Key under LMK", bytes.hex(response_to_decode[key_end + 1:]))
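# Result code -> description, derived from the payShield 10K Core Host Commands manual
# (v1, Revision A, 04 August 2020, Doc.Number PUGD0537 - 004). Built once at import time
# instead of on every call.
_PAYSHIELD_ERROR_CODES: Dict[str, str] = {
    '00': 'No error',
    '01': 'Verification failure or warning of imported key parity error',
    '02': 'Key inappropriate length for algorithm',
    '04': 'Invalid key type code',
    '05': 'Invalid key length flag',
    '10': 'Source key parity error',
    '11': 'Destination key parity error or key all zeros',
    '12': 'Contents of user storage not available. Reset, power-down or overwrite',
    '13': 'Invalid LMK Identifier',
    '14': 'PIN encrypted under LMK pair 02-03 is invalid',
    '15': 'Invalid input data (invalid format, invalid characters, or not enough data provided)',
    '16': 'Console or printer not ready or not connected',
    '17': 'HSM not authorized, or operation prohibited by security settings',
    '18': 'Document format definition not loaded',
    '19': 'Specified Diebold Table is invalid',
    '20': 'PIN block does not contain valid values',
    '21': 'Invalid index value, or index/block count would cause an overflow condition',
    '22': 'Invalid account number',
    '23': 'Invalid PIN block format code. (Use includes where the security setting to implement PCI HSM '
          'limitations on PIN Block format usage is applied, and a Host command attempts to convert a PIN Block '
          'to a disallowed format.)',
    '24': 'PIN is fewer than 4 or more than 12 digits in length',
    '25': 'Decimalization Table error',
    '26': 'Invalid key scheme',
    '27': 'Incompatible key length',
    '28': 'Invalid key type',
    '29': 'Key function not permitted',
    '30': 'Invalid reference number',
    '31': 'Insufficient solicitation entries for batch',
    '32': 'AES not licensed',
    '33': 'LMK key change storage is corrupted',
    '39': 'Fraud detection',
    '40': 'Invalid checksum',
    '41': 'Internal hardware/software error: bad RAM, invalid error codes, etc.',
    '42': 'DES failure',
    '43': 'RSA Key Generation Failure',
    '46': 'Invalid tag for encrypted PIN',
    '47': 'Algorithm not licensed',
    '49': 'Private key error, report to supervisor',
    '51': 'Invalid message header',
    '65': 'Transaction Key Scheme set to None',
    '67': 'Command not licensed',
    '68': 'Command has been disabled',
    '69': 'PIN block format has been disabled',
    '74': 'Invalid digest info syntax (no hash mode only)',
    '75': 'Single length key masquerading as double or triple length key',
    '76': 'RSA public key length error or RSA encrypted data length error',
    '77': 'Clear data block error',
    '78': 'Private key length error',
    '79': 'Hash algorithm object identifier error',
    '80': 'Data length error. The amount of MAC data (or other data) is greater than or less than the expected '
          'amount.',
    '81': 'Invalid certificate header',
    '82': 'Invalid check value length',
    '83': 'Key block format error',
    '84': 'Key block check value error',
    '85': 'Invalid OAEP Mask Generation Function',
    '86': 'Invalid OAEP MGF Hash Function',
    '87': 'OAEP Parameter Error',
    '90': 'Data parity error in the request message received by the HSM',
    'A1': 'Incompatible LMK schemes',
    'A2': 'Incompatible LMK identifiers',
    'A3': 'Incompatible key block LMK identifiers',
    'A4': 'Key block authentication failure',
    'A5': 'Incompatible key length',
    'A6': 'Invalid key usage',
    'A7': 'Invalid algorithm',
    'A8': 'Invalid mode of use',
    'A9': 'Invalid key version number',
    'AA': 'Invalid export field',
    'AB': 'Invalid number of optional blocks',
    'AC': 'Optional header block error',
    'AD': 'Key status optional block error',
    'AE': 'Invalid start date/time',
    'AF': 'Invalid end date/time',
    'B0': 'Invalid encryption mode',
    'B1': 'Invalid authentication mode',
    'B2': 'Miscellaneous key block error',
    'B3': 'Invalid number of optional blocks',
    'B4': 'Optional block data error',
    'B5': 'Incompatible components',
    'B6': 'Incompatible key status optional blocks',
    'B7': 'Invalid change field',
    'B8': 'Invalid old value',
    'B9': 'Invalid new value',
    'BA': 'No key status block in the key block',
    'BB': 'Invalid wrapping key',
    'BC': 'Repeated optional block',
    'BD': 'Incompatible key types',
    'BE': 'Invalid key block header ID',
    'D2': 'Invalid curve reference',
    'D3': 'Invalid Key Encoding',
    'E0': 'Invalid command version number'
}


def payshield_error_codes(error_code: str) -> str:
    """This function maps the result code with the error message.
    I derived the list of errors and messages from the following manual:
    payShield 10K Core Host Commands v1
    Revision: A
    Date: 04 August 2020
    Doc.Number: PUGD0537 - 004
    Parameters
    ----------
    error_code: str
        The status code returned from the payShield 10k (two characters, case sensitive)
    Returns
    ----------
    a string containing the message of the error code, or "Unknown error" when the code
    is not listed in the manual
    """
    return _PAYSHIELD_ERROR_CODES.get(error_code, "Unknown error")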
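def check_returned_command_verb(result_returned: bytes, head_len: int, command_sent: str) -> Tuple[int, str, str]:
    """
    Checks if the command verb returned by the payShield is congruent to the command sent.

    A command whose verb is "XY" is answered with the verb "X(Y+1)" (e.g. "NC" is answered
    by "ND"): this function rebuilds that expected verb and compares it with the one found
    in the reply. The reply comes from the network, so it is never trusted to be well
    formed: a truncated or non-ASCII verb is reported as not congruent instead of raising.
    Parameters
    ----------
    result_returned: bytes
        The output returned from the payShield: 2 length bytes, the header, the verb, ...
    head_len: int
        The length of the header
    command_sent: str
        The command sent to the payShield, header included
    Returns
    ----------
    a Tuple[int, str, str]
        where the first value is 0 if the command is congruent or -1 if it is not
        the second value is the verb sent (empty if command_sent is shorter than header + 2)
        the third value is the verb returned by the payShield, decoded as ASCII with any
        undecodable byte replaced by U+FFFD
    """
    # the reply is laid out as: 2 length bytes + header + 2 character verb
    verb_returned = result_returned[2 + head_len:][:2]
    verb_sent = command_sent[head_len:][:2]
    # decode with replacement so a garbled reply is reported to the caller, not raised
    verb_returned_str = verb_returned.decode("ascii", "replace")
    # a truncated request or reply can never be congruent; ord('') would raise TypeError
    if len(verb_sent) != 2 or len(verb_returned) != 2:
        return -1, verb_sent, verb_returned_str
    verb_expected = verb_sent[0] + chr(ord(verb_sent[1]) + 1)
    if verb_returned_str != verb_expected:
        return -1, verb_sent, verb_returned_str
    return 0, verb_sent, verb_returned_str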
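def check_return_message(result_returned: bytes, head_len: int) -> Tuple[str, str]:
    """
    Validates the framing of a payShield reply and extracts its two character result code.

    The reply layout is: 2 bytes of big-endian length (counting everything that follows),
    the header, the 2 character command verb and the 2 character result code. The reply
    comes from the network, so each field is bounds-checked before it is sliced and the
    length field is cross-checked against the bytes actually received.
    Parameters
    ----------
    result_returned: bytes
        The raw reply as received from the payShield
    head_len: int
        The length of the header
    Returns
    ----------
    a Tuple[str, str]
        the result code and its description. The pseudo code "ZZ", with an explanatory
        text instead of a description, is returned when the reply is truncated, when its
        length field does not match the received size or when the result code cannot
        be decoded.
    """
    ret_code_position = 2 + head_len + 2  # 2 bytes for len + header + 2 for the command verb
    # the result code is 2 more bytes: anything shorter than that is a truncated reply
    if len(result_returned) < ret_code_position + 2:
        return "ZZ", "Incomplete message"
    # decode the first two bytes returned and transform them in integer
    try:
        expected_msg_len = int.from_bytes(result_returned[:2], byteorder='big', signed=False)
    except ValueError:
        return "ZZ", "Malformed message"
    except Exception:
        return "ZZ", "Unknown message length parsing error"
    # compares the effective message length with the one stated in the first two bytes of the message
    if len(result_returned) - 2 != expected_msg_len:
        return "ZZ", "Length mismatch"
    # better be safe than sorry: decode strictly so a garbled code is reported, not propagated
    try:
        ret_code = result_returned[ret_code_position:ret_code_position + 2].decode()
    except ValueError:  # UnicodeDecodeError is a ValueError
        return "ZZ", "message result code parsing error"
    except Exception:
        return "ZZ", "Unknown message result code parsing error"
    # try to describe the error
    return ret_code, payshield_error_codes(ret_code)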
def test_printable(input_str):
return all(c in string.printable for c in input_str)
def hex2ip(hex_ip):
    """
    Converts an IPv4 address written as hexadecimal text (e.g. "C0A80001") into its
    dotted decimal form ("192.168.0.1").

    The text is parsed as a big-endian 32 bit value and rendered by socket.inet_ntoa.
    """
    address_value = int(hex_ip, 16)
    packed_address = pack(">L", address_value)
    return socket.inet_ntoa(packed_address)
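def _print_exchange(message: bytes, data: bytes) -> None:
    """
    Echoes the framed request and the raw reply.

    The ASCII rendering is printed only when every decoded character is printable; the
    hex dump is always printed, so binary or garbled traffic is still visible.
    """
    # don't print ascii if msg or resp contains non printable chars
    sent_ascii = message[2:].decode("ascii", "ignore")
    if test_printable(sent_ascii):
        print("sent data (ASCII) :", sent_ascii)
    print("sent data (HEX) :", bytes.hex(message))
    received_ascii = data[2:].decode("ascii", "ignore")
    if test_printable(received_ascii):
        print("received data (ASCII):", received_ascii)
    print("received data (HEX) :", bytes.hex(data))


def run_test(payConnectorInstance: "PayConnector", host_command: str, header_len: int = 4,
             decoder_funct: FunctionType = None) -> str:
    """
    Sends the command through the given connector (tcp, udp or tls), prints the exchange
    and returns the result code.
    Parameters
    ___________
    payConnectorInstance: PayConnector
        The instance of the PayConnector class; its send_command method performs the exchange
    host_command: str
        The command to send to the payShield complete of the header part
    header_len: int
        The length of the header. If not specified the value is 4 because it is the default factory value
        in payShield 10k
    decoder_funct: FunctionType
        If provided needs to be a reference to a function that is able to parse the command and print
        the meaning of it. It is called as decoder_funct(data, header_len). If not provided the default is None
    Returns
    ___________
    The return code from the command. The pseudo code "ZZ" is returned when no reply is
    received, when the reply is malformed or when the exchange fails with an error (the
    error is printed, not raised).
    Raises
    ___________
    KeyboardInterrupt and SystemExit are not swallowed, so a stress loop can be aborted.
    """
    return_code_tuple = ('ZZ', 'Error')
    try:
        # calculate the size and format it correctly: 2 byte big-endian length prefix
        size = pack('>h', len(host_command))
        # the framed message is kept only to echo what was sent
        message = size + host_command.encode()
        # Connect to the host and gather the reply
        data = payConnectorInstance.send_command(host_command)
        # If no data is returned there is nothing to decode: report the local error code
        if data is None:
            return return_code_tuple[0]
        # try to decode the result code contained in the reply of the payShield
        check_result_tuple = (-1, "", "")
        return_code_tuple = check_return_message(data, header_len)
        if return_code_tuple[0] != "ZZ":
            print()
            # compare reply and request verbs only once the reply framing is known to be sane
            check_result_tuple = check_returned_command_verb(data, header_len, host_command)
        print("Return code: " + str(return_code_tuple[0]) + " " + return_code_tuple[1])
        if check_result_tuple[0] != 0:
            print("NOTE: The response received from the HSM seems unrelated to the request!")
            print("Command sent/received: " + check_result_tuple[1] + " ==> " + check_result_tuple[2])
        _print_exchange(message, data)
        if (decoder_funct is not None) and callable(decoder_funct):
            print("")
            print("-----DECODING RESPONSE-----")
            decoder_funct(data, header_len)
    except ConnectionError as e:
        print("Connection issue: ", e)
    except FileNotFoundError as e:
        print("The client certificate file or the client key file cannot be found or accessed.\n" +
              "Check value passed to the parameters --keyfile and --crtfile", e)
    except Exception as e:
        # best effort reporting: a stress run keeps going after a single failed exchange
        print("Unexpected issue:", e)
    # returning here rather than from a finally: block lets KeyboardInterrupt/SystemExit
    # propagate instead of being silently replaced by a return value
    return return_code_tuple[0]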
def common_parser(response_to_decode: bytes, head_len: int) -> Tuple[str, int, int]:
    """
    Helper shared by the decode_XX functions.

    Prints the message length taken from the first two bytes, then the header, the command
    verb and the error code found at their fixed positions, and hands back the ASCII text
    together with the position of the first byte that the caller still has to interpret.
    Parameters
    ___________
    response_to_decode: bytes
        The response returned by the payShield
    head_len: int
        The length of the header
    Returns
    ___________
    returns a tuple:
        message_to_decode: str
            The response converted to ASCII (undecodable bytes become U+FFFD)
        msg_len: int
            The length of the message as stated in its first two bytes
        str_pointer: int
            the position just after the 2 character error code, i.e. the first
            character not yet parsed
    """
    msg_len = int.from_bytes(response_to_decode[:2], byteorder='big', signed=False)
    print("Message length: ", msg_len)
    text = response_to_decode.decode('ascii', 'replace')
    # fixed-size fields following the 2 length bytes
    header_start = 2
    verb_start = header_start + head_len
    code_start = verb_start + 2
    print("Header: ", text[header_start:verb_start])
    print("Command returned: ", text[verb_start:code_start])
    print("Error returned: ", text[code_start:code_start + 2])
    return text, msg_len, code_start
# End
if __name__ == "__main__":
print("PayShield stress utility, version " + VERSION + ", by Marco S. Zuppone - msz@msz.eu - https://msz.eu")
print("To get more info about the usage invoke it with the -h option")
print("This software is open source and it is under the Affero AGPL 3.0 license")
print("")
# List of decoder functions used to interpreter the result.
# The reference to the function is used as parameter in the run_test function.
# If the parameter is not passed because a decoder for that command it is not defined the default value of the
# parameter assumes the value of None
DECODERS = {
'NO': decode_no,
'NC': decode_nc,
'N0': decode_n0,
'J8': decode_j8,