summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2022-07-01 10:03:54 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2022-07-01 10:03:54 +0200
commit1eee1ac81a0ef7f86c7acc2d431b0ac084aebf14 (patch)
tree7c96d65453044eaf4edb1db551181b79cfeaae71
parent48ca1f43f20518949bacd86a59c6cbc38e4a7408 (diff)
Change API. Indeed, currently wired communications
are aborted when receiver node turned off. This may not be desire when implementing UDP communications. Now user can use a boolean when using send()/sendt() to change this behavior.
-rw-r--r--esds/node.py8
-rw-r--r--esds/simulator.py58
-rw-r--r--tests/api_send_eth0_1s1r/out6
-rw-r--r--tests/api_send_eth0_1s1r/receiver.py8
-rw-r--r--tests/api_send_eth0_1s1r/sender.py1
5 files changed, 49 insertions, 32 deletions
diff --git a/esds/node.py b/esds/node.py
index 4c9ca5f..09569a0 100644
--- a/esds/node.py
+++ b/esds/node.py
@@ -100,7 +100,7 @@ class Node:
self["state"]="call_non_blocking"
self.wait_ack(["turn_on"])
- def send(self, interface, data, datasize, dst):
+ def send(self, interface, data, datasize, dst,cancel_on_turn_off=True):
if interface not in self["interfaces"]:
self.abort("send() called with an unknown interface \""+interface+"\"")
elif type(datasize) != int and type(datasize) != float:
@@ -110,14 +110,14 @@ class Node:
elif not self["turned_on"]:
self.abort("send() called while node is turned off")
self.plugin_notify("send_call",(interface,data,datasize,dst))
- self.rargs=(interface, data, datasize, dst)
+ self.rargs=(interface, data, datasize, dst,cancel_on_turn_off)
self["request"]="send"
self["state"]="call_blocking"
ack=self.wait_ack(["send","send_cancel"])
self.plugin_notify("send_return",(interface,data,datasize,dst,ack[1]))
return ack[1]
- def sendt(self, interface, data, datasize, dst, timeout):
+ def sendt(self, interface, data, datasize, dst, timeout,cancel_on_turn_off=True):
if interface not in self["interfaces"]:
self.abort("sendt() called with an unknown interface \""+interface+"\"")
elif type(datasize) != int and type(datasize) != float:
@@ -134,7 +134,7 @@ class Node:
self["request"]="timeout_add"
self["state"]="call_non_blocking"
self.wait_ack(["timeout_add"])
- self.rargs=(interface, data, datasize, dst)
+ self.rargs=(interface, data, datasize, dst,cancel_on_turn_off)
self["request"]="send"
self["state"]="call_blocking"
ack=self.wait_ack(["send","timeout","send_cancel"])
diff --git a/esds/simulator.py b/esds/simulator.py
index 8092b37..052b704 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -7,7 +7,7 @@ class Simulator:
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
The general format for an event is (type,timestamp,event,priority)
Event types:
- - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp), 2)
+ - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery,cancel_on_turn_off), 2)
- 1 timeout (1,timestamp,node_id,3)
- 2 breakpoint_manual (3,timestamp,0,1)
- 3 breakpoint_auto (4,timestamp,0,1)
@@ -54,7 +54,7 @@ class Simulator:
if int(event[0]) == 0:
cur_event=event[2]
ts=float(event[1])
- src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at=cur_event
+ src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=cur_event
new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
@@ -197,7 +197,11 @@ class Simulator:
if event[0]==0 and int(event[2][1])==node.node_id:
if self.netmat[event[2][2]]["is_wired"]:
selector_wireless.append(False)
- selector_wired.append(True)
+ if event[2][9]: # Check if should be cancel on turn_off
+ selector_wired.append(True)
+ else:
+ selector_wired.append(False)
+ event[2][8]=False # So set delivery to False!!
else:
selector_wireless.append(True)
selector_wired.append(False)
@@ -299,12 +303,12 @@ class Simulator:
if node["state"] == "call_blocking":
if node["request"] == "send":
node["state"]="pending"
- interface, data, datasize, dst=node.rargs
+ interface, data, datasize, dst,cancel_on_turn_off=node.rargs
if dst != None:
if not (dst >=0 and dst <=len(self.nodes)):
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
exit(1)
- self.communicate(interface, node.node_id, dst, data, datasize)
+ self.communicate(interface, node.node_id, dst, data, datasize,cancel_on_turn_off)
elif node["request"] == "receive":
interface=node.rargs
if node["interfaces_queue_size"][interface] > 0:
@@ -319,7 +323,7 @@ class Simulator:
node.rqueue.put(("wait_end",0))
self.wait_end_nodes.append(node.node_id)
- def communicate(self, interface, src, dst, data, datasize):
+ def communicate(self, interface, src, dst, data, datasize, cancel_on_turn_off=True):
"""
Create communication event between src and dst
"""
@@ -330,7 +334,7 @@ class Simulator:
self.update_sharing(dst,1,interface) # Update sharing first
# Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
else:
nsrc["state"]="call_blocking" # Try later when node is on
else:
@@ -341,11 +345,11 @@ class Simulator:
if src == dst:
# This event (where src == dst) is used to notify the sender when data is received!
# Correspond to the diagonal of the network matrices (bandwidth and latency)
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
elif not self.interferences:
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
elif not self.handle_interferences(src,dst, interface):
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,cancel_on_turn_off))
def list_receivers(self,node,interface):
"""
@@ -412,27 +416,14 @@ class Simulator:
event=self.events[0,2]
self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
if event_type == 0:
- src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at=event
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,cancel_on_turn_off=event
src=self.nodes[int(src_id)]
dst=self.nodes[int(dst_id)]
if self.netmat[interface]["is_wired"]:
- dst["interfaces"][interface].put((data,start_at,self.time))
- dst["interfaces_queue_size"][interface]+=1
- self.update_sharing(dst.node_id,-1,interface)
- self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
- # If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
- if dst["state"] == "call_blocking" and dst["request"] == "receive":
- dst["interfaces_queue_size"][interface]-=1
- dst["state"]="running"
- dst.rqueue.put(("receive",0))
- self.sync_node_non_blocking(dst,timeout_remove_only=True)
- src["state"]="running"
- src.rqueue.put(("send",0))
- self.sync_node_non_blocking(src,timeout_remove_only=True)
- else:
- if src.node_id != dst.node_id:
+ if perform_delivery:
dst["interfaces"][interface].put((data,start_at,self.time))
dst["interfaces_queue_size"][interface]+=1
+ self.update_sharing(dst.node_id,-1,interface)
self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive":
@@ -440,6 +431,21 @@ class Simulator:
dst["state"]="running"
dst.rqueue.put(("receive",0))
self.sync_node_non_blocking(dst,timeout_remove_only=True)
+ src["state"]="running"
+ src.rqueue.put(("send",0))
+ self.sync_node_non_blocking(src,timeout_remove_only=True)
+ else:
+ if src.node_id != dst.node_id:
+ if perform_delivery:
+ dst["interfaces"][interface].put((data,start_at,self.time))
+ dst["interfaces_queue_size"][interface]+=1
+ self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
+ # If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
+ if dst["state"] == "call_blocking" and dst["request"] == "receive":
+ dst["interfaces_queue_size"][interface]-=1
+ dst["state"]="running"
+ dst.rqueue.put(("receive",0))
+ self.sync_node_non_blocking(dst,timeout_remove_only=True)
else:
src["state"]="running"
src.rqueue.put(("send",0))
diff --git a/tests/api_send_eth0_1s1r/out b/tests/api_send_eth0_1s1r/out
index 1634d95..d331337 100644
--- a/tests/api_send_eth0_1s1r/out
+++ b/tests/api_send_eth0_1s1r/out
@@ -7,5 +7,9 @@
[t=3.000,src=n1] Received: Hello World!
[t=3.000,src=n1] Turned off
[t=4.000,src=n1] Turned on
+[t=4.000,src=n0] Send 10 bytes to n1 on eth0
[t=5.000,src=n1] Receive failed code=-1
-[t=5.000,src=esds] Simulation ends
+[t=5.000,src=n1] Turned off
+[t=6.000,src=n1] Turned on
+[t=7.000,src=n1] Receive failed code=-1
+[t=14.000,src=esds] Simulation ends
diff --git a/tests/api_send_eth0_1s1r/receiver.py b/tests/api_send_eth0_1s1r/receiver.py
index 96305bf..364c24c 100644
--- a/tests/api_send_eth0_1s1r/receiver.py
+++ b/tests/api_send_eth0_1s1r/receiver.py
@@ -17,4 +17,10 @@ def execute(api):
code, data=api.receivet("eth0",1)
msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
api.log(msg)
-
+ ##### Ensure data is not receive turned off but communication is not cancel
+ api.turn_off()
+ api.wait(1)
+ api.turn_on()
+ code, data=api.receivet("eth0",1)
+ msg="Received: "+data if code == 0 else "Receive failed code="+str(code)
+ api.log(msg)
diff --git a/tests/api_send_eth0_1s1r/sender.py b/tests/api_send_eth0_1s1r/sender.py
index 9f4aa85..a01b371 100644
--- a/tests/api_send_eth0_1s1r/sender.py
+++ b/tests/api_send_eth0_1s1r/sender.py
@@ -5,3 +5,4 @@ def execute(api):
api.send("eth0","Hello World!",1,1)
api.wait(1) # Goto 3 seconds
api.send("eth0","Hello World!",1,1)
+ api.send("eth0","Hello World!",10,1,False) # Now communication should not be aborted even if receiver turned_off (e.g UDP)