-
-
Notifications
You must be signed in to change notification settings - Fork 818
/
Copy pathfunctions.py
2682 lines (2157 loc) · 90 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import hashlib
import math
import operator
from vyper import ast as vy_ast
from vyper.abi_types import ABI_Tuple
from vyper.ast.validation import validate_call_args
from vyper.codegen.abi_encoder import abi_encode
from vyper.codegen.context import Context, VariableRecord
from vyper.codegen.core import (
STORE,
IRnode,
add_ofst,
bytes_data_ptr,
calculate_type_for_external_return,
check_external_call,
clamp,
clamp2,
clamp_basetype,
clamp_nonzero,
copy_bytes,
dummy_node_for_type,
ensure_eval_once,
ensure_in_memory,
eval_seq,
get_bytearray_length,
get_type_for_exact_size,
ir_tuple_from_args,
make_setter,
promote_signed_int,
sar,
shl,
shr,
unwrap_location,
)
from vyper.codegen.expr import Expr
from vyper.codegen.ir_node import Encoding, scope_multi
from vyper.codegen.keccak256_helper import keccak256_helper
from vyper.evm.address_space import MEMORY, STORAGE
from vyper.exceptions import (
ArgumentException,
CompilerPanic,
InvalidLiteral,
InvalidType,
StateAccessViolation,
StructureException,
TypeMismatch,
UnfoldableNode,
ZeroDivisionException,
)
from vyper.semantics.analysis.base import Modifiability, VarInfo
from vyper.semantics.analysis.utils import (
get_common_types,
get_exact_type_from_node,
get_possible_types_from_node,
validate_expected_type,
)
from vyper.semantics.types import (
TYPE_T,
AddressT,
BoolT,
BytesM_T,
BytesT,
DArrayT,
DecimalT,
HashMapT,
IntegerT,
KwargSettings,
SArrayT,
StringT,
TupleT,
)
from vyper.semantics.types.bytestrings import _BytestringT
from vyper.semantics.types.shortcuts import (
BYTES4_T,
BYTES32_T,
INT128_T,
INT256_T,
UINT8_T,
UINT256_T,
)
from vyper.semantics.types.utils import type_from_annotation
from vyper.utils import (
DECIMAL_DIVISOR,
EIP_170_LIMIT,
SHA3_PER_WORD,
MemoryPositions,
bytes_to_int,
ceil32,
fourbytes_to_int,
keccak256,
method_id,
method_id_int,
vyper_warn,
)
from ._convert import convert
from ._signatures import BuiltinFunctionT, process_inputs
# Address used as the `staticcall` target by `_make_sha256_call`
# (the EVM SHA-256 precompile).
SHA256_ADDRESS = 2
# Components of the gas estimate attached to sha256 IR nodes:
# a fixed base cost plus a per-word cost.
SHA256_BASE_GAS = 60
SHA256_PER_WORD_GAS = 12
class FoldedFunctionT(BuiltinFunctionT):
    # Base class for nodes which should always be folded
    # Subclasses inherit a CONSTANT modifiability.
    _modifiability = Modifiability.CONSTANT
class TypenameFoldedFunctionT(FoldedFunctionT):
    """
    Base class for always-folded builtins whose single argument is a typename.

    The return type of the call is the type named by that argument.
    """

    _inputs = [("typename", TYPE_T.any())]

    def fetch_call_return(self, node):
        """Return the type denoted by the typename argument of `node`."""
        arg_types = self.infer_arg_types(node)
        return arg_types[0].typedef

    def infer_arg_types(self, node, expected_return_typ=None):
        """Validate that exactly one argument was given and wrap its type in TYPE_T."""
        validate_call_args(node, 1)
        named_type = type_from_annotation(node.args[0])
        return [TYPE_T(named_type)]
class Floor(BuiltinFunctionT):
    """`floor(value)` builtin: round a decimal down to an int256."""

    _id = "floor"
    _inputs = [("value", DecimalT())]
    # TODO: maybe use int136?
    _return_type = INT256_T

    def _try_fold(self, node):
        """Fold `floor(<decimal literal>)` into an Int node at compile time."""
        validate_call_args(node, 1)
        folded = node.args[0].get_folded_value()
        if isinstance(folded, vy_ast.Decimal):
            return vy_ast.Int.from_node(node, value=math.floor(folded.value))
        raise UnfoldableNode

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        """Emit IR dividing the decimal by DECIMAL_DIVISOR, rounding down."""
        with args[0].cache_when_complex("arg") as (builder, value):
            is_negative = ["slt", value, 0]
            # negative inputs are biased by DECIMAL_DIVISOR - 1 before the
            # signed division so that the quotient rounds down
            biased_quotient = ["sdiv", ["sub", value, DECIMAL_DIVISOR - 1], DECIMAL_DIVISOR]
            plain_quotient = ["sdiv", value, DECIMAL_DIVISOR]
            floored = IRnode.from_list(
                ["if", is_negative, biased_quotient, plain_quotient], typ=INT256_T
            )
            return builder.resolve(floored)
