-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
csr.py
316 lines (264 loc) · 12.8 KB
/
csr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
from litex.build.altera.platform import AlteraPlatform
from litex.gen.fhdl.module import LiteXModule
from litex.soc.interconnect.csr import CSR, CSRField, CSRStatus, CSRStorage
from migen.fhdl.structure import Case, If, Signal
class APFAudio(LiteXModule):
    """CSR block connecting software to the ``apf_audio`` platform resource.

    Registers created (in this order):
        out: 32-bit CSR; its written value drives ``bus_out`` and its ``re``
            strobe drives ``bus_wr``.
        playback_en: 1-bit storage driving the ``playback_en`` pin.
        buffer_flush: 1-bit CSR whose ``re`` strobe drives the ``flush`` pin.
        buffer_fill: 12-bit status mirroring the ``buffer_fill`` pin.

    Args:
        platform: Platform the ``apf_audio`` resource is requested from.
    """

    def __init__(self, platform: AlteraPlatform):
        pins = platform.request("apf_audio")

        # Sample push port.
        self.out = CSR(32)
        self.out.description = "The entrypoint to the audio buffer. Write two 16 bit signed values (for the left and right audio channels) here. This will push one value into the 4096 record FIFO that represents the audio buffer."

        # Playback gate.
        self.playback_en = CSRStorage(
            size=1,
            description="Enable audio playback (reading of the audio buffer) when set to 1. No audio playback otherwise.",
        )

        # Buffer flush strobe.
        self.buffer_flush = CSR(1)
        self.buffer_flush.description = "Writing 1 to this register will immediately clear the audio buffer."

        # Buffer fill level readback.
        self.buffer_fill = CSRStatus(
            size=12,
            description="The current fill level of the audio buffer. The buffer is full when set to `0xFFF`",
        )

        # All connections are purely combinatorial.
        self.comb += pins.bus_out.eq(self.out.r)
        self.comb += pins.bus_wr.eq(self.out.re)
        self.comb += pins.playback_en.eq(self.playback_en.storage)
        self.comb += pins.flush.eq(self.buffer_flush.re)
        self.comb += self.buffer_fill.status.eq(pins.buffer_fill)
