summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoic Guegan <manzerbredes@mailbox.org>2022-09-07 07:04:40 +0200
committerLoic Guegan <manzerbredes@mailbox.org>2022-09-07 07:04:40 +0200
commit02aac0ef999cbb6d9d727de5397818fddaa6bb65 (patch)
tree318724685bf8092629b3ac4acc6bbf9cdbd6f50c
parent8252ecb385a745b55b0733a9180270d4bd298da4 (diff)
Improve plugin callbacks
-rw-r--r--esds/node.py7
-rw-r--r--esds/simulator.py11
2 files changed, 12 insertions, 6 deletions
diff --git a/esds/node.py b/esds/node.py
index 7fc62dc..03c7457 100644
--- a/esds/node.py
+++ b/esds/node.py
@@ -13,7 +13,7 @@ class Node:
self.rargs=None # Store the requests arguments
self.plugins=list() # Contains all registered node plugins
self.rqueue=queue.Queue() # Receive simulator acknowledgments
- self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
+ self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict(), "pending_plugin_notify":0}
for interface in interfaces:
self.chest["interfaces"][interface]=queue.Queue()
self.chest["interfaces_queue_size"][interface]=0
@@ -200,7 +200,10 @@ class Node:
ack=None
while True:
ack=self.rqueue.get() # Wait for simulator acknowledgments
- if ack[0] not in ack_types:
+ if ack[0] == "plugin_notify":
+ self.plugin_notify(ack[1],ack[2])
+ self["pending_plugin_notify"]-=1
+ elif ack[0] not in ack_types:
ack_buffer.append(ack)
else:
break
diff --git a/esds/simulator.py b/esds/simulator.py
index 960d87f..f56dfcb 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -124,9 +124,6 @@ class Simulator:
sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
self.events=self.events[sorted_indexes]
- def notify_node_plugin(self,node,function,args):
- self.nodes[node]["plugin_notify"][function]=args
-
def sync_node_non_blocking(self,node, timeout_remove_only=False):
"""
Process all call request and wait for Node.sync() to return
@@ -362,7 +359,13 @@ class Simulator:
selector = self.netmat[interface]["bandwidth"][node.node_id,] > 0
return np.arange(0,selector.shape[0])[selector]
-
+ def notify_node_plugins(self,node,callback,args):
+ node[pending_plugin_notify]+=1
+ self.nodes[int(event[2][1])].rqueue.put(("plugin_notify",callback,args))
+ # Now ensure that all callbacks are called before continuing
+ while node[pending_plugin_notify] > 0:
+ pass
+
def add_event(self,event_type,event_ts,event,priority=2):
"""
Call this function with sort=True the least amount of time possible