class Ceil(BuiltinFunctionT):
    """`ceil(value)` builtin: round a decimal up to an int256."""

    _id = "ceil"
    _inputs = [("value", DecimalT())]
    # TODO: maybe use int136?
    _return_type = INT256_T

    def _try_fold(self, node):
        """Fold `ceil(<decimal literal>)` into an Int node at compile time."""
        validate_call_args(node, 1)
        folded = node.args[0].get_folded_value()
        if isinstance(folded, vy_ast.Decimal):
            return vy_ast.Int.from_node(node, value=math.ceil(folded.value))
        raise UnfoldableNode

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        """Emit IR dividing the decimal by DECIMAL_DIVISOR, rounding up."""
        with args[0].cache_when_complex("arg") as (builder, value):
            is_negative = ["slt", value, 0]
            plain_quotient = ["sdiv", value, DECIMAL_DIVISOR]
            # non-negative inputs are biased by DECIMAL_DIVISOR - 1 before the
            # signed division so that the quotient rounds up
            biased_quotient = ["sdiv", ["add", value, DECIMAL_DIVISOR - 1], DECIMAL_DIVISOR]
            ceiled = IRnode.from_list(
                ["if", is_negative, plain_quotient, biased_quotient], typ=INT256_T
            )
            return builder.resolve(ceiled)
class Convert(BuiltinFunctionT):
    """`convert(value, type_)` builtin; IR generation is delegated to `convert()`."""

    _id = "convert"

    def fetch_call_return(self, node):
        """Return the target type named by the second argument."""
        # note: more type conversion validation happens in convert.py
        _value_t, target = self.infer_arg_types(node)
        return target.typedef

    # TODO: push this down into convert.py for more consistency
    def infer_arg_types(self, node, expected_return_typ=None):
        """
        Pick a concrete type for the value argument and wrap the target type.

        Raises InvalidType when the value already has the target type.
        """
        validate_call_args(node, 2)

        target_type = type_from_annotation(node.args[1])
        candidates = get_possible_types_from_node(node.args[0])

        # For `convert` of integer literals, we need to match type inference rules in
        # convert.py codegen routines.
        # TODO: This can probably be removed once constant folding for `convert` is implemented
        is_ambiguous_int = len(candidates) > 1 and all(
            isinstance(t, IntegerT) for t in candidates
        )
        if is_ambiguous_int:
            if isinstance(target_type, IntegerT):
                # integer target: drop the target type itself from the candidates
                candidates = [t for t in candidates if not target_type.compare_type(t)]
            else:
                # non-integer target: order candidates so that `pop()` below
                # yields the smallest (and unsigned if available) type
                # (note this is different from the ordering returned by
                # `get_possible_types_from_node`)
                candidates = sorted(
                    candidates, key=lambda t: (t.is_signed, t.bits), reverse=True
                )

        value_type = candidates.pop()

        # block conversions between same type
        if target_type.compare_type(value_type):
            raise InvalidType(f"Value and target type are both '{target_type}'", node)

        return [value_type, TYPE_T(target_type)]

    def build_IR(self, expr, context):
        """Generate IR for the call via `convert()`."""
        return convert(expr, context)
# IR node values for the `slice()` sources which `Slice.build_IR` hands to
# `_build_adhoc_slice_node`: `msg.data`, `self.code` and `<address>.code`.
ADHOC_SLICE_NODE_MACROS = ["~calldata", "~selfcode", "~extcode"]
# Guard against reading past the end of the source buffer, including the
# case where `start + length` wraps around. The emitted check is:
#   `assert !(start+length > src_len || start+length < start)`
def _make_slice_bounds_check(start, length, src_len):
    with start.cache_when_complex("start") as (start_builder, start):
        slice_end = add_ofst(start, length)
        with slice_end.cache_when_complex("end") as (end_builder, end):
            wrapped_around = ["lt", end, start]
            past_source_end = ["gt", end, src_len]
            in_bounds = ["iszero", ["or", wrapped_around, past_source_end]]
            check = ["assert", in_bounds]
            return start_builder.resolve(end_builder.resolve(check))
