summaryrefslogtreecommitdiff
path: root/esds
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2023-06-28 15:46:15 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2023-06-28 15:46:15 +0200
commitf84359ec7a79c07bc2ae86ad81db823e1912a9e8 (patch)
tree2c9269c65456941c55270d5a2ff110c4d472d07b /esds
parente95d50d0ff8465988841a319efb3829109c154d1 (diff)
Add nodes groups features
Diffstat (limited to 'esds')
-rw-r--r--esds/node.py3
-rw-r--r--esds/platform.py14
-rw-r--r--esds/simulator.py29
3 files changed, 30 insertions, 16 deletions
diff --git a/esds/node.py b/esds/node.py
index 12e7a13..4bfae71 100644
--- a/esds/node.py
+++ b/esds/node.py
@@ -3,11 +3,12 @@ from esds.rcode import RCode
class Node:
available_node_id=0
- def __init__(self,src,interfaces):
+ def __init__(self,src,interfaces,grp):
"""
self.chest: contains mutex protected data
"""
self.node_id=Node.available_node_id
+ self.grp=grp # Node group
Node.available_node_id+=1 # Refresh node id
self.src=src # Store the node source code
self.args=None # Store the node arguments (passed through Simulator.create_node()
diff --git a/esds/platform.py b/esds/platform.py
index 341ca13..40f8301 100644
--- a/esds/platform.py
+++ b/esds/platform.py
@@ -58,6 +58,7 @@ class YAMLPlatformFile:
"node_count": 0,
"implementations": [],
"arguments": [],
+ "groups": dict(),
"interfaces": dict()
}
@@ -145,6 +146,14 @@ class YAMLPlatformFile:
self.default["node_count"]=nodes["count"]
else:
self.parsing_error("node count not provided")
+ if "groups" in nodes:
+ if type(nodes["groups"]) != list:
+ self.parsing_error("nodes groups should be a list")
+ for grp in nodes["groups"]:
+ words=grp.split()
+ r=UnitsParser.node_range(words[0],self.default["node_count"])
+ for node in r:
+ self.default["groups"][node]=words[1]
if "implementations" in nodes:
if type(nodes["implementations"]) != list:
self.parsing_error("nodes implementations should be a list of file path")
@@ -210,7 +219,10 @@ class YAMLPlatformFile:
##### Create simulator
simulator=Simulator(self.default["interfaces"])
for node_id in range(0,self.default["node_count"]):
- simulator.create_node(self.default["implementations"][node_id], args=self.default["arguments"][node_id])
+ if node_id in self.default["groups"]:
+ simulator.create_node(self.default["implementations"][node_id], args=self.default["arguments"][node_id],grp=self.default["groups"][node_id])
+ else:
+ simulator.create_node(self.default["implementations"][node_id], args=self.default["arguments"][node_id])
##### Run simulation
simulator.run(
breakpoints=self.default["breakpoints"],
diff --git a/esds/simulator.py b/esds/simulator.py
index 90ad427..2253a49 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -75,18 +75,19 @@ class Simulator:
event[2][5]=new_duration
self.netmat=netmat
- def create_node(self, src, args=None):
+ def create_node(self, src, args=None, grp="def"):
"""
Create a node thread and run it
"""
- node=Node(src, self.netmat.keys())
+ node=Node(src, self.netmat.keys(), grp)
self.nodes.append(node)
thread=threading.Thread(target=node.run,args=[args]) # There must be "daemon=True" as a parameter, but we removed it to be compatible with older version of python
thread.start()
def log(self,msg,node=None):
- src = "esds" if node is None else "n"+str(node)
- logline="[t="+str(self.time_truncated)+",src="+src+"] "+msg
+ logline="[t="+str(self.time_truncated)+",src=esds] "+msg
+ if node is not None:
+ logline="[t="+str(self.time_truncated)+",src=n"+str(node.node_id)+",grp="+str(node.grp)+"] "+msg
if self.debug is not None:
self.debug.append_log(logline)
print(logline)
@@ -118,7 +119,7 @@ class Simulator:
break
elif not timeout_remove_only:
if node["request"] == "log":
- self.log(node.rargs,node=node.node_id)
+ self.log(node.rargs,node=node)
node["state"]="running"
node.rqueue.put(("log",RCode.SUCCESS))
elif node["request"] == "timeout_add":
@@ -140,7 +141,7 @@ class Simulator:
node["state"]="running"
node.rqueue.put(("notify_remove",RCode.SUCCESS))
elif node["request"] == "abort":
- self.log("Simulation aborted: "+node.rargs,node=node.node_id)
+ self.log("Simulation aborted: "+node.rargs,node=node)
exit(1)
elif node["request"] == "read":
node["state"]="running"
@@ -159,12 +160,12 @@ class Simulator:
elif node["request"] == "turn_on":
node["state"]="running"
node.rqueue.put(("turn_on",RCode.SUCCESS))
- self.log("Turned on",node=node.node_id)
+ self.log("Turned on",node=node)
elif node["request"] == "turn_off":
# Update node state after turning off
node["state"]="running"
node.rqueue.put(("turn_off",RCode.SUCCESS))
- self.log("Turned off",node=node.node_id)
+ self.log("Turned off",node=node)
# We cancel communication after node has turned off
self.cancel_communications(node.node_id,reason=RCode.RECEIVER_TURNED_OFF)
elif node["request"] == "send_cancel":
@@ -281,7 +282,7 @@ class Simulator:
if len(selector) != 0:
self.events=self.events[~np.array(selector)]
for node in notify:
- self.log("Interferences on "+interface,node=node)
+ self.log("Interferences on "+interface,node=self.nodes[node])
return status
def sync_node_blocking(self, node):
@@ -294,7 +295,7 @@ class Simulator:
interface, data, datasize, dst, receiver_required=node.rargs
if dst != None:
if not (dst >=0 and dst <=len(self.nodes)):
- self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node.node_id)
+ self.log("Invalid dst used in send() or sendt(), node "+str(dst)+" not found", node=node)
exit(1)
if not self.communicate(interface, node.node_id, dst, data, datasize,receiver_required):
node["state"]="running"
@@ -323,7 +324,7 @@ class Simulator:
"""
nsrc=self.nodes[src]
if self.netmat[interface]["is_wired"]:
- self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=src)
+ self.log("Send "+str(datasize)+" bytes to n"+str(dst)+" on "+interface,node=nsrc)
if not self.nodes[dst]["turned_on"] and receiver_required:
return(False)
self.update_sharing(dst,1,interface) # Update sharing first
@@ -331,7 +332,7 @@ class Simulator:
duration=datasize*8/(self.netmat[interface]["bandwidth"][src,dst]/self.sharing[interface][dst])+self.netmat[interface]["latency"][src,dst]
self.add_event(0,duration+self.time,(src,dst,interface,data,datasize,duration,datasize,self.time,self.nodes[dst]["turned_on"],receiver_required))
else:
- self.log("Send "+str(datasize)+" bytes on "+interface,node=src)
+ self.log("Send "+str(datasize)+" bytes on "+interface,node=nsrc)
for dst in self.list_receivers(nsrc,interface):
if self.nodes[dst]["turned_on"]:
duration=datasize*8/self.netmat[interface]["bandwidth"][src,dst]+self.netmat[interface]["latency"][src,dst]
@@ -418,7 +419,7 @@ class Simulator:
if perform_delivery:
dst["interfaces"][interface].put((data,start_at,self.time))
dst["interfaces_queue_size"][interface]+=1
- self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
+ self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst)
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive":
dst["interfaces_queue_size"][interface]-=1
@@ -437,7 +438,7 @@ class Simulator:
if perform_delivery:
dst["interfaces"][interface].put((data,start_at,self.time))
dst["interfaces_queue_size"][interface]+=1
- self.log("Receive "+str(datasize)+" bytes on "+interface,node=int(dst_id))
+ self.log("Receive "+str(datasize)+" bytes on "+interface,node=dst)
# If node is receiving makes it consume (this way if there is a timeout, it will be removed!)
if dst["state"] == "call_blocking" and dst["request"] == "receive":
dst["interfaces_queue_size"][interface]-=1