summaryrefslogtreecommitdiff
path: root/esds/simulator.py
diff options
context:
space:
mode:
Diffstat (limited to 'esds/simulator.py')
-rw-r--r--esds/simulator.py71
1 files changed, 32 insertions, 39 deletions
diff --git a/esds/simulator.py b/esds/simulator.py
index cdc85d3..fcc759b 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -9,7 +9,7 @@ class Simulator:
Flow-Level Discrete Event Simulator for Cyber-Physical Systems
The general format for an event is (type,timestamp,event,priority)
Event types:
- - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery, receiver_required), 2)
+ - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery, receiver_required, RCode), 2)
- 1 timeout (1,timestamp,node_id,3)
- 2 breakpoint_manual (2,timestamp,0,1)
- 3 breakpoint_auto (3,timestamp,0,1)
@@ -57,7 +57,7 @@ class Simulator:
if int(event[0]) == 0:
cur_event=event[2]
ts=float(event[1])
- src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,receiver_required=cur_event
+ src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,receiver_required,rcode=cur_event
new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)]
new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)]
@@ -185,7 +185,7 @@ class Simulator:
selector=list()
for event in self.events:
if event[0]==0:
- src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=event[2]
is_wired=self.netmat[interface]["is_wired"]
is_wireless=not is_wired
if src_id == node_id:
@@ -206,7 +206,7 @@ class Simulator:
# Update sharing of wired communications and build sender to notify set
senders_to_notify=set()
for event in self.events[selector]:
- src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=event[2]
if self.netmat[interface]["is_wired"]:
# If node is sender
if src_id == node_id:
@@ -216,7 +216,7 @@ class Simulator:
senders_to_notify.add(src_id) # We do not notify sender here since it may change the event list (invalidate selector)
# Notify plugins
for event in self.events[selector]:
- src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2]
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=event[2]
if self.netmat[interface]["is_wired"]:
self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event)
self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event)
@@ -261,32 +261,21 @@ class Simulator:
new events and existing events.
"""
status=False
- selector=list()
- notify=set()
for event in self.events:
event_type=event[0]
com=event[2]
if event_type==0 and com[2] == interface:
com_sender=int(com[0])
com_receiver=int(com[1])
- select=False
- if receiver==com_sender:
- status=True
- notify.add(receiver)
- elif receiver==com_receiver:
+ # All cases where interferences occurs:
+ receiver_is_sending=(receiver==com_sender)
+ receiver_is_receiving=(receiver==com_receiver and sender!=com_sender) # We check also if its not our own communication
+ sender_is_receiving=(sender==com_receiver and com_sender!=com_sender) # We check also if its not our own communication
+ # Apply rules:
+ if receiver_is_sending or receiver_is_receiving or sender_is_receiving:
status=True
- select=True
- notify.add(receiver)
- if sender==com_receiver and com_sender != com_receiver:
- select=True
- notify.add(sender)
- selector.append(select)
- else:
- selector.append(False)
- if len(selector) != 0:
- self.events=self.events[~np.array(selector)]
- for node in notify:
- self.log("Interferences on "+interface,node=self.nodes[node])
+ if com_sender != com_receiver:
+ event[2][10]=RCode.INTERFERENCES # Tell the sender/receiver interferences occured
return status
def sync_node_blocking(self, node):
@@ -301,9 +290,10 @@ class Simulator:
if not (dst >=0 and dst <=len(self.nodes)):
self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node)
exit(1)
- if not self.communicate(interface, node.node_id, dst, data, datasize,receiver_required):
+ code=self.communicate(interface, node.node_id, dst, data, datasize, receiver_required)
+ if code!=RCode.SUCCESS:
node["state"]="running"
- node.rqueue.put(("send",RCode.RECEIVER_UNAVAILABLE))
+ node.rqueue.put(("send",code))
# Do not forget to collect the next event (since current event did not happend)
# Be careful in node implementation to have no infinite loop when receiver_required=True
self.sync_node_non_blocking(node)
@@ -336,11 +326,11 @@ class Simulator:
exit(1)
self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=nsrc)
if not self.nodes[dst]["turned_on"] and receiver_required:
- return(False)
+ return(RCode.RECEIVER_TURNED_OFF)
self.update_sharing(dst,1,interface) # Update sharing first
# Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing):
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required))
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required,RCode.SUCCESS))
else:
self.log("Send "+str(datasize)+" bytes on "+interface,node=nsrc)
for dst in self.list_receivers(nsrc,interface):
@@ -349,12 +339,12 @@ class Simulator:
if src == dst:
# This event (where src == dst) is used to notify the sender when data is received!
# Correspond to the diagonal of the network matrices (bandwidth and latency)
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False))
- elif not self.interferences:
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False))
- elif not self.handle_interferences(src,dst, interface):
- self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False))
- return(True)
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False,RCode.SUCCESS))
+ else:
+ self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False,RCode.SUCCESS))
+ if self.interferences:
+ self.handle_interferences(src,dst, interface)
+ return(RCode.SUCCESS)
def list_receivers(self,node,interface):
"""
Deduce reachable receivers from the bandwidth matrix (sender is included in the list!)
@@ -422,12 +412,12 @@ class Simulator:
content=event[2] # Event content
self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between)
if event_type == 0:
- src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=content
+ src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=content
src=self.nodes[int(src_id)]
dst=self.nodes[int(dst_id)]
if self.netmat[interface]["is_wired"]:
if perform_delivery:
- dst["interfaces"][interface].put((data,start_at,self.time))
+ dst["interfaces"][interface].put((data,start_at,self.time,rcode))
dst["interfaces_queue_size"][interface]+=1
self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst)
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
@@ -446,9 +436,12 @@ class Simulator:
else:
if src.node_id != dst.node_id:
if perform_delivery:
- dst["interfaces"][interface].put((data,start_at,self.time))
+ dst["interfaces"][interface].put((data,start_at,self.time,rcode))
dst["interfaces_queue_size"][interface]+=1
- self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst)
+ if rcode==RCode.SUCCESS:
+ self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst)
+ else:
+ self.log("Receive "+str(datasize)+" bytes on "+interface+" with errors",node=dst)
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive":
dst["interfaces_queue_size"][interface]-=1
@@ -458,7 +451,7 @@ class Simulator:
self.notify_node_plugins(dst, "on_communication_end", event)
else:
src["state"]="running"
- src.rqueue.put(("send",RCode.SUCCESS))
+ src.rqueue.put(("send",rcode))
self.sync_node_non_blocking(src,timeout_remove_only=True)
self.notify_node_plugins(src, "on_communication_end", event)
elif event_type == 1: # Timeout