def _build_adhoc_slice_node(sub: IRnode, start: IRnode, length: IRnode, context: Context) -> IRnode:
    """
    Build the IR for `slice()` over `msg.data`, `self.code` or `<address>.code`.

    Arguments:
        sub: source node; its value is one of ADHOC_SLICE_NODE_MACROS. For
            `~extcode`, `sub.args[0]` is the address expression.
        start: IR for the byte offset at which the slice begins.
        length: IR for the slice length; must be a literal integer.
        context: codegen context used to allocate the output buffer.

    Returns:
        An IRnode of type `Bytes[length]` located in memory.
    """
    assert length.is_literal, "typechecker failed"
    assert isinstance(length.value, int)  # mypy hint

    dst_typ = BytesT(length.value)
    # allocate a buffer for the return value
    np = context.new_internal_variable(dst_typ)

    # `start` is used by both the bounds check and the copy instruction.
    # Cache it once around both uses so that a complex `start` expression
    # is evaluated exactly once (as `Slice.build_IR` does for its operands).
    with start.cache_when_complex("start") as (b1, start):
        # `msg.data` by `calldatacopy`
        if sub.value == "~calldata":
            node = b1.resolve(
                [
                    "seq",
                    _make_slice_bounds_check(start, length, "calldatasize"),
                    ["mstore", np, length],
                    ["calldatacopy", np + 32, start, length],
                    np,
                ]
            )

        # `self.code` by `codecopy`
        elif sub.value == "~selfcode":
            node = b1.resolve(
                [
                    "seq",
                    _make_slice_bounds_check(start, length, "codesize"),
                    ["mstore", np, length],
                    ["codecopy", np + 32, start, length],
                    np,
                ]
            )

        # `<address>.code` by `extcodecopy`
        else:
            assert sub.value == "~extcode" and len(sub.args) == 1
            # the address stays the outermost binding, so it is still
            # evaluated before `start`, as in the original operand order
            node = [
                "with",
                "_extcode_address",
                sub.args[0],
                b1.resolve(
                    [
                        "seq",
                        _make_slice_bounds_check(
                            start, length, ["extcodesize", "_extcode_address"]
                        ),
                        ["mstore", np, length],
                        ["extcodecopy", "_extcode_address", np + 32, start, length],
                        np,
                    ]
                ),
            ]

    return IRnode.from_list(node, typ=dst_typ, location=MEMORY)
# note: this and a lot of other builtins could be refactored to accept any uint type
class Slice(BuiltinFunctionT):
    # `slice(b, start, length)`: copy `length` bytes of `b`, starting at byte
    # offset `start`, into a freshly allocated bytestring in memory.
    _id = "slice"
    _inputs = [
        ("b", (BYTES32_T, BytesT.any(), StringT.any())),
        ("start", UINT256_T),
        ("length", UINT256_T),
    ]

    def fetch_call_return(self, node):
        # Compute the return type (String for String input, Bytes otherwise)
        # and reject literal `start` / `length` values which are statically
        # out of bounds. Raises ArgumentException on a failed check.
        arg_type, _, _ = self.infer_arg_types(node)

        if isinstance(arg_type, StringT):
            return_type = StringT()
        else:
            return_type = BytesT()

        # validate start and length are in bounds

        arg = node.args[0]
        start_expr = node.args[1]
        length_expr = node.args[2]

        # CMC 2022-03-22 NOTE slight code duplication with semantics/analysis/local
        # "adhoc" sources are `<x>.code` and `msg.data`; the static bounds
        # checks below are skipped for them.
        is_adhoc_slice = arg.get("attr") == "code" or (
            arg.get("value.id") == "msg" and arg.get("attr") == "data"
        )

        # only integer literal nodes are checked statically
        start_literal = start_expr.value if isinstance(start_expr, vy_ast.Int) else None
        length_literal = length_expr.value if isinstance(length_expr, vy_ast.Int) else None

        if not is_adhoc_slice:
            if length_literal is not None:
                if length_literal < 1:
                    raise ArgumentException("Length cannot be less than 1", length_expr)

                if length_literal > arg_type.length:
                    raise ArgumentException(f"slice out of bounds for {arg_type}", length_expr)

            if start_literal is not None:
                if start_literal > arg_type.length:
                    raise ArgumentException(f"slice out of bounds for {arg_type}", start_expr)
                if length_literal is not None and start_literal + length_literal > arg_type.length:
                    raise ArgumentException(f"slice out of bounds for {arg_type}", node)

        # we know the length statically
        if length_literal is not None:
            return_type.set_length(length_literal)
        else:
            return_type.set_min_length(arg_type.length)

        return return_type

    def infer_arg_types(self, node, expected_return_typ=None):
        # Validate the arguments against `_inputs` and return concrete types.
        self._validate_arg_types(node)
        # return a concrete type for `b`
        b_type = get_possible_types_from_node(node.args[0]).pop()
        return [b_type, self._inputs[1][1], self._inputs[2][1]]

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        # Emit IR which bounds-checks the slice, copies the bytes into a new
        # memory buffer, writes the length word and returns the buffer pointer.
        src, start, length = args

        # Handle `msg.data`, `self.code`, and `<address>.code`
        if src.value in ADHOC_SLICE_NODE_MACROS:
            return _build_adhoc_slice_node(src, start, length, context)

        is_bytes32 = src.typ == BYTES32_T
        if src.location is None:
            # it's not a pointer; force it to be one since
            # copy_bytes works on pointers.
            assert is_bytes32, src
            src = ensure_in_memory(src, context)

        # cache all three operands; each is referenced more than once below
        with src.cache_when_complex("src") as (b1, src), start.cache_when_complex("start") as (
            b2,
            start,
        ), length.cache_when_complex("length") as (b3, length):
            if is_bytes32:
                src_maxlen = 32
            else:
                src_maxlen = src.typ.maxlen

            # a literal length gives a tight bound on the output size;
            # otherwise fall back to the maximum length of the source
            dst_maxlen = length.value if length.is_literal else src_maxlen

            buflen = dst_maxlen

            # add 32 bytes to the buffer size bc word access might
            # be unaligned (see below)
            if src.location == STORAGE:
                buflen += 32

            # Get returntype string or bytes
            assert isinstance(src.typ, _BytestringT) or is_bytes32
            # TODO: try to get dst_typ from semantic analysis
            if isinstance(src.typ, StringT):
                dst_typ = StringT(dst_maxlen)
            else:
                dst_typ = BytesT(dst_maxlen)

            # allocate a buffer for the return value
            buf = context.new_internal_variable(BytesT(buflen))
            # assign it the correct return type.
            # (note mismatch between dst_maxlen and buflen)
            dst = IRnode.from_list(buf, typ=dst_typ, location=MEMORY)

            dst_data = bytes_data_ptr(dst)

            if is_bytes32:
                # a bytes32 has no length word: the pointer is the data
                src_len = 32
                src_data = src
            else:
                src_len = get_bytearray_length(src)
                src_data = bytes_data_ptr(src)

            # general case. byte-for-byte copy
            if src.location == STORAGE:
                # because slice uses byte-addressing but storage
                # is word-aligned, this algorithm starts at some number
                # of bytes before the data section starts, and might copy
                # an extra word. the pseudocode is:
                #   dst_data = dst + 32
                #   copy_dst = dst_data - start % 32
                #   src_data = src + 32
                #   copy_src = src_data + (start - start % 32) / 32
                #            = src_data + (start // 32)
                #   copy_bytes(copy_dst, copy_src, length)
                #   //set length AFTER copy because the length word has been clobbered!
                #   mstore(dst, length)

                # start at the first word-aligned address before `start`
                # e.g. start == byte 7 -> we start copying from byte 0
                #      start == byte 32 -> we start copying from byte 32
                copy_src = IRnode.from_list(
                    ["add", src_data, ["div", start, 32]], location=src.location
                )

                # e.g. start == byte 0 -> we copy to dst_data + 0
                #      start == byte 7 -> we copy to dst_data - 7
                #      start == byte 33 -> we copy to dst_data - 1
                copy_dst = IRnode.from_list(
                    ["sub", dst_data, ["mod", start, 32]], location=dst.location
                )

                # len + (32 if start % 32 > 0 else 0)
                copy_len = ["add", length, ["mul", 32, ["iszero", ["iszero", ["mod", start, 32]]]]]
                copy_maxlen = buflen

            else:
                # all other address spaces (mem, calldata, code) we have
                # byte-aligned access so we can just do the easy thing,
                # memcopy(dst_data, src_data + start, length)

                copy_src = add_ofst(src_data, start)
                copy_dst = dst_data
                copy_len = length
                copy_maxlen = buflen

            do_copy = copy_bytes(copy_dst, copy_src, copy_len, copy_maxlen)

            # the bounds check is placed before the copy in the sequence
            ret = [
                "seq",
                _make_slice_bounds_check(start, length, src_len),
                do_copy,
                ["mstore", dst, length],  # set length
                dst,  # return pointer to dst
            ]

            ret = IRnode.from_list(ret, typ=dst_typ, location=MEMORY)
            return b1.resolve(b2.resolve(b3.resolve(ret)))
