From 1eee1ac81a0ef7f86c7acc2d431b0ac084aebf14 Mon Sep 17 00:00:00 2001 From: Loic Guegan Date: Fri, 1 Jul 2022 10:03:54 +0200 Subject: Change API. Indeed, currently wired communications are aborted when receiver node turned off. This may not be desire when implementing UDP communications. Now user can use a boolean when using send()/sendt() to change this behavior. --- esds/simulator.py | 58 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 26 deletions(-) (limited to 'esds/simulator.py') diff --git a/esds/simulator.py b/esds/simulator.py index 8092b37..052b704 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -7,7 +7,7 @@ class Simulator: Flow-Level Discrete Event Simulator for Cyber-Physical Systems The general format for an event is (type,timestamp,event,priority) Event types: - - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp), 2) + - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery,cancel_on_turn_off), 2) - 1 timeout (1,timestamp,node_id,3) - 2 breakpoint_manual (3,timestamp,0,1) - 3 breakpoint_auto (4,timestamp,0,1) @@ -54,7 +54,7 @@ class Simulator: if int(event[0]) == 0: cur_event=event[2] ts=float(event[1]) - src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at=cur_event + src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=cur_event new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)] old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)] new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)] @@ -197,7 +197,11 @@ class Simulator: if event[0]==0 and int(event[2][1])==node.node_id: if self.netmat[event[2][2]]["is_wired"]: selector_wireless.append(False) - selector_wired.append(True) + if event[2][9]: # Check if should be cancel on turn_off + selector_wired.append(True) + else: + selector_wired.append(False) + event[2][8]=False # So set delivery to False!! else: selector_wireless.append(True) selector_wired.append(False) @@ -299,12 +303,12 @@ class Simulator: if node["state"] == "call_blocking": if node["request"] == "send": node["state"]="pending" - interface, data, datasize, dst=node.rargs + interface, data, datasize, dst,cancel_on_turn_off=node.rargs if dst != None: if not (dst >=0 and dst <=len(self.nodes)): self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id) exit(1) - self.communicate(interface, node.node_id, dst, data, datasize) + self.communicate(interface, node.node_id, dst, data, datasize,cancel_on_turn_off) elif node["request"] == "receive": interface=node.rargs if node["interfaces_queue_size"][interface] > 0: @@ -319,7 +323,7 @@ class Simulator: node.rqueue.put(("wait_end",0)) self.wait_end_nodes.append(node.node_id) - def communicate(self, interface, src, dst, data, datasize): + def communicate(self, interface, src, dst, data, datasize, cancel_on_turn_off=True): """ Create communication event between src and dst """ @@ -330,7 +334,7 @@ class Simulator: self.update_sharing(dst,1,interface) # Update sharing first # Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing): duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst] - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) else: nsrc["state"]="call_blocking" # Try later when node is on else: @@ -341,11 +345,11 @@ class Simulator: if src == dst: # This event (where src == dst) is used to notify the sender when data is received! # Correspond to the diagonal of the network matrices (bandwidth and latency) - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) elif not self.interferences: - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) elif not self.handle_interferences(src,dst, interface): - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off)) def list_receivers(self,node,interface): """ @@ -412,27 +416,14 @@ class Simulator: event=self.events[0,2] self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between) if event_type == 0: - src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at=event + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=event src=self.nodes[int(src_id)] dst=self.nodes[int(dst_id)] if self.netmat[interface]["is_wired"]: - dst["interfaces"][interface].put((data,start_at,self.time)) - dst["interfaces_queue_size"][interface]+=1 - self.update_sharing(dst.node_id,-1,interface) - self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id)) - # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) - if dst["state"] == "call_blocking" and dst["request"] == "receive": - dst["interfaces_queue_size"][interface]-=1 - dst["state"]="running" - dst.rqueue.put(("receive",0)) - self.sync_node_non_blocking(dst,timeout_remove_only=True) - src["state"]="running" - src.rqueue.put(("send",0)) - self.sync_node_non_blocking(src,timeout_remove_only=True) - else: - if src.node_id != dst.node_id: + if perform_delivery: dst["interfaces"][interface].put((data,start_at,self.time)) dst["interfaces_queue_size"][interface]+=1 + self.update_sharing(dst.node_id,-1,interface) self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id)) # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) if dst["state"] == "call_blocking" and dst["request"] == "receive": @@ -440,6 +431,21 @@ class Simulator: dst["state"]="running" dst.rqueue.put(("receive",0)) self.sync_node_non_blocking(dst,timeout_remove_only=True) + src["state"]="running" + src.rqueue.put(("send",0)) + self.sync_node_non_blocking(src,timeout_remove_only=True) + else: + if src.node_id != dst.node_id: + if perform_delivery: + dst["interfaces"][interface].put((data,start_at,self.time)) + dst["interfaces_queue_size"][interface]+=1 + self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id)) + # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) + if dst["state"] == "call_blocking" and dst["request"] == "receive": + dst["interfaces_queue_size"][interface]-=1 + dst["state"]="running" + dst.rqueue.put(("receive",0)) + self.sync_node_non_blocking(dst,timeout_remove_only=True) else: src["state"]="running" src.rqueue.put(("send",0)) -- cgit v1.2.3