diff options
| author | Loic Guegan <manzerbredes@mailbox.org> | 2023-06-28 15:46:15 +0200 |
|---|---|---|
| committer | Loic Guegan <manzerbredes@mailbox.org> | 2023-06-28 15:46:15 +0200 |
| commit | f84359ec7a79c07bc2ae86ad81db823e1912a9e8 (patch) | |
| tree | 2c9269c65456941c55270d5a2ff110c4d472d07b /esds | |
| parent | e95d50d0ff8465988841a319efb3829109c154d1 (diff) | |
Add nodes groups features
Diffstat (limited to 'esds')
| -rw-r--r-- | esds/node.py | 3 | ||||
| -rw-r--r-- | esds/platform.py | 14 | ||||
| -rw-r--r-- | esds/simulator.py | 29 |
3 files changed, 30 insertions, 16 deletions
diff --git a/esds/node.py b/esds/node.py index 12e7a13..4bfae71 100644 --- a/esds/node.py +++ b/esds/node.py @@ -3,11 +3,12 @@ from esds.rcode import RCode class Node: available_node_id=0 - def __init__(self,src,interfaces): + def __init__(self,src,interfaces,grp): """ self.chest: contains mutex protected data """ self.node_id=Node.available_node_id + self.grp=grp # Node group Node.available_node_id+=1 # Refresh node id self.src=src # Store the node source code self.args=None # Store the node arguments (passed through Simulator.create_node() diff --git a/esds/platform.py b/esds/platform.py index 341ca13..40f8301 100644 --- a/esds/platform.py +++ b/esds/platform.py @@ -58,6 +58,7 @@ class YAMLPlatformFile: "node_count": 0, "implementations": [], "arguments": [], + "groups": dict(), "interfaces": dict() } @@ -145,6 +146,14 @@ class YAMLPlatformFile: self.default["node_count"]=nodes["count"] else: self.parsing_error("node count not provided") + if "groups" in nodes: + if type(nodes["groups"]) != list: + self.parsing_error("nodes groups should be a list") + for grp in nodes["groups"]: + words=grp.split() + r=UnitsParser.node_range(words[0],self.default["node_count"]) + for node in r: + self.default["groups"][node]=words[1] if "implementations" in nodes: if type(nodes["implementations"]) != list: self.parsing_error("nodes implementations should be a list of file path") @@ -210,7 +219,10 @@ class YAMLPlatformFile: ##### Create simulator simulator=Simulator(self.default["interfaces"]) for node_id in range(0,self.default["node_count"]): - simulator.create_node(self.default["implementations"][node_id], args=self.default["arguments"][node_id]) + if node_id in self.default["groups"]: + simulator.create_node(self.default["implementations"][node_id], args=self.default["arguments"][node_id],grp=self.default["groups"][node_id]) + else: + simulator.create_node(self.default["implementations"][node_id], args=self.default["arguments"][node_id]) ##### Run simulation simulator.run( breakpoints=self.default["breakpoints"], diff --git a/esds/simulator.py b/esds/simulator.py index 90ad427..2253a49 100644 --- a/esds/simulator.py +++ b/esds/simulator.py @@ -75,18 +75,19 @@ class Simulator: event[2][5]=new_duration self.netmat=netmat - def create_node(self, src, args=None): + def create_node(self, src, args=None, grp="def"): """ Create a node thread and run it """ - node=Node(src, self.netmat.keys()) + node=Node(src, self.netmat.keys(), grp) self.nodes.append(node) thread=threading.Thread(target=node.run,args=[args]) # There must be "daemon=True" as a parameter, but we removed it to be compatible with older version of python thread.start() def log(self,msg,node=None): - src = "esds" if node is None else "n"+str(node) - logline="[t="+str(self.time_truncated)+",src="+src+"] "+msg + logline="[t="+str(self.time_truncated)+",src=esds] "+msg + if node is not None: + logline="[t="+str(self.time_truncated)+",src=n"+str(node.node_id)+",grp="+str(node.grp)+"] "+msg if self.debug is not None: self.debug.append_log(logline) print(logline) @@ -118,7 +119,7 @@ class Simulator: break elif not timeout_remove_only: if node["request"] == "log": - self.log(node.rargs,node=node.node_id) + self.log(node.rargs,node=node) node["state"]="running" node.rqueue.put(("log",RCode.SUCCESS)) elif node["request"] == "timeout_add": @@ -140,7 +141,7 @@ class Simulator: node["state"]="running" node.rqueue.put(("notify_remove",RCode.SUCCESS)) elif node["request"] == "abort": - self.log("Simulation aborted: "+node.rargs,node=node.node_id) + self.log("Simulation aborted: "+node.rargs,node=node) exit(1) elif node["request"] == "read": node["state"]="running" @@ -159,12 +160,12 @@ class Simulator: elif node["request"] == "turn_on": node["state"]="running" node.rqueue.put(("turn_on",RCode.SUCCESS)) - self.log("Turned on",node=node.node_id) + self.log("Turned on",node=node) elif node["request"] == "turn_off": # Update node state after turning off node["state"]="running" node.rqueue.put(("turn_off",RCode.SUCCESS)) - self.log("Turned off",node=node.node_id) + self.log("Turned off",node=node) # We cancel communication after node has turned off self.cancel_communications(node.node_id,reason=RCode.RECEIVER_TURNED_OFF) elif node["request"] == "send_cancel": @@ -281,7 +282,7 @@ class Simulator: if len(selector) != 0: self.events=self.events[~np.array(selector)] for node in notify: - self.log("Interferences on "+interface,node=node) + self.log("Interferences on "+interface,node=self.nodes[node]) return status def sync_node_blocking(self, node): @@ -294,7 +295,7 @@ class Simulator: interface, data, datasize, dst, receiver_required=node.rargs if dst != None: if not (dst >=0 and dst <=len(self.nodes)): - self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id) + self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node) exit(1) if not self.communicate(interface, node.node_id, dst, data, datasize,receiver_required): node["state"]="running" @@ -323,7 +324,7 @@ class Simulator: """ nsrc=self.nodes[src] if self.netmat[interface]["is_wired"]: - self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=src) + self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=nsrc) if not self.nodes[dst]["turned_on"] and receiver_required: return(False) self.update_sharing(dst,1,interface) # Update sharing first @@ -331,7 +332,7 @@ class Simulator: duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst] self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required)) else: - self.log("Send "+str(datasize)+" bytes on "+interface,node=src) + self.log("Send "+str(datasize)+" bytes on "+interface,node=nsrc) for dst in self.list_receivers(nsrc,interface): if self.nodes[dst]["turned_on"]: duration=datasize*8/self.netmat[interface]["bandwidth"][src,dst]+self.netmat[interface]["latency"][src,dst] @@ -418,7 +419,7 @@ class Simulator: if perform_delivery: dst["interfaces"][interface].put((data,start_at,self.time)) dst["interfaces_queue_size"][interface]+=1 - self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id)) + self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst) # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) if dst["state"] == "call_blocking" and dst["request"] == "receive": dst["interfaces_queue_size"][interface]-=1 @@ -437,7 +438,7 @@ class Simulator: if perform_delivery: dst["interfaces"][interface].put((data,start_at,self.time)) dst["interfaces_queue_size"][interface]+=1 - self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id)) + self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst) # If node is receiving makes it consume (this way if there is a timeout, it will be removed!) if dst["state"] == "call_blocking" and dst["request"] == "receive": dst["interfaces_queue_size"][interface]-=1 |