class Len(BuiltinFunctionT):
    """`len(b)` builtin: length of a String, Bytes or dynamic array as uint256."""

    _id = "len"
    _inputs = [("b", (StringT.any(), BytesT.any(), DArrayT.any()))]
    _return_type = UINT256_T

    def _try_fold(self, node):
        """Fold `len(<literal>)` for string, bytes and hex literals."""
        validate_call_args(node, 1)
        folded = node.args[0].get_folded_value()
        if isinstance(folded, (vy_ast.Str, vy_ast.Bytes)):
            payload = folded.value
        elif isinstance(folded, vy_ast.Hex):
            payload = folded.bytes_value
        else:
            raise UnfoldableNode

        return vy_ast.Int.from_node(node, value=len(payload))

    def infer_arg_types(self, node, expected_return_typ=None):
        """Validate the argument and return its concrete type."""
        self._validate_arg_types(node)
        # return a concrete type
        concrete = get_possible_types_from_node(node.args[0]).pop()
        return [concrete]

    def build_IR(self, node, context):
        """Emit `calldatasize` for `msg.data`, otherwise load the length word."""
        target = Expr(node.args[0], context).ir_node
        if target.value != "~calldata":
            return get_bytearray_length(target)
        return IRnode.from_list(["calldatasize"], typ=UINT256_T)