class APFBridge(LiteXModule):
    """CSR block controlling the ``apf_bridge`` platform resource.

    The four ``request_*`` CSRs forward their ``re`` strobe to the
    corresponding request pin. ``slot_id``, ``data_offset``,
    ``transfer_length`` and ``ram_data_address`` hold the operation
    parameters. ``file_size`` reads back the ``file_size`` pin and, when
    written, forwards the new value together with a write strobe.

    ``status`` is a sticky completion flag: it is set on the rising edge of
    the ``complete_trigger`` pin and cleared when the register is read.

    Args:
        platform: Platform the ``apf_bridge`` resource is requested from.
    """

    def __init__(self, platform: AlteraPlatform):
        bridge_pins = platform.request("apf_bridge")

        # --- Request strobes -------------------------------------------
        self.request_read = CSR(1)
        self.request_read.description = (
            "Writing 1 to this register will trigger a read request."
        )
        self.request_write = CSR(1)
        self.request_write.description = (
            "Writing 1 to this register will trigger a write request."
        )
        self.request_getfile = CSR(1)
        self.request_getfile.description = "Writing 1 to this register will trigger a request for the filepath of the active slot."
        self.request_openfile = CSR(1)
        self.request_openfile.description = "Writing 1 to this register will trigger a request to change the file in the active slot to the one specified by the path in memory."

        # --- Operation parameters --------------------------------------
        self.slot_id = CSRStorage(
            16,
            description="The slot ID defined in `data.json` for the desired asset/slot.",
        )
        self.data_offset = CSRStorage(
            32,
            description="The offset from the start of the asset in the selected data slot to operate on.",
        )
        # self.bridge_local_address = CSRStorage(32)
        self.transfer_length = CSRStorage(
            32,
            description="The length of data to transfer as part of this bridge operation. A length of `0xFFFFFFFF` will request the entire file (NOTE: As of Pocket firmware 1.1, this is bugged, and you just request the file size instead).",
        )
        self.ram_data_address = CSRStorage(
            32,
            description="The address of RISC-V RAM to be manipulated in this operation. It is either the first write address for a read request, or the first read address for a write request.",
        )

        # --- File size (readable and writable) -------------------------
        self.file_size = CSR(
            32,
        )
        # The description names the `slot_id` register defined above.
        self.file_size.description = "The file size on disk of the current selected asset in slot `slot_id`. Writing to this register will update the internal size representation for this file. Note that if you do this for a readonly file, you will mess up any future reads of that slot ID."

        # --- Status / progress -----------------------------------------
        # Will go high when operation completes. Goes low after read
        self.status = CSRStatus(
            1,
            description="Indicates when a bridge operation has completed. Becomes 1 when the operation completes, and is set to 0 whenever read.",
        )
        self.current_address = CSRStatus(
            32,
            description="The current address the bridge is operating on. Can be used to show a progress bar/estimate time until completion.",
        )
        self.command_result_code = CSRStatus(
            3,
            description="Reports the results of the recent file command. See https://www.analogue.co/developer/docs/host-target-commands for details on expected codes.",
        )

        # One-cycle delayed copy of `complete_trigger`, used for rising-edge
        # detection.
        self.prev_complete_trigger = Signal()

        self.sync += [
            self.prev_complete_trigger.eq(bridge_pins.complete_trigger),
            # Read clear must apply before write set, because otherwise you can miss a signal
            If(
                self.status.we,
                # Read, clear status
                self.status.status.eq(0),
            ),
            If(
                bridge_pins.complete_trigger & ~self.prev_complete_trigger,
                # Rising edge of complete_trigger: push status high
                self.status.status.eq(1),
            ),
        ]

        self.comb += [
            # Request strobes
            bridge_pins.request_read.eq(self.request_read.re),
            bridge_pins.request_write.eq(self.request_write.re),
            bridge_pins.request_getfile.eq(self.request_getfile.re),
            bridge_pins.request_openfile.eq(self.request_openfile.re),
            # Operation parameters
            bridge_pins.slot_id.eq(self.slot_id.storage),
            bridge_pins.data_offset.eq(self.data_offset.storage),
            # bridge_pins.local_address.eq(self.bridge_local_address.storage),
            bridge_pins.length.eq(self.transfer_length.storage),
            bridge_pins.ram_data_address.eq(self.ram_data_address.storage),
            # Result / file size / progress
            self.command_result_code.status.eq(bridge_pins.command_result_code),
            self.file_size.w.eq(bridge_pins.file_size),
            bridge_pins.file_size_wr.eq(self.file_size.re),
            bridge_pins.new_file_size_data.eq(self.file_size.r),
            self.current_address.status.eq(bridge_pins.current_address),
        ]
class APFID(LiteXModule):
    """Exposes the ``chip_id`` pin of the ``apf_id`` resource as a 64-bit status CSR.

    Args:
        platform: Platform the ``apf_id`` resource is requested from.
    """

    def __init__(self, platform: AlteraPlatform):
        pins = platform.request("apf_id")

        self.id = CSRStatus(size=64, description="The Cyclone V chip ID.")

        # Straight combinatorial passthrough of the pin value.
        self.comb += self.id.status.eq(pins.chip_id)
