diff options
Diffstat (limited to 'esds/simulator.py')
| -rw-r--r-- | esds/simulator.py | 48 |
1 files changed, 26 insertions, 22 deletions
diff --git a/esds/simulator.py b/esds/simulator.py index b774468..44c45c4 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -1,6 +1,7 @@ import numpy as np import threading,sys,time from esds.node import Node +from esds.rcode import RCode class Simulator: """ @@ -139,22 +140,22 @@ class Simulator: selector.append(False) self.events=self.events[~np.array(selector)] node["state"]="running" - node.rqueue.put(("timeout_remove",0)) + node.rqueue.put(("timeout_remove",RCode.SUCCESS)) elif timeout_remove_only: break elif not timeout_remove_only: if node["request"] == "log": self.log(node.rargs,node=node.node_id) node["state"]="running" - node.rqueue.put(("log",0)) + node.rqueue.put(("log",RCode.SUCCESS)) elif node["request"] == "timeout_add": self.add_event(1,self.time+node.rargs,node.node_id,priority=3) node["state"]="running" - node.rqueue.put(("timeout_add",0)) + node.rqueue.put(("timeout_add",RCode.SUCCESS)) elif node["request"] == "notify_add": self.add_event(4,self.time+node.rargs,node.node_id,priority=0) node["state"]="running" - node.rqueue.put(("notify_add",0)) + node.rqueue.put(("notify_add",RCode.SUCCESS)) elif node["request"] == "notify_remove": selector=list() for event in self.events: @@ -164,7 +165,7 @@ class Simulator: selector.append(False) self.events=self.events[~np.array(selector)] node["state"]="running" - node.rqueue.put(("notify_remove",0)) + node.rqueue.put(("notify_remove",RCode.SUCCESS)) elif node["request"] == "abort": self.log("Simulation aborted: "+node.rargs,node=node.node_id) exit(1) @@ -184,7 +185,7 @@ class Simulator: node.rqueue.put(("read",0)) # Always return 0 if register is unknown elif node["request"] == "turn_on": node["state"]="running" - node.rqueue.put(("turn_on",0)) + node.rqueue.put(("turn_on",RCode.SUCCESS)) self.log("Turned on",node=node.node_id) elif node["request"] == "turn_off": # Create communications selectors (True/False arrays) @@ -209,7 +210,7 @@ class Simulator: selector_wireless.append(False) selector_wired.append(False) # Build the set of senders to notify (only in wired connections, - # indeed IRL, in wireless communications sender would send all its data) + # indeed IRL, in wireless communications, sender would send all its data) senders_to_notify=set() for event in self.events[selector_wired]: senders_to_notify.add(int(event[2][0])) @@ -218,14 +219,14 @@ class Simulator: self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))] # Update node state after turning off node["state"]="running" - node.rqueue.put(("turn_off",0)) + node.rqueue.put(("turn_off",RCode.SUCCESS)) self.log("Turned off",node=node.node_id) # Informed senders of wired events that communication ended for sender_id in senders_to_notify: # Notify sender (node that wired sharing is updated in the send_cancel request) sender_node=self.nodes[sender_id] sender_node["state"]="running" - sender_node.rqueue.put(("send_cancel",2)) + sender_node.rqueue.put(("send_cancel",RCode.RECEIVER_TURNED_OFF)) # The node should resume at current self.time. So, sync the sender now: self.sync_node_non_blocking(sender_node) self.sync_node_blocking(sender_node) @@ -247,7 +248,7 @@ class Simulator: for com in sharing_to_update: self.update_sharing(com[0],-1,com[1]) node["state"]="running" - node.rqueue.put(("send_cancel",0)) + node.rqueue.put(("send_cancel",RCode.SUCCESS)) node.sync() def update_sharing(self, dst, amount,interface): @@ -316,20 +317,23 @@ class Simulator: exit(1) if not self.communicate(interface, node.node_id, dst, data, datasize,receiver_required): node["state"]="running" - node.rqueue.put(("send",4)) - self.sync_node_non_blocking(node,timeout_remove_only=True) + node.rqueue.put(("send",RCode.RECEIVER_UNAVAILABLE)) + # Do not forget to collect the next event (since current event did not happend) + # Be careful in node implementation to have no infinite loop when receiver_required=True + self.sync_node_non_blocking(node) + self.sync_node_blocking(node) elif node["request"] == "receive": interface=node.rargs if node["interfaces_queue_size"][interface] > 0: node["interfaces_queue_size"][interface]-=1 node["state"]="running" - node.rqueue.put(("receive",0)) + node.rqueue.put(("receive",RCode.SUCCESS)) # Do not forget to collect the next event. This is the only request which is processed here self.sync_node_non_blocking(node) self.sync_node_blocking(node) elif node["request"] == "wait_end": node["state"]="pending" - node.rqueue.put(("wait_end",0)) + node.rqueue.put(("wait_end",RCode.SUCCESS)) self.wait_end_nodes.append(node.node_id) def communicate(self, interface, src, dst, data, datasize,receiver_required): @@ -413,10 +417,10 @@ class Simulator: for node in self.nodes: if node["state"] != "terminated": node["state"]="running" - node.rqueue.put(("sim_end",0)) + node.rqueue.put(("sim_end",RCode.SUCCESS)) self.sync_node_non_blocking(node) # Allow them for make non-blocking call requests (printing logs for example) else: - node.rqueue.put(("sim_end",0)) + node.rqueue.put(("sim_end",RCode.SUCCESS)) break # End the event processing loop # Update simulation time @@ -442,12 +446,12 @@ class Simulator: if dst["state"] == "call_blocking" and dst["request"] == "receive": dst["interfaces_queue_size"][interface]-=1 dst["state"]="running" - dst.rqueue.put(("receive",0)) + dst.rqueue.put(("receive",RCode.SUCCESS)) self.sync_node_non_blocking(dst,timeout_remove_only=True) self.notify_node_plugins(dst, "on_communication_end", event) self.update_sharing(dst.node_id,-1,interface) src["state"]="running" - code=0 if perform_delivery else 1 + code=RCode.SUCCESS if perform_delivery else RCode.FAIL src.rqueue.put(("send",code)) self.sync_node_non_blocking(src,timeout_remove_only=True) self.notify_node_plugins(src, "on_communication_end", event) @@ -461,23 +465,23 @@ class Simulator: if dst["state"] == "call_blocking" and dst["request"] == "receive": dst["interfaces_queue_size"][interface]-=1 dst["state"]="running" - dst.rqueue.put(("receive",0)) + dst.rqueue.put(("receive",RCode.SUCCESS)) self.sync_node_non_blocking(dst,timeout_remove_only=True) self.notify_node_plugins(dst, "on_communication_end", event) else: src["state"]="running" - src.rqueue.put(("send",0)) + src.rqueue.put(("send",RCode.SUCCESS)) self.sync_node_non_blocking(src,timeout_remove_only=True) self.notify_node_plugins(src, "on_communication_end", event) elif event_type == 1: # Timeout node=self.nodes[int(event)] node["state"]="running" - node.rqueue.put(("timeout",0)) + node.rqueue.put(("timeout",RCode.SUCCESS)) self.sync_node_non_blocking(node,timeout_remove_only=True) elif event_type == 4: node=self.nodes[int(event)] node["state"]="running" - node.rqueue.put(("notify",0)) + node.rqueue.put(("notify",RCode.SUCCESS)) self.sync_node_non_blocking(node,timeout_remove_only=True) elif event_type == 2 or event_type == 3: breakpoint_callback(self) |
