如果想要构造一个可接受任意数量参数的函数,可以使用 * 参数来定义该函数:
def avg(first, *others):
return (first + sum(others)) / (1 + len(others))
print(avg(1,2)) # 1.5
print(avg(1,2,3,4,5)) # 3
为了接受任意数量的关键字参数,可以使用以 ** 开头的参数定义函数:
def foo(attr1, attr2, **attrs):
print(attr1, attr2, attrs)
foo(1, 2, n=1, m=2) # 1 2 {'n':1,'m':2}
综上所述,*args 和 **kwargs 分别是第一章介绍的压缩和字典参数。要求 *args 只能放在最后一个位置参数后面,**kwargs 只能放在最后一个参数后面。
将强制关键字参数放到某个 * 参数或者单个 * 后面就能达到函数的某些参数强制使用关键字参数传递的效果。
def recv(maxsize, *, block):
recv(1024, True) # TypeError
recv(1024, block=True) # Success
def mininum(*values, clip=None):
m = min(values)
if clip is not None:
m = clip if clip > m else m
return m
mininum(1, 2, 3, -4, 5) # Return -4
mininum(1, 2, 3, -4, 5, clip=0) # Return 0
def add(x:int, y:int) -> int:
return x+y
Python 解释器并不会对它有任何的语义,但是这种写法会对程序员较为友好(相当于静态语言)。函数注解存储在函数的 __annotations__ 属性中。注解的使用方法有很多(比如本方法或者注释说明),本处的作用主要还是文档。
def foo():
return 1, 2
a, b = foo()
def foo(a, b=10):
return a+b
foo(1) # 11
foo(1,2) # 3
将简单的单行函数用 lambda 表达式代替的手法,即为匿名函数:
add = lambda x, y: x+y
add(2, 3) # 5
add('hello', 'world') # hello world
def add(x, y):
return x+y
lambda 表达式的典型使用场景是排序或者数据 reduce 等。
x = 10
a = lambda y: x+y
x = 20
b = lambda y: x+y
a(10) # 30
b(10) # 30
其中的原因是 x 在这里是一个自由变量,在运行的时候才会绑定值,因此返回的结果会随着 x 的改变而变化。如果想要匿名函数在定义的时候就捕获到值,可以参考以下写法:
x = 10
a = lambda y, x=x: x+y
x = 20
b = lambda y, x=x: x+y
a(10) # 20
b(10) # 30
functools 中的 partial() 方法能固定某些参数并返回一个 callable 对象。这个新的 callable 对象接受未赋值的参数,然后根据之前已经赋值过的参数合并起来,最后将所有的参数一起传递给原始函数。
from functools import partial
def spam(a, b, c, d):
print(a, b, c, d)
s1 = partial(spam, 1)
s1(2, 3, 4) # 1 2 3 4
s2 = partial(spam, d=42)
s2(1, 2, 3) # 1 2 3 42
s3 = partial(spam, 1, 2, d=42)
s3(15) # 1 2 15 42
- 点序列到指定点的距离排序问题
points = [
(1, 2), (3, 4), (5, 6), (7, 8)
import math
def distance(p1, p2):
x1, y1 = p1
x2, y2 = p2
return math.hypot(x2 - x1, y2 - y1)
pt = (4, 3)
points.sorted(key=partial(distance, pt))
# [(3, 4), (1, 2), (5, 6), (7, 8)] 实现了距离 (4, 3) 点最近的升序排序
- 微调其他库函数使用的回调函数的参数
# 当给 apply_async() 提供回调函数时,通过使用 partial() 传递额外的 logging 参数。而 multiprocessing 模块对此一无所知。
def output_result(result, log=None):
if log is not None:
log.debug("Got:%r", result)
def add(x, y):
return x + y
if __name__ == "__main__":
import logging
from multiprocessing import Pool
from functools import partial
log = logging.getLogger('test')
p = Pool()
p.apply_async(add, (3, 4), callback=partial(output_result, log=log))
# 单方法的类写法
from urllib.request import urlopen
class UrlTemplate:
def __init__(self, template):
self.template = template
def open(self, **kwargs):
return urlopen(self.template.formate_map(kwargs))
# 闭包写法
def urltemplate(template):
def opener(**kwargs):
return urlopen(template.format_map(kwargs))
return opener
# 调用
test = UrlTemplate('http://www.test.com/s?date={date}&file={file}')
test = urlTemplate('http://www.test.com/s?date={date}&file={file}')
for line in test(date="2018", file="csv"):
def apply_async(func, args, *, callback):
result = func(*args)
def print_result(result):
print('Got:', result)
def add(x, y):
return x + y
apply_async(add, ('Hello', 'World'), callback=print_result)
# Got: HelloWorld
- 绑定类的方法替代简单函数
class ResultHandler:
def __init__(self):
self.idx = 0
def hander(self, result):
self.idx += 1
print('[{}] Got:{}'.format(self.idx, result))
r = ResultHandler()
apply_async(add, ('Hello', 'World'), callback=r.handler)
# [1] Got: HelloWorld
apply_async(add, (1, 2), callback=r.handler)
# [2] Got: 3
- 使用闭包代替类
def make_handler():
idx = 0
def handler(result):
nonlocal idx # nonlocal 声明本变量会被回调函数变更
idx += 1
print('[{}] Got:{}'.format(idx, result))
return handler
handler = make_handler()
apply_async(add, ('Hello', 'World'), callback=handler)
# [1] Got: HelloWorld
apply_async(add, (1, 2), callback=handler)
# [2] Got: 3
- 使用协程
def make_handler():
idx = 0
while True:
result = yield
idx += 1
print('[{}] Got:{}'.format(idx, result))
handler = make_handler()
next(handler) # Advance to the yield
apply_async(add, ('Hello', 'World'), callback=handler.send)
# [1] Got: HelloWorld
apply_async(add, (1, 2), callback=handler.send)
# [2] Got: 3
from queue import Queue
from functools import wraps
def apply_async(func, args, *, callback):
result = func(*args)
class Async:
def __init__(self, func, args):
self.func = func
self.args = args
def inlined_async(func):
def wrapper(*args):
f = func(*args)
result_queue = Queue()
while True:
result = result.get()
a = f.send(result)
apply_async(a.func, a.args, callback=result_queue.put)
except StopIteration:
return wrapper
def add(x, y):
return x + y
def test():
r = yield Async(add, (1, 2))
r = yield Async(add, ('Hello', 'World'))
for n in range(3):
r = yield Async(add, (n, n))
# 3
# HelloWorld
# 2
# 4
# 6
本例中使用生成器函数的主要原因是,生成器函数中出现的暂停与需要实现的中断操作其实是类似的。具体来说就是 yield 操作会使一个生成器函数产生一个值并暂停,接下来调用的 __next__() 或 send() 方法又会让它从暂停的地方继续执行。
def sample():
n = 0
def func():
print('n=', n)
def get_n():
return n
def set_n():
nonlocal n
n = value
func.get_n = get_n
func.set_n = set_n
return func
f = sample()
f() # 0
f() # n= 10
f.get_n() # 10