class APFInput(LiteXModule):
    """Read-only CSRs mirroring the controller pins of the ``apf_input`` resource.

    For each of the four controllers there is a 32-bit ``contN_key``,
    ``contN_joy`` and ``contN_trig`` status register, each driven
    combinatorially from the pin of the same name. Registers are created
    group by group (all keys, then all joysticks, then all triggers).

    Args:
        platform: Platform the ``apf_input`` resource is requested from.
    """

    def __init__(self, platform: AlteraPlatform):
        pads = platform.request("apf_input")

        # --- Keys --------------------------------------------------------
        self.cont1_key = CSRStatus(32, description="Controller 1 inputs. See docs.")
        self.cont2_key = CSRStatus(32, description="Controller 2 inputs. See docs.")
        self.cont3_key = CSRStatus(32, description="Controller 3 inputs. See docs.")
        self.cont4_key = CSRStatus(32, description="Controller 4 inputs. See docs.")
        self.comb += [
            self.cont1_key.status.eq(pads.cont1_key),
            self.cont2_key.status.eq(pads.cont2_key),
            self.cont3_key.status.eq(pads.cont3_key),
            self.cont4_key.status.eq(pads.cont4_key),
        ]

        # --- Joysticks ---------------------------------------------------
        self.cont1_joy = CSRStatus(
            32, description="Controller 1 joystick values. See docs."
        )
        self.cont2_joy = CSRStatus(
            32, description="Controller 2 joystick values. See docs."
        )
        self.cont3_joy = CSRStatus(
            32, description="Controller 3 joystick values. See docs."
        )
        self.cont4_joy = CSRStatus(
            32, description="Controller 4 joystick values. See docs."
        )
        self.comb += [
            self.cont1_joy.status.eq(pads.cont1_joy),
            self.cont2_joy.status.eq(pads.cont2_joy),
            self.cont3_joy.status.eq(pads.cont3_joy),
            self.cont4_joy.status.eq(pads.cont4_joy),
        ]

        # --- Triggers ----------------------------------------------------
        self.cont1_trig = CSRStatus(
            32,
            description="Controller 1 trigger values. Values are binary on Pocket (`0 and 0xFFFF`), and analog on controllers with analog triggers. See docs.",
        )
        self.cont2_trig = CSRStatus(
            32,
            description="Controller 2 trigger values. Values are binary on Pocket (`0 and 0xFFFF`), and analog on controllers with analog triggers. See docs.",
        )
        self.cont3_trig = CSRStatus(
            32,
            description="Controller 3 trigger values. Values are binary on Pocket (`0 and 0xFFFF`), and analog on controllers with analog triggers. See docs.",
        )
        self.cont4_trig = CSRStatus(
            32,
            description="Controller 4 trigger values. Values are binary on Pocket (`0 and 0xFFFF`), and analog on controllers with analog triggers. See docs.",
        )
        self.comb += [
            self.cont1_trig.status.eq(pads.cont1_trig),
            self.cont2_trig.status.eq(pads.cont2_trig),
            self.cont3_trig.status.eq(pads.cont3_trig),
            self.cont4_trig.status.eq(pads.cont4_trig),
        ]
class APFInteract(LiteXModule):
    """Sixteen 32-bit interact entries shared between the CPU and the ``apf_interact`` bus.

    For every entry ``i`` in 0..15 this creates:
        interact{i}: CSR whose read value is the entry's storage and whose
            write (``re``) updates that storage.
        interact_changed{i}: 1-bit status set when the bus writes the entry
            and cleared when ``interact{i}`` is read (``we``).

    On the bus side, ``wr`` with ``address == i`` stores ``data`` into entry
    ``i``; ``q`` is loaded every cycle with the entry selected by ``address``.
    The bus statements are appended after the per-entry ones.

    Args:
        platform: Platform the ``apf_interact`` resource is requested from.
    """

    def __init__(self, platform: AlteraPlatform):
        interact_pins = platform.request("apf_interact")

        # Statement lists for the two address-indexed Case blocks, filled in
        # while the per-entry registers are created.
        bus_write_cases = {}
        bus_read_cases = {}

        # NOTE(review): the local names of the objects constructed below are
        # kept as-is because they may be picked up for generated signal
        # names; confirm before renaming.
        for i in range(16):
            interact_csr = CSR(32)
            interact_csr.name = f"interact{i}"
            interact_csr.description = f"Interact.json entry {i}."
            setattr(self, interact_csr.name, interact_csr)

            changed_csr = CSRStatus(1)
            changed_csr.name = f"interact_changed{i}"
            changed_csr.description = f"When 1, indicates the interact.json entry {i} has been updated."
            setattr(self, changed_csr.name, changed_csr)

            interact_storage = Signal(32)

            # CPU side: readback, write, and read-to-clear of the changed flag.
            self.comb += interact_csr.w.eq(interact_storage)
            self.sync += If(interact_csr.re, interact_storage.eq(interact_csr.r))
            self.sync += If(interact_csr.we, changed_csr.status.eq(0))

            # Bus side: what to do when this entry's address is selected.
            bus_write_cases[i] = [
                interact_storage.eq(interact_pins.data),
                changed_csr.status.eq(1),
            ]
            bus_read_cases[i] = [interact_pins.q.eq(interact_storage)]

        # Bus write is gated by `wr`; bus read is unconditional.
        self.sync += If(
            interact_pins.wr,
            Case(interact_pins.address, bus_write_cases),
        )
        self.sync += Case(interact_pins.address, bus_read_cases)
