-
Notifications
You must be signed in to change notification settings - Fork 1
/
myspaces.py
91 lines (69 loc) · 2.23 KB
/
myspaces.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import os
from copy import deepcopy
import numpy as np
os.chdir("/Users/zargham/Documents/GitHub/cadcad-ri")
from cadcad.spaces import space, EmptySpace #, Bit, Real
from cadcad.dynamics import block
from cadcad.points import Point
#from cadcad.systems import Experiment
@space
class CartesianPlane:
x:float
y:float
@space
class Particle:
pos:CartesianPlane
vel:CartesianPlane
#f(x,u)->x
@block
def particleIntegrator(state: Point[Particle], input: Point[CartesianPlane])-> Point[Particle]:
#input is acceleration
#output is the new particle state
#outputType = type(Point[Particle])
output = deepcopy(state)
output['pos']['x'] += state['particle']['vel']['x']
output['pos']['y'] += state['particle']['vel']['y']
output['vel']['x'] += input['x']
output['vel']['y'] += input['y']
return output
# g()-> u
@block
def randomPoint(input:Point[EmptySpace])-> Point[CartesianPlane]:
data = {}
data['x'] = .005+np.random.randn()/5.0
data['y'] = .005+np.random.randn()/5.0
return Point(CartesianPlane, data)
# y(x)->r
@block
def observePosition(input:Point[Particle])-> Point[CartesianPlane]:
pos = input.data['pos']
return Point(CartesianPlane, pos)
# h(x,r) -> u
@block
def chase(state: Point[Particle], ref: Point[CartesianPlane]) -> Point[CartesianPlane]:
acc = {}
acc['x'] = ref.data['x']-state.data['pos']['x']
acc['y'] = ref.data['y']-state.data['pos']['y']
return Point(CartesianPlane, acc)
@space
class WorldState:
cat: Particle
laser:Particle
# this block contains my wiring
# in the future wiring DSL should allow this to be done with piping logic
@block
def worldDynamics(state: Point[WorldState])->Point[WorldState]:
#split into subspaces
cat = deepcopy(state['cat'])
laser = deepcopy(state['laser'])
#move the laser pointer
emptyPoint = Point(EmptySpace, {})
laserAcc = randomPoint(emptyPoint)
laser = particleIntegrator(laser, laserAcc)
#chase the laser pointer
observedLaser = observePosition(laser)
catAcc = chase(cat, observedLaser)
cat = particleIntegrator(cat, catAcc)
data = {'cat':cat.data, 'laser':laser.data }
return Point(WorldState, data)
worldDynamics = Block.wiring( "piping expression")