summaryrefslogtreecommitdiff
path: root/esds
diff options
context:
space:
mode:
Diffstat (limited to 'esds')
-rw-r--r--esds/node.py24
-rw-r--r--esds/simulator.py190
2 files changed, 129 insertions, 85 deletions
diff --git a/esds/node.py b/esds/node.py
index e26e239..4c9ca5f 100644
--- a/esds/node.py
+++ b/esds/node.py
@@ -4,7 +4,7 @@ class Node:
available_node_id=0
def __init__(self,src,interfaces):
"""
-
+ self.chest: contains mutex protected data
"""
self.node_id=Node.available_node_id
Node.available_node_id+=1 # Refresh node id
@@ -71,12 +71,16 @@ class Node:
def wait(self,duration):
if type(duration) != int and type(duration) != float:
self.abort("wait() called with a non-number duration")
+ elif duration < 0:
+ self.abort("wait() called with a negative duration (duration="+str(duration)+")")
+ elif duration == 0:
+ return
self.rargs=duration
- self["request"]="timeout_add"
+ self["request"]="notify_add"
self["state"]="call_non_blocking"
- self.wait_ack(["timeout_add"])
+ self.wait_ack(["notify_add"])
self["state"]="pending"
- self.wait_ack(["timeout"])
+ self.wait_ack(["notify"])
def wait_end(self):
self["request"]="wait_end"
@@ -103,6 +107,8 @@ class Node:
self.abort("send() called with a non-number datasize")
elif type(dst) != int and type(dst) != float and dst != None:
self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)")
+ elif not self["turned_on"]:
+ self.abort("send() called while node is turned off")
self.plugin_notify("send_call",(interface,data,datasize,dst))
self.rargs=(interface, data, datasize, dst)
self["request"]="send"
@@ -118,8 +124,12 @@ class Node:
self.abort("sendt() called with a non-number datasize")
elif type(timeout) != int and type(timeout) != float:
self.abort("sendt() called with a non-number timeout")
+ elif timeout < 0:
+ self.abort("sendt() called with a negative timeout (timeout="+str(timeout)+")")
elif type(dst) != int and type(dst) != float and dst != None:
self.abort("send() called with a non-number dst (wired interfaces) or dst is not None (wireless interfaces)")
+ elif not self["turned_on"]:
+ self.abort("sendt() called while node is turned off")
self.rargs=timeout
self["request"]="timeout_add"
self["state"]="call_non_blocking"
@@ -141,6 +151,8 @@ class Node:
def receive(self,interface):
if interface not in self["interfaces"]:
self.abort("receive() called with an unknown interface \""+interface+"\"")
+ elif not self["turned_on"]:
+ self.abort("receive() called while node is turned off")
self["request"]="receive"
self.rargs=interface
self["state"]="call_blocking"
@@ -154,6 +166,10 @@ class Node:
self.abort("receivet() called with an unknown interface \""+interface+"\"")
elif type(timeout) != int and type(timeout) != float:
self.abort("receivet() called with a non-number timeout")
+ elif timeout < 0:
+ self.abort("receivet() called with a negative timeout (timeout="+str(timeout)+")")
+ elif not self["turned_on"]:
+ self.abort("receivet() called while node is turned off")
self.rargs=timeout
self["request"]="timeout_add"
self["state"]="call_non_blocking"
diff --git a/esds/simulator.py b/esds/simulator.py
index e775863..4217e1d 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -7,10 +7,12 @@ class Simulator:
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
The general format for an event is (type,timestamp,event,priority)
Event types:
- - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 1)
- - 1 timeout (1,timestamp,node_id,4)
- - 2 breakpoint_manual (3,timestamp,0,0)
- - 3 breakpoint_auto (4,timestamp,0,0)
+ - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 2)
+ - 1 timeout (1,timestamp,node_id,3)
+ - 2 breakpoint_manual (3,timestamp,0,1)
+ - 3 breakpoint_auto (4,timestamp,0,1)
+ - 4 notify (0,timestamp,node_id,0)
+
Very important notes:
- When the simulator wakes up a node (changing is state to running) data that should be received by that node
@@ -18,6 +20,8 @@ class Simulator:
Otherwise plugings such as the power states one may not gives accurate results because of missing entries in the nodes received queues.
- The state of a node should always be updated (e.g node["state"]="running") BEFORE updating its
queue (e.g node.rqueue.put(("timeout_remove",0))
+ - Notify as the same behavior as timeout. Except it has the highest priority among all the events! This is particularly usefull for wait events which SHOULD
+ be handle before any other one. That way after a wait, nodes a ready perform receivet() with timeout=0.
"""
def __init__(self,netmat):
@@ -123,21 +127,13 @@ class Simulator:
sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
self.events=self.events[sorted_indexes]
- def sync_node_non_blocking(self,node):
+ def sync_node_non_blocking(self,node, timeout_remove_only=False):
"""
Process all call request and wait for Node.sync() to return
"""
node.sync()
while node["state"] == "call_non_blocking":
- if node["request"] == "log":
- self.log(node.rargs,node=node.node_id)
- node["state"]="running"
- node.rqueue.put(("log",0))
- elif node["request"] == "timeout_add":
- self.add_event(1,self.time+node.rargs,node.node_id,priority=3)
- node["state"]="running"
- node.rqueue.put(("timeout_add",0))
- elif node["request"] == "timeout_remove":
+ if node["request"] == "timeout_remove":
selector=list()
for event in self.events:
if event[0] == 1 and event[2]==node.node_id:
@@ -147,69 +143,94 @@ class Simulator:
self.events=self.events[~np.array(selector)]
node["state"]="running"
node.rqueue.put(("timeout_remove",0))
- elif node["request"] == "abort":
- self.log("Simulation aborted: "+node.rargs,node=node.node_id)
- exit(1)
- elif node["request"] == "read":
- node["state"]="running"
- if node.rargs == "clock":
- node.rqueue.put(("read",float(self.time)))
- elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
- interface=node.rargs[5:]
- count=0
- # Count number of communication on interface
+ elif timeout_remove_only:
+ break
+ elif not timeout_remove_only:
+ if node["request"] == "log":
+ self.log(node.rargs,node=node.node_id)
+ node["state"]="running"
+ node.rqueue.put(("log",0))
+ elif node["request"] == "timeout_add":
+ self.add_event(1,self.time+node.rargs,node.node_id,priority=3)
+ node["state"]="running"
+ node.rqueue.put(("timeout_add",0))
+ elif node["request"] == "notify_add":
+ self.add_event(4,self.time+node.rargs,node.node_id,priority=0)
+ node["state"]="running"
+ node.rqueue.put(("notify_add",0))
+ elif node["request"] == "notify_remove":
+ selector=list()
for event in self.events:
- if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface:
- count+=1
- node.rqueue.put(("read",count))
- else:
- node.rqueue.put(("read",0)) # Always return 0 if register is unknown
- elif node["request"] == "turn_on":
- node["state"]="running"
- node.rqueue.put(("turn_on",0))
- self.log("Turned on",node=node.node_id)
- elif node["request"] == "turn_off":
- # Create communications selectors (True/False arrays)
- selector_wireless=list() # Select all wireless events where node is involved
- selector_wired=list() # Select all wired events where node is involved
- for event in self.events:
- if event[0]==0 and int(event[2][1])==node.node_id:
- if self.netmat[event[2][2]]["is_wired"]:
- selector_wireless.append(False)
- selector_wired.append(True)
+ if event[0] == 4 and event[2]==node.node_id:
+ selector.append(True)
else:
- selector_wireless.append(True)
- selector_wired.append(False)
- else:
- selector_wireless.append(False)
- selector_wired.append(False)
- # Informed senders of wired events to cancel send
- for event in self.events[selector_wired]:
- sender=self.nodes[int(event[2][0])]
- sender["state"]="running"
- sender.rqueue.put(("send_cancel",2))
- # Remove communications from the event list
- if(len(self.events) != 0):
- self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
- # Refresh wired sharing
- for interface in self.sharing.keys():
- self.sharing[interface][node.node_id]=0 # Sharing goes back to zero
- # Update node state after turning off
- node["state"]="running"
- node.rqueue.put(("turn_off",0))
- self.log("Turned off",node=node.node_id)
- elif node["request"] == "send_cancel":
- selector=list()
- for event in self.events:
- if event[0]==0 and int(event[2][0]) == node.node_id:
- selector.append(True)
- if self.netmat[event[2][2]]["is_wired"]:
- self.update_sharing(int(event[2][1]),-1,event[2][2])
+ selector.append(False)
+ self.events=self.events[~np.array(selector)]
+ node["state"]="running"
+ node.rqueue.put(("notify_remove",0))
+ elif node["request"] == "abort":
+ self.log("Simulation aborted: "+node.rargs,node=node.node_id)
+ exit(1)
+ elif node["request"] == "read":
+ node["state"]="running"
+ if node.rargs == "clock":
+ node.rqueue.put(("read",float(self.time)))
+ elif node.rargs[0:5] == "ncom_": # ncom_<interface> register
+ interface=node.rargs[5:]
+ count=0
+ # Count number of communication on interface
+ for event in self.events:
+ if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface:
+ count+=1
+ node.rqueue.put(("read",count))
else:
- selector.append(False)
- self.events=self.events[~np.array(selector)]
- node["state"]="running"
- node.rqueue.put(("send_cancel",0))
+ node.rqueue.put(("read",0)) # Always return 0 if register is unknown
+ elif node["request"] == "turn_on":
+ node["state"]="running"
+ node.rqueue.put(("turn_on",0))
+ self.log("Turned on",node=node.node_id)
+ elif node["request"] == "turn_off":
+ # Create communications selectors (True/False arrays)
+ selector_wireless=list() # Select all wireless events where node is involved
+ selector_wired=list() # Select all wired events where node is involved
+ for event in self.events:
+ if event[0]==0 and int(event[2][1])==node.node_id:
+ if self.netmat[event[2][2]]["is_wired"]:
+ selector_wireless.append(False)
+ selector_wired.append(True)
+ else:
+ selector_wireless.append(True)
+ selector_wired.append(False)
+ else:
+ selector_wireless.append(False)
+ selector_wired.append(False)
+ # Informed senders of wired events to cancel send
+ for event in self.events[selector_wired]:
+ sender=self.nodes[int(event[2][0])]
+ sender["state"]="running"
+ sender.rqueue.put(("send_cancel",2))
+ # Remove communications from the event list
+ if(len(self.events) != 0):
+ self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
+ # Refresh wired sharing
+ for interface in self.sharing.keys():
+ self.sharing[interface][node.node_id]=0 # Sharing goes back to zero
+ # Update node state after turning off
+ node["state"]="running"
+ node.rqueue.put(("turn_off",0))
+ self.log("Turned off",node=node.node_id)
+ elif node["request"] == "send_cancel":
+ selector=list()
+ for event in self.events:
+ if event[0]==0 and int(event[2][0]) == node.node_id:
+ selector.append(True)
+ if self.netmat[event[2][2]]["is_wired"]:
+ self.update_sharing(int(event[2][1]),-1,event[2][2])
+ else:
+ selector.append(False)
+ self.events=self.events[~np.array(selector)]
+ node["state"]="running"
+ node.rqueue.put(("send_cancel",0))
node.sync()
def update_sharing(self, dst, amount,interface):
@@ -326,7 +347,7 @@ class Simulator:
return np.arange(0,selector.shape[0])[selector]
- def add_event(self,event_type,event_ts,event,priority=1):
+ def add_event(self,event_type,event_ts,event,priority=2):
"""
Call this function with sort=True the least amount of time possible
"""
@@ -341,9 +362,9 @@ class Simulator:
self.startat=time.time()
self.interferences=interferences
for bp in breakpoints:
- self.add_event(2,bp,0,0)
+ self.add_event(2,bp,0,1)
if breakpoints_every != None:
- self.add_event(3,breakpoints_every,0,0)
+ self.add_event(3,breakpoints_every,0,1)
if debug:
with open(self.debug_file_path, "w") as f:
f.write("Python version {}\n".format(sys.version))
@@ -396,9 +417,10 @@ class Simulator:
dst["interfaces_queue_size"][interface]-=1
dst["state"]="running"
dst.rqueue.put(("receive",0))
- self.sync_node_non_blocking(dst)
+ self.sync_node_non_blocking(dst,timeout_remove_only=True)
src["state"]="running"
src.rqueue.put(("send",0))
+ self.sync_node_non_blocking(src,timeout_remove_only=True)
else:
if src.node_id != dst.node_id:
dst["interfaces"][interface].put((data,start_at,self.time))
@@ -409,19 +431,25 @@ class Simulator:
dst["interfaces_queue_size"][interface]-=1
dst["state"]="running"
dst.rqueue.put(("receive",0))
- self.sync_node_non_blocking(dst)
+ self.sync_node_non_blocking(dst,timeout_remove_only=True)
else:
src["state"]="running"
src.rqueue.put(("send",0))
+ self.sync_node_non_blocking(src,timeout_remove_only=True)
elif event_type == 1:
node=self.nodes[int(event)]
node["state"]="running"
node.rqueue.put(("timeout",0))
- self.sync_node_non_blocking(node)
+ self.sync_node_non_blocking(node,timeout_remove_only=True)
+ elif event_type == 4:
+ node=self.nodes[int(event)]
+ node["state"]="running"
+ node.rqueue.put(("notify",0))
+ self.sync_node_non_blocking(node,timeout_remove_only=True)
elif event_type == 2 or event_type == 3:
breakpoint_callback(self)
if event_type == 3:
- self.add_event(3,self.time+breakpoints_every,0,0)
+ self.add_event(3,self.time+breakpoints_every,0,1)
##### Simulation ends
self.log("Simulation ends")