-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbus_apb_monitor.py
81 lines (73 loc) · 3.04 KB
/
bus_apb_monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from uvm.macros import uvm_component_utils, uvm_fatal, uvm_info, uvm_error
from uvm.comps.uvm_monitor import UVMMonitor
from uvm.tlm1.uvm_analysis_port import UVMAnalysisPort
from uvm.base.uvm_config_db import UVMConfigDb
from cocotb.triggers import Timer, RisingEdge, FallingEdge
from EF_UVM.bus_env.bus_item import bus_item
from uvm.base.uvm_object_globals import UVM_HIGH, UVM_LOW
from EF_UVM.bus_env.bus_agent.bus_base_monitor import bus_base_monitor
import cocotb
class bus_apb_monitor(bus_base_monitor):
    """Monitor that samples APB transfers from ``self.vif`` and publishes them.

    Every observed transfer is packed into a ``bus_item`` (kind, address,
    data) and written to ``self.monitor_port``. A falling edge on ``RESETn``
    is published on the same port as a ``bus_item.RESET`` item.

    NOTE(review): ``self.vif``, ``self.monitor_port`` and ``self.tag`` are not
    assigned in this class; presumably ``bus_base_monitor`` provides them --
    confirm against the base class.
    """

    def __init__(self, name="bus_apb_monitor", parent=None):
        """Create the monitor and zero the published-transfer counter.

        Args:
            name: Component instance name, forwarded to the base class.
            parent: Parent component, forwarded to the base class.
        """
        super().__init__(name, parent)
        # Number of READ/WRITE items published so far; reset items sent by
        # watch_reset() are not counted.
        self.counter = 0

    async def run_phase(self, phase):
        """Sample APB transfers forever and publish one item per transfer.

        Starts ``watch_reset()`` as a concurrent task, then loops:

        1. Wait for a sampled clock edge with ``PSEL == 1`` and
           ``PENABLE == 0`` and capture the direction and address.
        2. Advance one sample point and report a ``uvm_error`` if ``PENABLE``
           is not ``1`` there (sampling continues regardless).
        3. Wait until ``PREADY`` is non-zero, then capture ``PWDATA`` for a
           write or ``PRDATA`` for a read.
        4. Write the item to ``self.monitor_port`` and bump ``self.counter``.

        Args:
            phase: The phase object passed in by the caller; not used here.
        """
        # Reset observation runs concurrently with transfer sampling.
        await cocotb.start(self.watch_reset())
        while True:
            # wait for a transaction: PSEL high while PENABLE is still low
            while True:
                await self.sample_delay()
                if (
                    self.vif.PSEL.value.binstr == "1"
                    and self.vif.PENABLE.value.binstr == "0"
                ):
                    break

            # Direction and address are captured at the edge where the
            # PSEL/PENABLE condition above was observed.
            tr = bus_item.type_id.create("tr", self)
            tr.kind = bus_item.WRITE if self.vif.PWRITE.value == 1 else bus_item.READ
            tr.addr = self.vif.PADDR.value.integer

            # The next sample point is expected to show PENABLE high.
            await self.sample_delay()
            if self.vif.PENABLE.value.binstr != "1":
                uvm_error(
                    self.tag,
                    f"APB protocol violation: SETUP cycle not followed by ENABLE cycle PENABLE={self.vif.PENABLE.value.binstr}",
                )

            # Data is only sampled once PREADY is observed non-zero.
            await self.wait_ready()
            if tr.kind == bus_item.WRITE:
                tr.data = self.vif.PWDATA.value.integer
            else:
                try:
                    tr.data = self.vif.PRDATA.value.integer
                except ValueError:
                    # PRDATA could not be converted to an integer: report it
                    # and keep the raw bit string in the item instead.
                    uvm_error(
                        self.tag,
                        f"PRDATA is not an integer {self.vif.PRDATA.value.binstr}",
                    )
                    tr.data = self.vif.PRDATA.value.binstr

            self.monitor_port.write(tr)
            self.counter += 1
            # update reg value #TODO: move this to the ref_model later
            # self.regs.write_reg_value(tr.addr, tr.data)
            uvm_info(
                self.tag, "sampled APB transaction: " + tr.convert2string(), UVM_HIGH
            )

    async def watch_reset(self):
        """Publish a ``bus_item.RESET`` item on every falling edge of ``RESETn``.

        Runs forever. Each reset item has ``addr`` set to 0 and is written to
        ``self.monitor_port``.
        """
        while True:
            await FallingEdge(self.vif.RESETn)
            # send reset tr
            tr = bus_item.type_id.create("tr", self)
            tr.kind = bus_item.RESET
            tr.addr = 0
            self.monitor_port.write(tr)
            uvm_info(
                self.tag, "sampled reset transaction: " + tr.convert2string(), UVM_HIGH
            )

    async def sample_delay(self):
        """Advance to the next sample point: the next rising edge of ``CLK``."""
        await RisingEdge(self.vif.CLK)

    async def wait_ready(self):
        """Block until ``PREADY`` is observed non-zero.

        Returns without waiting when ``PREADY`` is already non-zero; otherwise
        re-checks it after each ``sample_delay()``.
        """
        # Read the signal through ``.value`` like every other access in this
        # monitor, instead of comparing the handle object itself to an int.
        while self.vif.PREADY.value == 0:
            await self.sample_delay()
# Apply the uvm_component_utils macro to the class at import time.
# NOTE(review): presumably this registers bus_apb_monitor with the UVM
# factory so it can be created/overridden by type -- confirm in uvm.macros.
uvm_component_utils(bus_apb_monitor)