class APFRTC(LiteXModule):
    """Read-only CSRs mirroring the ``apf_rtc`` resource pins.

    Provides ``unix_seconds``, ``date_bcd`` and ``time_bcd``, each a 32-bit
    status register driven combinatorially from the pin of the same name.

    Args:
        platform: Platform the ``apf_rtc`` resource is requested from.
    """

    def __init__(self, platform: AlteraPlatform):
        pins = platform.request("apf_rtc")

        self.unix_seconds = CSRStatus(
            size=32,
            description="The current Pocket set time, from Unix epoch, in seconds.",
        )
        self.date_bcd = CSRStatus(
            size=32,
            description="The launch Pocket set date, as BCD. NOT LIVE/INCREMENTING.",
        )
        self.time_bcd = CSRStatus(
            size=32,
            description="The launch Pocket set time, as BCD. NOT LIVE/INCREMENTING.",
        )

        # Direct passthrough of each pin into its status register.
        self.comb += self.unix_seconds.status.eq(pins.unix_seconds)
        self.comb += self.date_bcd.status.eq(pins.date_bcd)
        self.comb += self.time_bcd.status.eq(pins.time_bcd)
class APFVideo(LiteXModule):
    """Video timing status register derived from the framebuffer timing generator.

    A single ``video`` status CSR carries three fields:
        vblank_status: registered copy of the internal vblank flag.
        vblank_triggered: set on the rising edge of vblank, cleared when the
            register is read.
        frame_counter: incremented on every rising edge of vblank.

    Args:
        soc: Object providing ``video_framebuffer_vtg.source.vcount``.
        v_active: Line count at or beyond which ``vcount`` is treated as
            being in vblank.
    """

    def __init__(self, soc, v_active: int):
        vblank = Signal()

        status_fields = [
            CSRField(
                "vblank_status",
                size=1,
                offset=0,
                description="1 when in vblank, 0 otherwise.",
            ),
            CSRField(
                "vblank_triggered",
                size=1,
                offset=1,
                description="Indicates when vblank occurs. Becomes 1 at vblank, and is set to 0 whenever read. If you read 1, vblank has started between your two reads.",
            ),
            CSRField(
                "frame_counter",
                size=30,
                offset=2,
                description="Counts the number of frames displayed since startup. Comparing this value to a previous value can be used to track frame changes. A frame change is considered to occur at the start of vblank.",
            ),
        ]
        self.video = CSRStatus(fields=status_fields)

        # Previous-cycle value of `vblank`, used for rising-edge detection.
        self.prev_vblank_triggered = Signal()

        fields = self.video.fields
        vcount = soc.video_framebuffer_vtg.source.vcount
        vblank_rising = vblank & ~self.prev_vblank_triggered

        # 240 is what vactive defaults to
        self.sync += vblank.eq(vcount >= v_active)
        self.sync += self.prev_vblank_triggered.eq(vblank)

        # Read clear must apply before write set, because otherwise you can
        # miss a signal.
        self.sync += If(
            self.video.we,
            fields.vblank_triggered.eq(0),
        )
        self.sync += If(
            vblank_rising,
            fields.vblank_triggered.eq(1),
            fields.frame_counter.eq(fields.frame_counter + 1),
        )

        self.sync += fields.vblank_status.eq(vblank)