diff options
Diffstat (limited to 'esds/simulator.py')
| -rw-r--r-- | esds/simulator.py | 71 |
1 files changed, 32 insertions, 39 deletions
diff --git a/esds/simulator.py b/esds/simulator.py index cdc85d3..fcc759b 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -9,7 +9,7 @@ class Simulator: Flow-Level Discrete Event Simulator for Cyber-Physical Systems The general format for an event is (type,timestamp,event,priority) Event types: - - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery, receiver_required), 2) + - 0 send (0,timestamp,(src,dst,interface,data,datasize,duration,datasize_remaining,start_timestamp, perform_delivery, receiver_required, RCode), 2) - 1 timeout (1,timestamp,node_id,3) - 2 breakpoint_manual (2,timestamp,0,1) - 3 breakpoint_auto (3,timestamp,0,1) @@ -57,7 +57,7 @@ class Simulator: if int(event[0]) == 0: cur_event=event[2] ts=float(event[1]) - src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,receiver_required=cur_event + src_id,dst_id,interface, data, datasize,duration, datasize_remaining,start_at,perform_delivery,receiver_required,rcode=cur_event new_bw=netmat[interface]["bandwidth"][int(src_id),int(dst_id)] old_bw=self.netmat[interface]["bandwidth"][int(src_id),int(dst_id)] new_lat=netmat[interface]["latency"][int(src_id),int(dst_id)] @@ -185,7 +185,7 @@ class Simulator: selector=list() for event in self.events: if event[0]==0: - src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2] + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=event[2] is_wired=self.netmat[interface]["is_wired"] is_wireless=not is_wired if src_id == node_id: @@ -206,7 +206,7 @@ class Simulator: # Update sharing of wired communications and build sender to notify set senders_to_notify=set() for event in self.events[selector]: - src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2] + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=event[2] if self.netmat[interface]["is_wired"]: # If node is sender if src_id == node_id: @@ -216,7 +216,7 @@ class Simulator: senders_to_notify.add(src_id) # We do not notify sender here since it may change the event list (invalidate selector) # Notify plugins for event in self.events[selector]: - src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=event[2] + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=event[2] if self.netmat[interface]["is_wired"]: self.notify_node_plugins(self.nodes[src_id], "on_communication_end", event) self.notify_node_plugins(self.nodes[dst_id], "on_communication_end", event) @@ -261,32 +261,21 @@ class Simulator: new events and existing events. """ status=False - selector=list() - notify=set() for event in self.events: event_type=event[0] com=event[2] if event_type==0 and com[2] == interface: com_sender=int(com[0]) com_receiver=int(com[1]) - select=False - if receiver==com_sender: - status=True - notify.add(receiver) - elif receiver==com_receiver: + # All cases where interferences occurs: + receiver_is_sending=(receiver==com_sender) + receiver_is_receiving=(receiver==com_receiver and sender!=com_sender) # We check also if its not our own communication + sender_is_receiving=(sender==com_receiver and com_sender!=com_sender) # We check also if its not our own communication + # Apply rules: + if receiver_is_sending or receiver_is_receiving or sender_is_receiving: status=True - select=True - notify.add(receiver) - if sender==com_receiver and com_sender != com_receiver: - select=True - notify.add(sender) - selector.append(select) - else: - selector.append(False) - if len(selector) != 0: - self.events=self.events[~np.array(selector)] - for node in notify: - self.log("Interferences on "+interface,node=self.nodes[node]) + if com_sender != com_receiver: + event[2][10]=RCode.INTERFERENCES # Tell the sender/receiver interferences occured return status def sync_node_blocking(self, node): @@ -301,9 +290,10 @@ class Simulator: if not (dst >=0 and dst <=len(self.nodes)): self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node) exit(1) - if not self.communicate(interface, node.node_id, dst, data, datasize,receiver_required): + code=self.communicate(interface, node.node_id, dst, data, datasize, receiver_required) + if code!=RCode.SUCCESS: node["state"]="running" - node.rqueue.put(("send",RCode.RECEIVER_UNAVAILABLE)) + node.rqueue.put(("send",code)) # Do not forget to collect the next event (since current event did not happend) # Be careful in node implementation to have no infinite loop when receiver_required=True self.sync_node_non_blocking(node) @@ -336,11 +326,11 @@ class Simulator: exit(1) self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=nsrc) if not self.nodes[dst]["turned_on"] and receiver_required: - return(False) + return(RCode.RECEIVER_TURNED_OFF) self.update_sharing(dst,1,interface) # Update sharing first # Note that in the following we send more data than expected to handle bandwidth sharing (datasize*8*sharing): duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst] - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required)) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required,RCode.SUCCESS)) else: self.log("Send "+str(datasize)+" bytes on "+interface,node=nsrc) for dst in self.list_receivers(nsrc,interface): @@ -349,12 +339,12 @@ class Simulator: if src == dst: # This event (where src == dst) is used to notify the sender when data is received! # Correspond to the diagonal of the network matrices (bandwidth and latency) - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False)) - elif not self.interferences: - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False)) - elif not self.handle_interferences(src,dst, interface): - self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False)) - return(True) + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False,RCode.SUCCESS)) + else: + self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,True,False,RCode.SUCCESS)) + if self.interferences: + self.handle_interferences(src,dst, interface) + return(RCode.SUCCESS) def list_receivers(self,node,interface): """ Deduce reachable receivers from the bandwidth matrix (sender is included in the list!) @@ -422,12 +412,12 @@ class Simulator: content=event[2] # Event content self.events=np.delete(self.events,0,0) # Consume events NOW! not at the end of the loop (event list may change in between) if event_type == 0: - src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required=content + src_id,dst_id,interface, data, datasize,duration,datasize_remaining,start_at,perform_delivery,receiver_required,rcode=content src=self.nodes[int(src_id)] dst=self.nodes[int(dst_id)] if self.netmat[interface]["is_wired"]: if perform_delivery: - dst["interfaces"][interface].put((data,start_at,self.time)) + dst["interfaces"][interface].put((data,start_at,self.time,rcode)) dst["interfaces_queue_size"][interface]+=1 self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst) # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) @@ -446,9 +436,12 @@ class Simulator: else: if src.node_id != dst.node_id: if perform_delivery: - dst["interfaces"][interface].put((data,start_at,self.time)) + dst["interfaces"][interface].put((data,start_at,self.time,rcode)) dst["interfaces_queue_size"][interface]+=1 - self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst) + if rcode==RCode.SUCCESS: + self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst) + else: + self.log("Receive "+str(datasize)+" bytes on "+interface+" with errors",node=dst) # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) if dst["state"] == "call_blocking" and dst["request"] == "receive": dst["interfaces_queue_size"][interface]-=1 @@ -458,7 +451,7 @@ class Simulator: self.notify_node_plugins(dst, "on_communication_end", event) else: src["state"]="running" - src.rqueue.put(("send",RCode.SUCCESS)) + src.rqueue.put(("send",rcode)) self.sync_node_non_blocking(src,timeout_remove_only=True) self.notify_node_plugins(src, "on_communication_end", event) elif event_type == 1: # Timeout |