class Concat(BuiltinFunctionT):
    """`concat(a, b, ...)` builtin: join two or more bytes-like or string values."""

    _id = "concat"

    def fetch_call_return(self, node):
        """Return a String or Bytes type whose length is the sum of the argument lengths."""
        arg_types = self.infer_arg_types(node)

        total_length = sum(arg_t.length for arg_t in arg_types)

        # the first argument decides between String and Bytes
        return_type = StringT() if isinstance(arg_types[0], StringT) else BytesT()
        return_type.set_length(total_length)
        return return_type

    def infer_arg_types(self, node, expected_return_typ=None):
        """
        Validate the call and return one concrete type per argument.

        Raises ArgumentException for fewer than two arguments or any keyword
        argument, and TypeMismatch when string and bytes arguments are mixed.
        """
        if len(node.args) < 2:
            raise ArgumentException("Invalid argument count: expected at least 2", node)

        if node.keywords:
            raise ArgumentException("Keyword arguments are not accepted here", node.keywords[0])

        inferred = []
        # None until the first argument has been classified
        expect_string = None
        for arg in node.args:
            validate_expected_type(arg, (BytesT.any(), StringT.any(), BytesM_T.any()))
            arg_t = get_possible_types_from_node(arg).pop()
            is_string = isinstance(arg_t, StringT)
            if expect_string is not None and is_string != expect_string:
                raise TypeMismatch(
                    (
                        "Concat expects consistent use of string or bytes types, "
                        "use either string or bytes."
                    ),
                    arg,
                )
            expect_string = is_string
            inferred.append(arg_t)

        return inferred

    def build_IR(self, expr, context):
        """Emit IR that copies every argument into one memory buffer, in order."""
        args = [Expr(arg, context).ir_node for arg in expr.args]
        if len(args) < 2:
            raise StructureException("Concat expects at least two arguments", expr)

        # Maximum length of the output: bytestrings contribute their maxlen,
        # fixed-size bytesM values contribute their width `m`
        dst_maxlen = sum(
            arg.typ.maxlen if isinstance(arg.typ, _BytestringT) else arg.typ.m for arg in args
        )

        # TODO: try to grab these from semantic analysis
        ret_typ = StringT(dst_maxlen) if isinstance(args[0].typ, StringT) else BytesT(dst_maxlen)

        # respect API of copy_bytes
        bufsize = dst_maxlen + 32
        buf = context.new_internal_variable(BytesT(bufsize))

        # Node representing the position of the output in memory
        dst = IRnode.from_list(buf, typ=ret_typ, location=MEMORY, annotation="concat destination")

        # stack item representing our current offset in the dst buffer
        ofst = "concat_ofst"

        body = ["seq"]
        # TODO: optimize for the case where all lengths are statically known.
        for arg in args:
            write_ptr = add_ofst(bytes_data_ptr(dst), ofst)

            if not isinstance(arg.typ, _BytestringT):
                # fixed-size value: store the word, then advance by its width
                body.append(STORE(write_ptr, unwrap_location(arg)))
                body.append(["set", ofst, ["add", ofst, arg.typ.m]])
                continue

            # Ignore empty strings
            if arg.typ.maxlen == 0:
                continue

            with arg.cache_when_complex("arg") as (arg_builder, cached_arg):
                src_ptr = bytes_data_ptr(cached_arg)
                runtime_len = get_bytearray_length(cached_arg)
                with runtime_len.cache_when_complex("len") as (len_builder, arglen):
                    copy_and_advance = [
                        "seq",
                        copy_bytes(write_ptr, src_ptr, arglen, cached_arg.typ.maxlen),
                        ["set", ofst, ["add", ofst, arglen]],
                    ]
                    body.append(arg_builder.resolve(len_builder.resolve(copy_and_advance)))

        # the final offset is the total length; write it to the length word
        body.append(STORE(dst, ofst))

        # Memory location of the output
        body.append(dst)

        return IRnode.from_list(
            ["with", ofst, 0, body], typ=ret_typ, location=MEMORY, annotation="concat"
        )
class Keccak256(BuiltinFunctionT):
    """`keccak256(value)` builtin: Keccak-256 digest of a bytes32, Bytes or String value."""

    _id = "keccak256"
    # TODO allow any BytesM_T
    _inputs = [("value", (BytesT.any(), BYTES32_T, StringT.any()))]
    _return_type = BYTES32_T

    def _try_fold(self, node):
        """Fold a call on a literal argument into its digest as a hex literal."""
        validate_call_args(node, 1)
        literal = node.args[0].get_folded_value()
        if isinstance(literal, vy_ast.Bytes):
            preimage = literal.value
        elif isinstance(literal, vy_ast.Str):
            preimage = literal.value.encode()
        elif isinstance(literal, vy_ast.Hex):
            preimage = literal.bytes_value
        else:
            raise UnfoldableNode

        digest_hex = "0x" + keccak256(preimage).hex()
        return vy_ast.Hex.from_node(node, value=digest_hex)

    def infer_arg_types(self, node, expected_return_typ=None):
        """Validate the argument and return its concrete type."""
        self._validate_arg_types(node)
        # return a concrete type for `value`
        concrete = get_possible_types_from_node(node.args[0]).pop()
        return [concrete]

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        """Generate the hashing IR via `keccak256_helper`."""
        assert len(args) == 1
        preimage = args[0]
        return keccak256_helper(preimage, context)
