summaryrefslogtreecommitdiff
path: root/esds/simulator.py
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2022-07-14 14:07:52 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2022-07-14 14:07:52 +0200
commite0dbebe31d4288f3636ef3871f45429b86b1d737 (patch)
tree76574caae6ba69f42e304276f09e636845b9a3e9 /esds/simulator.py
parent9671fc21eea14b6128c5f85208df4be0529c69d1 (diff)
Go back to commit 1eee1ac8
Diffstat (limited to 'esds/simulator.py')
-rw-r--r--esds/simulator.py50
1 files changed, 36 insertions, 14 deletions
diff --git a/esds/simulator.py b/esds/simulator.py
index fc76daf..1678ecc 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -7,7 +7,7 @@ class Simulator:
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
The general format for an event is (type,timestamp,event,priority)
Event types:
- - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery), 2)
+ - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery, receiver_required), 2)
- 1 timeout (1,timestamp,node_id,3)
- 2 breakpoint_manual (3,timestamp,0,1)
- 3 breakpoint_auto (4,timestamp,0,1)
@@ -54,7 +54,7 @@ class Simulator:
if int(event[0]) == 0:
cur_event=event[2]
ts=float(event[1])
- src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery=cur_event
+ src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,receiver_required=cur_event
new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
@@ -192,22 +192,43 @@ class Simulator:
elif node["request"] == "turn_off":
# Create communications selectors (True/False arrays)
selector_wireless=list() # Select all wireless events where node is involved
+ selector_wired=list() # Select all wired events where node is involved
for event in self.events:
if event[0]==0 and int(event[2][1])==node.node_id:
if self.netmat[event[2][2]]["is_wired"]:
selector_wireless.append(False)
- event[2][8]=False # So set delivery to False for wired communication!!
+ if event[2][9]: # Check if should be cancel on turn_off (receiver_required)
+ selector_wired.append(True)
+ else:
+ selector_wired.append(False)
+ event[2][8]=False # So set delivery to False!!
else:
selector_wireless.append(True)
+ selector_wired.append(False)
else:
selector_wireless.append(False)
- # Remove the relevent wireless communications from the event list
+ selector_wired.append(False)
+ # Build the set of senders to notify (only in wired connections,
+ # indeed IRL, in wireless communications sender would send all its data)
+ senders_to_notify=set()
+ for event in self.events[selector_wired]:
+ senders_to_notify.add(int(event[2][0]))
+ # Remove communications from the event list
if(len(self.events) != 0):
- self.events=self.events[~np.array(selector_wireless)]
+ self.events=self.events[~(np.array(selector_wireless)|np.array(selector_wired))]
# Update node state after turning off
node["state"]="running"
node.rqueue.put(("turn_off",0))
self.log("Turned off",node=node.node_id)
+ # Informed senders of wired events that communication ended
+ for sender_id in senders_to_notify:
+ # Notify sender (node that wired sharing is updated in the send_cancel request)
+ sender_node=self.nodes[sender_id]
+ sender_node["state"]="running"
+ sender_node.rqueue.put(("send_cancel",2))
+ # The node should resume at current self.time. So, sync the sender now:
+ self.sync_node_non_blocking(sender_node)
+ self.sync_node_blocking(sender_node)
elif node["request"] == "send_cancel":
selector=list()
for event in self.events:
@@ -280,12 +301,12 @@ class Simulator:
if node["state"] == "call_blocking":
if node["request"] == "send":
node["state"]="pending"
- interface, data, datasize, dst=node.rargs
+ interface, data, datasize, dst, receiver_required=node.rargs
if dst != None:
if not (dst >=0 and dst <=len(self.nodes)):
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
exit(1)
- self.communicate(interface, node.node_id, dst, data, datasize)
+ self.communicate(interface, node.node_id, dst, data, datasize,receiver_required)
elif node["request"] == "receive":
interface=node.rargs
if node["interfaces_queue_size"][interface] > 0:
@@ -300,7 +321,7 @@ class Simulator:
node.rqueue.put(("wait_end",0))
self.wait_end_nodes.append(node.node_id)
- def communicate(self, interface, src, dst, data, datasize):
+ def communicate(self, interface, src, dst, data, datasize,receiver_required):
"""
Create communication event between src and dst
"""
@@ -310,7 +331,7 @@ class Simulator:
self.update_sharing(dst,1,interface) # Update sharing first
# Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"]))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required))
else:
self.log("Send "+str(datasize)+" bytes on "+interface,node=src)
for dst in self.list_receivers(nsrc,interface):
@@ -319,11 +340,11 @@ class Simulator:
if src == dst:
# This event (where src == dst) is used to notify the sender when data is received!
# Correspond to the diagonal of the network matrices (bandwidth and latency)
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False))
elif not self.interferences:
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False))
elif not self.handle_interferences(src,dst, interface):
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False))
def list_receivers(self,node,interface):
"""
@@ -390,7 +411,7 @@ class Simulator:
event=self.events[0,2]
self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
if event_type == 0:
- src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery=event
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event
src=self.nodes[int(src_id)]
dst=self.nodes[int(dst_id)]
if self.netmat[interface]["is_wired"]:
@@ -406,7 +427,8 @@ class Simulator:
self.sync_node_non_blocking(dst,timeout_remove_only=True)
self.update_sharing(dst.node_id,-1,interface)
src["state"]="running"
- src.rqueue.put(("send",0))
+ code=0 if perform_delivery else 1
+ src.rqueue.put(("send",code))
self.sync_node_non_blocking(src,timeout_remove_only=True)
else:
if src.node_id != dst.node_id: