summaryrefslogtreecommitdiff
path: root/esds/simulator.py
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2022-06-29 11:19:36 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2022-06-29 11:19:36 +0200
commit1bef51d87860f782f687533012167bebb43c093d (patch)
tree3c1e8160eadd3b7e5d314d9b1759486783544531 /esds/simulator.py
parentbe6d819080fc6c4dccd5b7e56d1da6a1ab930d4f (diff)
Debug event processing
Diffstat (limited to 'esds/simulator.py')
-rw-r--r--esds/simulator.py190
1 files changed, 109 insertions, 81 deletions
diff --git a/esds/simulator.py b/esds/simulator.py
index e775863..4217e1d 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -7,10 +7,12 @@ class Simulator:
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
The general format for an event is (type,timestamp,event,priority)
Event types:
- - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 1)
- - 1 timeout (1,timestamp,node_id,4)
- - 2 breakpoint_manual (3,timestamp,0,0)
- - 3 breakpoint_auto (4,timestamp,0,0)
+ - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 2)
+ - 1 timeout (1,timestamp,node_id,3)
+ - 2 breakpoint_manual (3,timestamp,0,1)
+ - 3 breakpoint_auto (4,timestamp,0,1)
+ - 4 notify (0,timestamp,node_id,0)
+
Very important notes:
- When the simulator wakes up a node (changing is state to running) data that should be received by that node
@@ -18,6 +20,8 @@ class Simulator:
Otherwise plugings such as the power states one may not gives accurate results because of missing entries in the nodes received queues.
- The state of a node should always be updated (e.g node["state"]="running") BEFORE updating its
queue (e.g node.rqueue.put(("timeout_remove",0))
+ - Notify as the same behavior as timeout. Except it has the highest priority among all the events! This is particularly usefull for wait events which SHOULD
+ be handle before any other one. That way after a wait, nodes a ready perform receivet() with timeout=0.
"""
def __init__(self,netmat):
@@ -123,21 +127,13 @@ class Simulator:
sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
self.events=self.events[sorted_indexes]
- def sync_node_non_blocking(self,node):
+ def sync_node_non_blocking(self,node, timeout_remove_only=False):
"""
Process all call request and wait for Node.sync() to return
"""
node.sync()
while node["state"] == "call_non_blocking":
- if node["request"] == "log":
- self.log(node.rargs,node=node.node_id)
- node["state"]="running"
- node.rqueue.put(("log",0))
- elif node["request"] == "timeout_add":
- self.add_event(1,self.time+node.rargs,node.node_id,priority=3)
- node["state"]="running"
- node.rqueue.put(("timeout_add",0))
- elif node["request"] == "timeout_remove":
+ if node["request"] == "timeout_remove":
selector=list()
for event in self.events:
if event[0] == 1 and event[2]==node.node_id:
@@ -147,69 +143,94 @@ class Simulator:
self.events=self.events[~np.array(selector)]
node["state"]="running"
node.rqueue.put(("timeout_remove",0))
- elif node["request"] == "abort":
- self.log("Simulation aborted: "+node.rargs,node=node.node_id)
- exit(1)
- elif node["request"] == "read":
- node["state"]="running"
- if node.rargs == "clock":
- node.rqueue.put(("read",float(self.time)))
- elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
- interface=node.rargs[5:]
- count=0
- # Count number of communication on interface
+ elif timeout_remove_only:
+ break
+ elif not timeout_remove_only:
+ if node["request"] == "log":
+ self.log(node.rargs,node=node.node_id)
+ node["state"]="running"
+ node.rqueue.put(("log",0))
+ elif node["request"] == "timeout_add":
+ self.add_event(1,self.time+node.rargs,node.node_id,priority=3)
+ node["state"]="running"
+ node.rqueue.put(("timeout_add",0))
+ elif node["request"] == "notify_add":
+ self.add_event(4,self.time+node.rargs,node.node_id,priority=0)
+ node["state"]="running"
+ node.rqueue.put(("notify_add",0))
+ elif node["request"] == "notify_remove":
+ selector=list()
for event in self.events:
- if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface:
- count+=1
- node.rqueue.put(("read",count))
- else:
- node.rqueue.put(("read",0)) # Always return 0 if register is unknown
- elif node["request"] == "turn_on":
- node["state"]="running"
- node.rqueue.put(("turn_on",0))
- self.log("Turned on",node=node.node_id)
- elif node["request"] == "turn_off":
- # Create communications selectors (True/False arrays)
- selector_wireless=list() # Select all wireless events where node is involved
- selector_wired=list() # Select all wired events where node is involved
- for event in self.events:
- if event[0]==0 and int(event[2][1])==node.node_id:
- if self.netmat[event[2][2]]["is_wired"]:
- selector_wireless.append(False)
- selector_wired.append(True)
+ if event[0] == 4 and event[2]==node.node_id:
+ selector.append(True)
else:
- selector_wireless.append(True)
- selector_wired.append(False)
- else:
- selector_wireless.append(False)
- selector_wired.append(False)
- # Informed senders of wired events to cancel send
- for event in self.events[selector_wired]:
- sender=self.nodes[int(event[2][0])]
- sender["state"]="running"
- sender.rqueue.put(("send_cancel",2))
- # Remove communications from the event list
- if(len(self.events) != 0):
- self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
- # Refresh wired sharing
- for interface in self.sharing.keys():
- self.sharing[interface][node.node_id]=0 # Sharing goes back to zero
- # Update node state after turning off
- node["state"]="running"
- node.rqueue.put(("turn_off",0))
- self.log("Turned off",node=node.node_id)
- elif node["request"] == "send_cancel":
- selector=list()
- for event in self.events:
- if event[0]==0 and int(event[2][0]) == node.node_id:
- selector.append(True)
- if self.netmat[event[2][2]]["is_wired"]:
- self.update_sharing(int(event[2][1]),-1,event[2][2])
+ selector.append(False)
+ self.events=self.events[~np.array(selector)]
+ node["state"]="running"
+ node.rqueue.put(("notify_remove",0))
+ elif node["request"] == "abort":
+ self.log("Simulation aborted: "+node.rargs,node=node.node_id)
+ exit(1)
+ elif node["request"] == "read":
+ node["state"]="running"
+ if node.rargs == "clock":
+ node.rqueue.put(("read",float(self.time)))
+ elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
+ interface=node.rargs[5:]
+ count=0
+ # Count number of communication on interface
+ for event in self.events:
+ if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface:
+ count+=1
+ node.rqueue.put(("read",count))
else:
- selector.append(False)
- self.events=self.events[~np.array(selector)]
- node["state"]="running"
- node.rqueue.put(("send_cancel",0))
+ node.rqueue.put(("read",0)) # Always return 0 if register is unknown
+ elif node["request"] == "turn_on":
+ node["state"]="running"
+ node.rqueue.put(("turn_on",0))
+ self.log("Turned on",node=node.node_id)
+ elif node["request"] == "turn_off":
+ # Create communications selectors (True/False arrays)
+ selector_wireless=list() # Select all wireless events where node is involved
+ selector_wired=list() # Select all wired events where node is involved
+ for event in self.events:
+ if event[0]==0 and int(event[2][1])==node.node_id:
+ if self.netmat[event[2][2]]["is_wired"]:
+ selector_wireless.append(False)
+ selector_wired.append(True)
+ else:
+ selector_wireless.append(True)
+ selector_wired.append(False)
+ else:
+ selector_wireless.append(False)
+ selector_wired.append(False)
+ # Informed senders of wired events to cancel send
+ for event in self.events[selector_wired]:
+ sender=self.nodes[int(event[2][0])]
+ sender["state"]="running"
+ sender.rqueue.put(("send_cancel",2))
+ # Remove communications from the event list
+ if(len(self.events) != 0):
+ self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
+ # Refresh wired sharing
+ for interface in self.sharing.keys():
+ self.sharing[interface][node.node_id]=0 # Sharing goes back to zero
+ # Update node state after turning off
+ node["state"]="running"
+ node.rqueue.put(("turn_off",0))
+ self.log("Turned off",node=node.node_id)
+ elif node["request"] == "send_cancel":
+ selector=list()
+ for event in self.events:
+ if event[0]==0 and int(event[2][0]) == node.node_id:
+ selector.append(True)
+ if self.netmat[event[2][2]]["is_wired"]:
+ self.update_sharing(int(event[2][1]),-1,event[2][2])
+ else:
+ selector.append(False)
+ self.events=self.events[~np.array(selector)]
+ node["state"]="running"
+ node.rqueue.put(("send_cancel",0))
node.sync()
def update_sharing(self, dst, amount,interface):
@@ -326,7 +347,7 @@ class Simulator:
return np.arange(0,selector.shape[0])[selector]
- def add_event(self,event_type,event_ts,event,priority=1):
+ def add_event(self,event_type,event_ts,event,priority=2):
"""
Call this function with sort=True the least amount of time possible
"""
@@ -341,9 +362,9 @@ class Simulator:
self.startat=time.time()
self.interferences=interferences
for bp in breakpoints:
- self.add_event(2,bp,0,0)
+ self.add_event(2,bp,0,1)
if breakpoints_every != None:
- self.add_event(3,breakpoints_every,0,0)
+ self.add_event(3,breakpoints_every,0,1)
if debug:
with open(self.debug_file_path, "w") as f:
f.write("Python version {}\n".format(sys.version))
@@ -396,9 +417,10 @@ class Simulator:
dst["interfaces_queue_size"][interface]-=1
dst["state"]="running"
dst.rqueue.put(("receive",0))
- self.sync_node_non_blocking(dst)
+ self.sync_node_non_blocking(dst,timeout_remove_only=True)
src["state"]="running"
src.rqueue.put(("send",0))
+ self.sync_node_non_blocking(src,timeout_remove_only=True)
else:
if src.node_id != dst.node_id:
dst["interfaces"][interface].put((data,start_at,self.time))
@@ -409,19 +431,25 @@ class Simulator:
dst["interfaces_queue_size"][interface]-=1
dst["state"]="running"
dst.rqueue.put(("receive",0))
- self.sync_node_non_blocking(dst)
+ self.sync_node_non_blocking(dst,timeout_remove_only=True)
else:
src["state"]="running"
src.rqueue.put(("send",0))
+ self.sync_node_non_blocking(src,timeout_remove_only=True)
elif event_type == 1:
node=self.nodes[int(event)]
node["state"]="running"
node.rqueue.put(("timeout",0))
- self.sync_node_non_blocking(node)
+ self.sync_node_non_blocking(node,timeout_remove_only=True)
+ elif event_type == 4:
+ node=self.nodes[int(event)]
+ node["state"]="running"
+ node.rqueue.put(("notify",0))
+ self.sync_node_non_blocking(node,timeout_remove_only=True)
elif event_type == 2 or event_type == 3:
breakpoint_callback(self)
if event_type == 3:
- self.add_event(3,self.time+breakpoints_every,0,0)
+ self.add_event(3,self.time+breakpoints_every,0,1)
##### Simulation ends
self.log("Simulation ends")