def _make_sha256_call(inp_start, inp_len, out_start, out_len):
    """
    Return IR asserting that a `staticcall` to SHA256_ADDRESS succeeds.

    The call forwards all remaining gas, reads `inp_len` bytes at `inp_start`
    and writes `out_len` bytes at `out_start`.
    """
    call = ["staticcall"]
    call.append(["gas"])  # gas
    call.append(SHA256_ADDRESS)  # address
    call.extend([inp_start, inp_len, out_start, out_len])
    return ["assert", call]
class Sha256(BuiltinFunctionT):
    """`sha256(value)` builtin: SHA-256 digest of a bytes32, Bytes or String value."""

    _id = "sha256"
    _inputs = [("value", (BYTES32_T, BytesT.any(), StringT.any()))]
    _return_type = BYTES32_T

    def _try_fold(self, node):
        """Fold a call on a literal argument into its digest as a hex literal."""
        validate_call_args(node, 1)
        literal = node.args[0].get_folded_value()
        if isinstance(literal, vy_ast.Bytes):
            preimage = literal.value
        elif isinstance(literal, vy_ast.Str):
            preimage = literal.value.encode()
        elif isinstance(literal, vy_ast.Hex):
            preimage = literal.bytes_value
        else:
            raise UnfoldableNode

        digest_hex = "0x" + hashlib.sha256(preimage).hexdigest()
        return vy_ast.Hex.from_node(node, value=digest_hex)

    def infer_arg_types(self, node, expected_return_typ=None):
        """Validate the argument and return its concrete type."""
        self._validate_arg_types(node)
        # return a concrete type for `value`
        concrete = get_possible_types_from_node(node.args[0]).pop()
        return [concrete]

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        """Emit IR that calls the sha256 address and loads the 32-byte result."""
        value = args[0]
        scratch = MemoryPositions.FREE_VAR_SPACE

        # bytes32 input: hash the word in place in the scratch slot
        if value.typ == BYTES32_T:
            hash_word = [
                "seq",
                ["mstore", scratch, value],
                _make_sha256_call(inp_start=scratch, inp_len=32, out_start=scratch, out_len=32),
                ["mload", scratch],  # push value onto stack
            ]
            return IRnode.from_list(
                hash_word,
                typ=BYTES32_T,
                add_gas_estimate=SHA256_BASE_GAS + 1 * SHA256_PER_WORD_GAS,
            )

        # bytearray-like input
        # special case if it's already in memory
        value = ensure_in_memory(value, context)

        hash_bytes = [
            "seq",
            _make_sha256_call(
                # TODO use add_ofst if sub is statically known
                inp_start=["add", "_sub", 32],
                inp_len=["mload", "_sub"],
                out_start=scratch,
                out_len=32,
            ),
            ["mload", scratch],
        ]
        return IRnode.from_list(
            ["with", "_sub", value, hash_bytes],
            typ=BYTES32_T,
            add_gas_estimate=SHA256_BASE_GAS + value.typ.maxlen * SHA256_PER_WORD_GAS,
        )
class MethodID(FoldedFunctionT):
    """
    `method_id(value, output_type=Bytes[4])` builtin.

    Always folded; the result is produced by `method_id()` from the literal
    signature string.
    """

    _id = "method_id"
    _inputs = [("value", StringT.any())]
    _kwargs = {"output_type": KwargSettings(TYPE_T.any(), BytesT(4))}

    def _try_fold(self, node):
        """
        Fold the call into a Hex (bytes4) or Bytes (Bytes[4]) literal.

        Raises InvalidType if the argument is not a literal string, and
        InvalidLiteral if the signature contains a space.
        """
        validate_call_args(node, 1, ["output_type"])

        signature = node.args[0].get_folded_value()
        if not isinstance(signature, vy_ast.Str):
            raise InvalidType("method id must be given as a literal string", node.args[0])
        if " " in signature.value:
            raise InvalidLiteral("Invalid function signature - no spaces allowed.", node.args[0])

        output_t = self.infer_kwarg_types(node)["output_type"].typedef
        selector = method_id(signature.value)

        if not output_t.compare_type(BYTES4_T):
            return vy_ast.Bytes.from_node(node, value=selector)
        return vy_ast.Hex.from_node(node, value="0x" + selector.hex())

    def fetch_call_return(self, node):
        """Return the requested output type (defaults to `Bytes[4]`)."""
        validate_call_args(node, 1, ["output_type"])

        return self.infer_kwarg_types(node)["output_type"].typedef

    def infer_arg_types(self, node, expected_return_typ=None):
        """Return the declared type of the single positional argument."""
        _name, value_t = self._inputs[0]
        return [value_t]

    def infer_kwarg_types(self, node):
        """
        Resolve `output_type`, which must be `Bytes[4]` or `bytes4`.

        Raises ArgumentException for any other type.
        """
        if not node.keywords:
            # default to `Bytes[4]`
            return {"output_type": TYPE_T(BytesT(4))}

        output_type = type_from_annotation(node.keywords[0].value)
        if output_type not in (BytesT(4), BYTES4_T):
            raise ArgumentException("output_type must be Bytes[4] or bytes4", node.keywords[0])

        return {"output_type": TYPE_T(output_type)}
