summaryrefslogtreecommitdiff
path: root/esds/simulator.py
diff options
context:
space:
mode:
Diffstat (limited to 'esds/simulator.py')
-rw-r--r--esds/simulator.py116
1 files changed, 62 insertions, 54 deletions
diff --git a/esds/simulator.py b/esds/simulator.py
index 6048cdc..90ad427 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -161,69 +161,77 @@ class Simulator:
node.rqueue.put(("turn_on",RCode.SUCCESS))
self.log("Turned on",node=node.node_id)
elif node["request"] == "turn_off":
- # Create communications selectors (True/False arrays)
- selector_wireless=list() # Select all wireless events where node is receiver
- selector_wired=list() # Select all wired events where node is receiver
- for event in self.events:
- if event[0]==0 and int(event[2][1])==node.node_id:
- if self.netmat[event[2][2]]["is_wired"]:
- selector_wireless.append(False)
- if event[2][9]: # Check if should be cancel on turn_off (receiver_required)
- selector_wired.append(True)
- else:
- selector_wired.append(False)
- event[2][8]=False # So set delivery to False!!
- else:
- selector_wireless.append(True)
- selector_wired.append(False)
- # Call the sender/receiver callbacks
- self.notify_node_plugins(self.nodes[int(event[2][1])], "on_communication_end", event)
- self.notify_node_plugins(self.nodes[int(event[2][0])], "on_communication_end", event)
- else:
- selector_wireless.append(False)
- selector_wired.append(False)
- # Build the set of senders to notify (only in wired connections,
- # indeed IRL, in wireless communications, sender would send all its data)
- senders_to_notify=set()
- for event in self.events[selector_wired]:
- senders_to_notify.add(int(event[2][0]))
- # Remove communications from the event list
- if(len(self.events) != 0):
- self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
# Update node state after turning off
node["state"]="running"
node.rqueue.put(("turn_off",RCode.SUCCESS))
self.log("Turned off",node=node.node_id)
- # Informed senders of wired events that communication ended
- for sender_id in senders_to_notify:
- # Notify sender (node that wired sharing is updated in the send_cancel request)
- sender_node=self.nodes[sender_id]
- sender_node["state"]="running"
- sender_node.rqueue.put(("send_cancel",RCode.RECEIVER_TURNED_OFF))
- # The node should resume at current self.time. So, sync the sender now:
- self.sync_node_non_blocking(sender_node)
- self.sync_node_blocking(sender_node)
+ # We cancel communication after node has turned off
+ self.cancel_communications(node.node_id,reason=RCode.RECEIVER_TURNED_OFF)
elif node["request"] == "send_cancel":
- selector=list()
- sharing_to_update=list()
- for event in self.events:
- if event[0]==0 and int(event[2][0]) == node.node_id:
- selector.append(True)
- if self.netmat[event[2][2]]["is_wired"]:
- sharing_to_update.append((int(event[2][1]),event[2][2]))
- self.notify_node_plugins(node, "on_communication_end", event)
- elif int(event[2][0]) == int(event[2][1]): # If it is the sender event of a wireless communication (when sender_id==receiver_id)
- self.notify_node_plugins(node, "on_communication_end", event)
- else:
- selector.append(False)
- self.events=self.events[~np.array(selector)]
- # Now Update receiver of cancel communication sharing (since update_sharing sort event, selector would have been invalidated if done before)
- for com in sharing_to_update:
- self.update_sharing(com[0],-1,com[1])
+ self.cancel_communications(node.node_id)
node["state"]="running"
node.rqueue.put(("send_cancel",RCode.SUCCESS))
node.sync()
+ def cancel_communications(self, node_id, reason=RCode.UNKNOWN):
+ if(len(self.events) == 0):
+ return
+ # Build list of impacted events
+ selector=list()
+ for event in self.events:
+ if event[0]==0:
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
+ is_wired=self.netmat[interface]["is_wired"]
+ is_wireless=not is_wired
+ if src_id == node_id:
+ selector.append(True)
+ elif dst_id == node_id:
+ if is_wireless:
+ selector.append(True)
+ else:
+ if receiver_required:
+ selector.append(True)
+ else:
+ selector.append(False)
+ event[2][8]=False # So set delivery to False!!
+ else:
+ selector.append(False)
+ else:
+ selector.append(False)
+ # Update sharing of wired communications and build sender to notify set
+ senders_to_notify=set()
+ for event in self.events[selector]:
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
+ if self.netmat[interface]["is_wired"]:
+ # If node is sender
+ if src_id == node_id:
+ self.update_sharing(dst_id,-1,interface)
+ else:
+ self.update_sharing(node_id,-1,interface)
+ senders_to_notify.add(src_id) # We do not notify sender here since it may change the event list (invalidate selector)
+ # Notify plugins
+ for event in self.events[selector]:
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
+ if self.netmat[interface]["is_wired"]:
+ self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event)
+ self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event)
+ elif src_id == dst_id:
+ self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event)
+ else:
+ self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event)
+ # Delete related events
+ self.events=self.events[~(np.array(selector))]
+ # Notify sender at the end to not corrupt the event list and invalidate selector
+ for sender in senders_to_notify:
+ # Notify sender (node that wired sharing is updated in the send_cancel request)
+ sender_node=self.nodes[sender]
+ sender_node["state"]="running"
+ sender_node.rqueue.put(("send_cancel",reason))
+ # The node should resume at current self.time. So, sync the sender now:
+ self.sync_node_non_blocking(sender_node)
+ self.sync_node_blocking(sender_node)
+
+
def update_sharing(self, dst, amount,interface):
"""
Manage bandwidth sharing on wired interfaces