diff options
Diffstat (limited to 'esds/simulator.py')
| -rw-r--r-- | esds/simulator.py | 190 |
1 files changed, 109 insertions, 81 deletions
diff --git a/esds/simulator.py b/esds/simulator.py index e775863..4217e1d 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -7,10 +7,12 @@ class Simulator: Flow-Level Discrete Event Simulator for Cyber-Physical Systems The general format for an event is (type,timestamp,event,priority) Event types: - - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 1) - - 1 timeout (1,timestamp,node_id,4) - - 2 breakpoint_manual (3,timestamp,0,0) - - 3 breakpoint_auto (4,timestamp,0,0) + - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining), 2) + - 1 timeout (1,timestamp,node_id,3) + - 2 breakpoint_manual (3,timestamp,0,1) + - 3 breakpoint_auto (4,timestamp,0,1) + - 4 notify (0,timestamp,node_id,0) + Very important notes: - When the simulator wakes up a node (changing is state to running) data that should be received by that node @@ -18,6 +20,8 @@ class Simulator: Otherwise plugings such as the power states one may not gives accurate results because of missing entries in the nodes received queues. - The state of a node should always be updated (e.g node["state"]="running") BEFORE updating its queue (e.g node.rqueue.put(("timeout_remove",0)) + - Notify as the same behavior as timeout. Except it has the highest priority among all the events! This is particularly usefull for wait events which SHOULD + be handle before any other one. That way after a wait, nodes a ready perform receivet() with timeout=0. """ def __init__(self,netmat): @@ -123,21 +127,13 @@ class Simulator: sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1])) self.events=self.events[sorted_indexes] - def sync_node_non_blocking(self,node): + def sync_node_non_blocking(self,node, timeout_remove_only=False): """ Process all call request and wait for Node.sync() to return """ node.sync() while node["state"] == "call_non_blocking": - if node["request"] == "log": - self.log(node.rargs,node=node.node_id) - node["state"]="running" - node.rqueue.put(("log",0)) - elif node["request"] == "timeout_add": - self.add_event(1,self.time+node.rargs,node.node_id,priority=3) - node["state"]="running" - node.rqueue.put(("timeout_add",0)) - elif node["request"] == "timeout_remove": + if node["request"] == "timeout_remove": selector=list() for event in self.events: if event[0] == 1 and event[2]==node.node_id: @@ -147,69 +143,94 @@ class Simulator: self.events=self.events[~np.array(selector)] node["state"]="running" node.rqueue.put(("timeout_remove",0)) - elif node["request"] == "abort": - self.log("Simulation aborted: "+node.rargs,node=node.node_id) - exit(1) - elif node["request"] == "read": - node["state"]="running" - if node.rargs == "clock": - node.rqueue.put(("read",float(self.time))) - elif node.rargs[0:5] == "ncom_": # ncom_<interface> register - interface=node.rargs[5:] - count=0 - # Count number of communication on interface + elif timeout_remove_only: + break + elif not timeout_remove_only: + if node["request"] == "log": + self.log(node.rargs,node=node.node_id) + node["state"]="running" + node.rqueue.put(("log",0)) + elif node["request"] == "timeout_add": + self.add_event(1,self.time+node.rargs,node.node_id,priority=3) + node["state"]="running" + node.rqueue.put(("timeout_add",0)) + elif node["request"] == "notify_add": + self.add_event(4,self.time+node.rargs,node.node_id,priority=0) + node["state"]="running" + node.rqueue.put(("notify_add",0)) + elif node["request"] == "notify_remove": + selector=list() for event in self.events: - if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface: - count+=1 - node.rqueue.put(("read",count)) - else: - node.rqueue.put(("read",0)) # Always return 0 if register is unknown - elif node["request"] == "turn_on": - node["state"]="running" - node.rqueue.put(("turn_on",0)) - self.log("Turned on",node=node.node_id) - elif node["request"] == "turn_off": - # Create communications selectors (True/False arrays) - selector_wireless=list() # Select all wireless events where node is involved - selector_wired=list() # Select all wired events where node is involved - for event in self.events: - if event[0]==0 and int(event[2][1])==node.node_id: - if self.netmat[event[2][2]]["is_wired"]: - selector_wireless.append(False) - selector_wired.append(True) + if event[0] == 4 and event[2]==node.node_id: + selector.append(True) else: - selector_wireless.append(True) - selector_wired.append(False) - else: - selector_wireless.append(False) - selector_wired.append(False) - # Informed senders of wired events to cancel send - for event in self.events[selector_wired]: - sender=self.nodes[int(event[2][0])] - sender["state"]="running" - sender.rqueue.put(("send_cancel",2)) - # Remove communications from the event list - if(len(self.events) != 0): - self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))] - # Refresh wired sharing - for interface in self.sharing.keys(): - self.sharing[interface][node.node_id]=0 # Sharing goes back to zero - # Update node state after turning off - node["state"]="running" - node.rqueue.put(("turn_off",0)) - self.log("Turned off",node=node.node_id) - elif node["request"] == "send_cancel": - selector=list() - for event in self.events: - if event[0]==0 and int(event[2][0]) == node.node_id: - selector.append(True) - if self.netmat[event[2][2]]["is_wired"]: - self.update_sharing(int(event[2][1]),-1,event[2][2]) + selector.append(False) + self.events=self.events[~np.array(selector)] + node["state"]="running" + node.rqueue.put(("notify_remove",0)) + elif node["request"] == "abort": + self.log("Simulation aborted: "+node.rargs,node=node.node_id) + exit(1) + elif node["request"] == "read": + node["state"]="running" + if node.rargs == "clock": + node.rqueue.put(("read",float(self.time))) + elif node.rargs[0:5] == "ncom_": # ncom_<interface> register + interface=node.rargs[5:] + count=0 + # Count number of communication on interface + for event in self.events: + if event[0] == 0 and event[2][1] == node.node_id and event[2][2] == interface: + count+=1 + node.rqueue.put(("read",count)) else: - selector.append(False) - self.events=self.events[~np.array(selector)] - node["state"]="running" - node.rqueue.put(("send_cancel",0)) + node.rqueue.put(("read",0)) # Always return 0 if register is unknown + elif node["request"] == "turn_on": + node["state"]="running" + node.rqueue.put(("turn_on",0)) + self.log("Turned on",node=node.node_id) + elif node["request"] == "turn_off": + # Create communications selectors (True/False arrays) + selector_wireless=list() # Select all wireless events where node is involved + selector_wired=list() # Select all wired events where node is involved + for event in self.events: + if event[0]==0 and int(event[2][1])==node.node_id: + if self.netmat[event[2][2]]["is_wired"]: + selector_wireless.append(False) + selector_wired.append(True) + else: + selector_wireless.append(True) + selector_wired.append(False) + else: + selector_wireless.append(False) + selector_wired.append(False) + # Informed senders of wired events to cancel send + for event in self.events[selector_wired]: + sender=self.nodes[int(event[2][0])] + sender["state"]="running" + sender.rqueue.put(("send_cancel",2)) + # Remove communications from the event list + if(len(self.events) != 0): + self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))] + # Refresh wired sharing + for interface in self.sharing.keys(): + self.sharing[interface][node.node_id]=0 # Sharing goes back to zero + # Update node state after turning off + node["state"]="running" + node.rqueue.put(("turn_off",0)) + self.log("Turned off",node=node.node_id) + elif node["request"] == "send_cancel": + selector=list() + for event in self.events: + if event[0]==0 and int(event[2][0]) == node.node_id: + selector.append(True) + if self.netmat[event[2][2]]["is_wired"]: + self.update_sharing(int(event[2][1]),-1,event[2][2]) + else: + selector.append(False) + self.events=self.events[~np.array(selector)] + node["state"]="running" + node.rqueue.put(("send_cancel",0)) node.sync() def update_sharing(self, dst, amount,interface): @@ -326,7 +347,7 @@ class Simulator: return np.arange(0,selector.shape[0])[selector] - def add_event(self,event_type,event_ts,event,priority=1): + def add_event(self,event_type,event_ts,event,priority=2): """ Call this function with sort=True the least amount of time possible """ @@ -341,9 +362,9 @@ class Simulator: self.startat=time.time() self.interferences=interferences for bp in breakpoints: - self.add_event(2,bp,0,0) + self.add_event(2,bp,0,1) if breakpoints_every != None: - self.add_event(3,breakpoints_every,0,0) + self.add_event(3,breakpoints_every,0,1) if debug: with open(self.debug_file_path, "w") as f: f.write("Python version {}\n".format(sys.version)) @@ -396,9 +417,10 @@ class Simulator: dst["interfaces_queue_size"][interface]-=1 dst["state"]="running" dst.rqueue.put(("receive",0)) - self.sync_node_non_blocking(dst) + self.sync_node_non_blocking(dst,timeout_remove_only=True) src["state"]="running" src.rqueue.put(("send",0)) + self.sync_node_non_blocking(src,timeout_remove_only=True) else: if src.node_id != dst.node_id: dst["interfaces"][interface].put((data,start_at,self.time)) @@ -409,19 +431,25 @@ class Simulator: dst["interfaces_queue_size"][interface]-=1 dst["state"]="running" dst.rqueue.put(("receive",0)) - self.sync_node_non_blocking(dst) + self.sync_node_non_blocking(dst,timeout_remove_only=True) else: src["state"]="running" src.rqueue.put(("send",0)) + self.sync_node_non_blocking(src,timeout_remove_only=True) elif event_type == 1: node=self.nodes[int(event)] node["state"]="running" node.rqueue.put(("timeout",0)) - self.sync_node_non_blocking(node) + self.sync_node_non_blocking(node,timeout_remove_only=True) + elif event_type == 4: + node=self.nodes[int(event)] + node["state"]="running" + node.rqueue.put(("notify",0)) + self.sync_node_non_blocking(node,timeout_remove_only=True) elif event_type == 2 or event_type == 3: breakpoint_callback(self) if event_type == 3: - self.add_event(3,self.time+breakpoints_every,0,0) + self.add_event(3,self.time+breakpoints_every,0,1) ##### Simulation ends self.log("Simulation ends") |