class ECRecover(BuiltinFunctionT):
    """`ecrecover(hash, v, r, s)` builtin: staticcall to address 1, returning an address."""

    _id = "ecrecover"
    _inputs = [
        ("hash", BYTES32_T),
        ("v", (UINT256_T, UINT8_T)),
        ("r", (UINT256_T, BYTES32_T)),
        ("s", (UINT256_T, BYTES32_T)),
    ]
    _return_type = AddressT()

    def infer_arg_types(self, node, expected_return_typ=None):
        """Validate the arguments and return concrete types for `v`, `r` and `s`."""
        self._validate_arg_types(node)
        signature_types = [get_possible_types_from_node(arg).pop() for arg in node.args[1:]]
        v_t, r_t, s_t = signature_types
        return [BYTES32_T, v_t, r_t, s_t]

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        """Emit IR that packs the four words, calls address 1 and loads the output word."""
        input_buf = context.new_internal_variable(get_type_for_exact_size(128))
        output_buf = context.new_internal_variable(get_type_for_exact_size(32))

        msg_hash, v, r, s = args

        ir = ["seq"]
        # clear output memory first, ecrecover can return 0 bytes
        ir.append(["mstore", output_buf, 0])
        # lay the inputs out as four consecutive 32-byte words
        ir.append(["mstore", input_buf, msg_hash])
        ir.append(["mstore", input_buf + 32, v])
        ir.append(["mstore", input_buf + 64, r])
        ir.append(["mstore", input_buf + 96, s])
        # the success flag of the call is not asserted here
        ir.append(["staticcall", "gas", 1, input_buf, 128, output_buf, 32])
        ir.append(["mload", output_buf])

        return IRnode.from_list(ir, typ=AddressT())
class _ECArith(BuiltinFunctionT):
    """
    Shared code generation for builtins implemented by a precompile call.

    Subclasses provide `_inputs`, `_return_type` and `_precompile`.
    """

    @process_inputs
    def build_IR(self, expr, _args, kwargs, context):
        """Emit IR that packs the arguments, calls `self._precompile` and returns the output buffer."""
        packed_args = ir_tuple_from_args(_args)
        packed_t = packed_args.typ
        result_t = self._return_type

        # copy the arguments into one contiguous memory buffer
        input_buf = IRnode.from_list(
            context.new_internal_variable(packed_t), typ=packed_t, location=MEMORY
        )
        write_inputs = make_setter(input_buf, packed_args)

        output_buf = context.new_internal_variable(result_t)

        precompile_call = [
            "staticcall",
            ["gas"],
            self._precompile,
            input_buf,
            packed_t.memory_bytes_required,
            output_buf,
            result_t.memory_bytes_required,
        ]

        # a failed call makes the assert fail
        ir = ["seq", write_inputs, ["assert", precompile_call], output_buf]
        return IRnode.from_list(ir, typ=result_t, location=MEMORY)
class ECAdd(_ECArith):
    # `ecadd(a, b)`: both inputs and the result are uint256[2] arrays.
    # Code generation is inherited from `_ECArith.build_IR`.
    _id = "ecadd"
    _inputs = [("a", SArrayT(UINT256_T, 2)), ("b", SArrayT(UINT256_T, 2))]
    _return_type = SArrayT(UINT256_T, 2)
    # address passed to `staticcall` by `_ECArith.build_IR`
    _precompile = 0x6
class ECMul(_ECArith):
    # `ecmul(point, scalar)`: a uint256[2] point and a uint256 scalar in,
    # a uint256[2] array out. Code generation is inherited from
    # `_ECArith.build_IR`.
    _id = "ecmul"
    _inputs = [("point", SArrayT(UINT256_T, 2)), ("scalar", UINT256_T)]
    _return_type = SArrayT(UINT256_T, 2)
    # address passed to `staticcall` by `_ECArith.build_IR`
    _precompile = 0x7
def _generic_element_getter(op):
    """
    Return a function mapping a word index to IR that loads that word with `op`.

    The emitted address is `_sub + 32 + 32 * index`; `_sub` is an IR variable
    which the caller is expected to bind.
    """

    def getter(index):
        byte_offset = ["add", 32, ["mul", 32, index]]
        address = ["add", "_sub", byte_offset]
        return IRnode.from_list([op, address], typ=INT128_T)

    return getter
def _storage_element_getter(index):
    """Return IR that `sload`s slot `_sub + 1 + index` (`_sub` is bound by the caller)."""
    slot = ["add", "_sub", ["add", 1, index]]
    return IRnode.from_list(["sload", slot], typ=INT128_T)
class Extract32(BuiltinFunctionT):
    # `extract32(b, start, output_type=bytes32)`: read 32 bytes of `b`
    # beginning at byte offset `start` and return them as `output_type`.
    _id = "extract32"
    _inputs = [("b", BytesT.any()), ("start", IntegerT.unsigneds())]
    _kwargs = {"output_type": KwargSettings(TYPE_T.any(), BYTES32_T)}

    def fetch_call_return(self, node):
        # The return type is whatever `output_type` resolves to.
        self._validate_arg_types(node)
        return_type = self.infer_kwarg_types(node)["output_type"].typedef
        return return_type

    def infer_arg_types(self, node, expected_return_typ=None):
        # Return a concrete type for `b`; `start` is always reported as uint256.
        self._validate_arg_types(node)
        input_type = get_possible_types_from_node(node.args[0]).pop()
        return [input_type, UINT256_T]

    def infer_kwarg_types(self, node):
        # Resolve `output_type` (default bytes32) and record it in the
        # keyword node's metadata. Raises InvalidType for unsupported types.
        if node.keywords:
            output_type = type_from_annotation(node.keywords[0].value)
            # NOTE(review): the message says "bytes32" but this check accepts
            # any BytesM_T - confirm which is intended.
            if not isinstance(output_type, (AddressT, BytesM_T, IntegerT)):
                raise InvalidType(
                    "Output type must be one of integer, bytes32 or address", node.keywords[0].value
                )
            output_typedef = TYPE_T(output_type)
            node.keywords[0].value._metadata["type"] = output_typedef
        else:
            output_typedef = TYPE_T(BYTES32_T)

        return {"output_type": output_typedef}

    @process_inputs
    def build_IR(self, expr, args, kwargs, context):
        # Emit IR that clamps the index, reads the 32 bytes (possibly
        # straddling two words) and clamps the result to `output_type`.
        sub, index = args
        ret_type = kwargs["output_type"]

        # Get length and specific element
        # (`_sub` in the getters refers to the variable bound by the `with`
        # nodes built below)
        if sub.location == STORAGE:
            lengetter = IRnode.from_list(["sload", "_sub"], typ=INT128_T)
            elementgetter = _storage_element_getter

        else:
            op = sub.location.load_op
            lengetter = IRnode.from_list([op, "_sub"], typ=INT128_T)
            elementgetter = _generic_element_getter(op)

        # TODO rewrite all this with cache_when_complex and bitshifts

        # Special case: index known to be a multiple of 32
        # (the read is word-aligned, so a single element load suffices)
        if isinstance(index.value, int) and not index.value % 32:
            o = IRnode.from_list(
                [
                    "with",
                    "_sub",
                    sub,
                    elementgetter(
                        # presumably clamp2 asserts 0 <= index <= len - 32
                        ["div", clamp2(0, index, ["sub", lengetter, 32], signed=True), 32]
                    ),
                ],
                typ=ret_type,
                annotation="extracting 32 bytes",
            )
        # General case
        # _mi32 = index % 32 (byte offset within the word)
        # _di32 = index // 32 (word index)
        # when _mi32 != 0 the result is assembled from two adjacent words:
        #   word[_di32] * 256**_mi32 + word[_di32 + 1] / 256**(32 - _mi32)
        else:
            o = IRnode.from_list(
                [
                    "with",
                    "_sub",
                    sub,
                    [
                        "with",
                        "_len",
                        lengetter,
                        [
                            "with",
                            "_index",
                            clamp2(0, index, ["sub", "_len", 32], signed=True),
                            [
                                "with",
                                "_mi32",
                                ["mod", "_index", 32],
                                [
                                    "with",
                                    "_di32",
                                    ["div", "_index", 32],
                                    [
                                        "if",
                                        "_mi32",
                                        [
                                            "add",
                                            ["mul", elementgetter("_di32"), ["exp", 256, "_mi32"]],
                                            [
                                                "div",
                                                elementgetter(["add", "_di32", 1]),
                                                ["exp", 256, ["sub", 32, "_mi32"]],
                                            ],
                                        ],
                                        elementgetter("_di32"),
                                    ],
                                ],
                            ],
                        ],
                    ],
                ],
                typ=ret_type,
                annotation="extract32",
            )
        return IRnode.from_list(clamp_basetype(o), typ=ret_type)
class AsWeiValue(BuiltinFunctionT):
_id = "as_wei_value"
_inputs = [("value", (IntegerT.any(), DecimalT())), ("unit", StringT.any())]
_return_type = UINT256_T
wei_denoms = {
("wei",): 1,
("femtoether", "kwei", "babbage"): 10**3,
("picoether", "mwei", "lovelace"): 10**6,
("nanoether", "gwei", "shannon"): 10**9,
("microether", "szabo"): 10**12,
("milliether", "finney"): 10**15,
("ether",): 10**18,
("kether", "grand"): 10**21,
}
def get_denomination(self, node):
value = node.args[1].get_folded_value()
if not isinstance(value, vy_ast.Str):
raise ArgumentException(
"Wei denomination must be given as a literal string", node.args[1]
)
try:
denom = next(v for k, v in self.wei_denoms.items() if value.value in k)
except StopIteration:
raise ArgumentException(f"Unknown denomination: {value.value}", node.args[1]) from None
return denom
def _try_fold(self, node):
validate_call_args(node, 2)
denom = self.get_denomination(node)
value = node.args[0].get_folded_value()
if not isinstance(value, (vy_ast.Decimal, vy_ast.Int)):
raise UnfoldableNode
value = value.value
if value < 0: