summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--esds/node.py9
-rw-r--r--esds/plugins/node_plugin.py3
-rw-r--r--esds/simulator.py13
-rw-r--r--manual/.gitignore3
-rw-r--r--manual/components.pdfbin0 -> 8704 bytes
-rw-r--r--manual/manual.org204
-rw-r--r--manual/manual.pdfbin0 -> 294819 bytes
7 files changed, 230 insertions, 2 deletions
diff --git a/esds/node.py b/esds/node.py
index 6a2566d..59b6f96 100644
--- a/esds/node.py
+++ b/esds/node.py
@@ -13,7 +13,7 @@ class Node:
self.rargs=None # Store the requests arguments
self.plugins=list() # Contains all registered node plugins
self.rqueue=queue.Queue() # Receive simulator acknowledgments
- self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict()}
+ self.chest={"state":"running", "turned_on":True, "request": None, "interfaces":dict(), "interfaces_queue_size":dict(), "plugin_notify": dict()}
for interface in interfaces:
self.chest["interfaces"][interface]=queue.Queue()
self.chest["interfaces_queue_size"][interface]=0
@@ -22,6 +22,11 @@ class Node:
def plugin_register(self,plugin):
self.plugins.append(plugin)
+ def plugin_handle_requests(self):
+ # Take plugins notification into account
+ for key in list(self["plugin_notify"]):
+ self.plugin_notify(key,self["plugin_notify"].pop(key))
+
def plugin_notify(self,reason,args):
"""
This function strives to avoid using Python specific features
@@ -33,6 +38,8 @@ class Node:
p.on_send_call(args[0],args[1],args[2],args[3])
if reason == "send_return":
p.on_send_return(args[0],args[1],args[2],args[3],args[4])
+ if reason == "on_communication_end":
+ p.on_communication_end(args[0],args[1],args[2],args[3],args[4])
if reason == "terminated":
p.on_terminated()
diff --git a/esds/plugins/node_plugin.py b/esds/plugins/node_plugin.py
index 325ff8a..dc9302f 100644
--- a/esds/plugins/node_plugin.py
+++ b/esds/plugins/node_plugin.py
@@ -24,6 +24,9 @@ class NodePlugin:
def on_terminated(self):
pass
+ def on_communication_end(self,interface,data,start_at,end_at,aborted_at):
+ self.log("hello world")
+
def log(self,msg):
self.api.log(self.plugin_name+"(NP) "+msg)
diff --git a/esds/simulator.py b/esds/simulator.py
index fee9331..57dda4e 100644
--- a/esds/simulator.py
+++ b/esds/simulator.py
@@ -123,7 +123,10 @@ class Simulator:
"""
sorted_indexes=np.lexsort((self.events[:,3],self.events[:,1]))
self.events=self.events[sorted_indexes]
-
+
+ def notify_node_plugin(self,node,function,args):
+ self.nodes[node]["plugin_notify"][function]=args
+
def sync_node_non_blocking(self,node, timeout_remove_only=False):
"""
Process all call request and wait for Node.sync() to return
@@ -196,11 +199,14 @@ class Simulator:
selector_wireless.append(False)
if event[2][9]: # Check if should be cancel on turn_off (receiver_required)
selector_wired.append(True)
+ self.notify_node_plugin(event[2][1],"on_communication_end",("eth0",0,0,0,0))
else:
selector_wired.append(False)
event[2][8]=False # So set delivery to False!!
+ # TODO: notify receiver plugins
else:
selector_wireless.append(True)
+ # TODO: notify receiver plugins
selector_wired.append(False)
else:
selector_wireless.append(False)
@@ -234,6 +240,7 @@ class Simulator:
selector.append(True)
if self.netmat[event[2][2]]["is_wired"]:
sharing_to_update.append((int(event[2][1]),event[2][2]))
+ # TODO: notify sender plugins
else:
selector.append(False)
self.events=self.events[~np.array(selector)]
@@ -427,11 +434,13 @@ class Simulator:
dst["state"]="running"
dst.rqueue.put(("receive",0))
self.sync_node_non_blocking(dst,timeout_remove_only=True)
+ # TODO: notify receiver plugins
self.update_sharing(dst.node_id,-1,interface)
src["state"]="running"
code=0 if perform_delivery else 1
src.rqueue.put(("send",code))
self.sync_node_non_blocking(src,timeout_remove_only=True)
+ # TODO: notify sender plugins
else:
if src.node_id != dst.node_id:
if perform_delivery:
@@ -444,10 +453,12 @@ class Simulator:
dst["state"]="running"
dst.rqueue.put(("receive",0))
self.sync_node_non_blocking(dst,timeout_remove_only=True)
+ # TODO: notify receiver plugins
else:
src["state"]="running"
src.rqueue.put(("send",0))
self.sync_node_non_blocking(src,timeout_remove_only=True)
+ # TODO: notify sender plugins
elif event_type == 1: # Timeout
node=self.nodes[int(event)]
node["state"]="running"
diff --git a/manual/.gitignore b/manual/.gitignore
new file mode 100644
index 0000000..056aa14
--- /dev/null
+++ b/manual/.gitignore
@@ -0,0 +1,3 @@
+_minted-manual
+*.tex
+*.bbl \ No newline at end of file
diff --git a/manual/components.pdf b/manual/components.pdf
new file mode 100644
index 0000000..49afc21
--- /dev/null
+++ b/manual/components.pdf
Binary files differ
diff --git a/manual/manual.org b/manual/manual.org
new file mode 100644
index 0000000..4ad104c
--- /dev/null
+++ b/manual/manual.org
@@ -0,0 +1,204 @@
+#+TITLE: ESDS: Extensible Simulator for Distributed Systems
+#+AUTHOR: Loic GUEGAN
+#+OPTIONS: toc:nil
+
+#+LATEX_HEADER: \usepackage{fullpage}
+#+LATEX_HEADER: \usepackage{minted}
+#+LATEX_HEADER: \usepackage{booktabs}
+#+LATEX_HEADER: \usepackage{xspace}
+#+LATEX_HEADER: \newcommand{\stateoff}{"\textit{off}"\xspace}
+#+LATEX_HEADER: \newcommand{\stateon}{"\textit{on}"\xspace}
+
+
+* Simulation Architecture
+The ESDS simulator comprises two major components: 1) The Simulation Orchestrator(SO) 2) The Simulated
+Nodes (SN). This architecture is depicted in Figure \ref{architecture}.
+
+\begin{figure}[!h]
+\centering
+\includegraphics[scale=0.5]{components.pdf}
+\caption{Architecture of ESDS}
+\label{architecture}
+\end{figure}
+
+The SO is the main process in charge of implementing the simulation main loop. It instantiates the
+network (e.g bandwidths andlatencies), collects and processes the events (e.g communications,turn
+on/off). The nodes on the other hand are threads that simulate the nodes behaviors.
+
+* Example
+To run a simulation, you need to provide at least 2 files. The first one instantiate the
+orchestrator and the second one will simulate the node. In this section, you will learn how to write
+both files.
+
+The simulated scenario comprises 2 nodes that wakes up randomly every hour for a duration called
+"uptime". The sender try to transmit his data during that uptime. The other node is a receiver that
+have similar random wake up parterns and strive to receive data from the sender.
+** Orchestrator
+#+attr_latex: :options fontsize=\small, breaklines
+#+BEGIN_SRC python
+ #!/usr/bin/env python
+
+ import esds # Load ESDS
+ import numpy as np # Use numpy to construct bandwidth and latencies matrix
+
+ ##### Bandwidth matrix
+ # Bandwidth value can be 0 for unreachable nodes
+ # Regarding wireless interfaces the diagonals of the bandwidth and latency matrices are very important.
+ # They determine the duration of the tranmission for THE SENDER. It allows to have a different tx
+ # duration per node and per interface. Please cf esds.py for more informations.
+ n=2 # Number of nodes including the sender
+ B=np.full((n,n),5) # 5Mbps
+
+ ##### Latency matrix
+ # If the latency entries match one with a bandwidth of 0
+ # then it will be ignore since node is unreachable.
+ L=np.full((n,n),0) # 0s
+
+ ##### Create the simulator
+ # esds.Simulator take at least a dictionnary as a parameter
+ # This dictionnary contains all the network interfaces (name as a key) of each node
+ s=esds.Simulator({"wlan0":{"bandwidth":B, "latency":L, "is_wired":False},"eth0":{"bandwidth":B, "latency":L, "is_wired":True}})
+
+ ##### Instantiate nodes
+ uptime=180 # 180s uptime
+ s.create_node("sender",args=uptime) # Load sender.py for the first node with 5 as argument (first row in B and L)
+
+ # Aguments can be passed to nodes via: s.create_node("sender",args="my argument")
+ for n in range(0,n-1): # Load receiver.py for the remaining nodes
+ s.create_node("receiver",args=uptime)
+
+ ##### Run the simulation
+ s.run()
+#+END_SRC
+
+** Nodes
+To implement a node, you should create a python file with the method execute(api). This method will be
+called by the orchestrator to execute the code of your node. The api parameter provide you access to the following esds API:
+
+
+\begin{table*}[]
+ \centering
+ \caption{Simulated Nodes blocking and non-blocking API calls}
+ \label{tab:api}
+ \small
+ \resizebox{\columnwidth}{!}{%
+ \begin{tabular}{@{}lll@{}}
+\toprule
+\textbf{Call} & \textbf{Blocking} & \textbf{Description} \\ \midrule
+\verb!send(<int>,<data>,<size>,<dst>,<rdst>)! & yes & Send \verb!<data>! of size \verb!<size>! on interface \verb!<int>! \\
+\verb!sendt(<int>,<data>,<size>,<dst>,<t>,<rdst>)! & yes & Send \verb!<data>! of size \verb!<size>! on interface \verb!<int>! with a timeout of \verb!<t>! \\
+\verb!receive(<int>)! & yes & Wait for and fetch incoming data on interface \verb!<int>! \\
+\verb!receivet(<int>,<t>)! & yes & Wait for and fetch incoming data on interface \verb!<int>! with a timeout of \verb!<t>! \\
+\verb!wait(<t>)! & yes & Wait for a specific amount of simulated time \verb!<t>! \\
+\verb!wait_end()! & yes & Wait until the end of the simulation \\
+\verb!log(<message>)! & no & Report \verb!<message>! to the SO that will print it to the standard output \\
+\verb!read(<register>)! & no & Read in the SO registers (e.g \textit{clock} to get the current simulated time) \\
+\verb!turn_off()/turn_on()! & no & Change the node state to \stateoff or \stateon respectively
+\end{tabular}}
+\end{table*}
+
+*** Sender
+#+attr_latex: :options fontsize=\small, breaklines
+#+BEGIN_SRC python
+ #!/usr/bin/env python
+
+ import random
+
+ # Note that the following is required to have different instance from thread to thread
+ lr=random.Random(6)
+
+ def execute(api):
+ uptime=api.args
+ endoff=0
+ for i in range(0,24):
+ startoff=random.randint(0,3600-uptime)
+ api.turn_off()
+ api.wait(startoff+endoff)
+ api.turn_on()
+ wakeat=api.read("clock")
+ wakeuntil=wakeat+uptime
+ # Send until uptime seconds if elapsed
+ while api.read("clock") < wakeuntil:
+ api.sendt("wlan0","hello",10,None, wakeuntil-api.read("clock"))
+ api.log("Was up for {}s".format(api.read("clock")-wakeat))
+ endoff=3600*(i+1)-api.read("clock")
+ api.turn_off()
+ api.wait(endoff)
+ api.turn_on()
+
+
+
+
+#+END_SRC
+*** Receiver
+#+attr_latex: :options fontsize=\small, breaklines
+#+BEGIN_SRC python
+ #!/usr/bin/env python
+
+ import sys, random, time
+
+ lr=random.Random(6)
+
+ def execute(api):
+ uptime=api.args
+ endoff=0
+ for i in range(0,24):
+ startoff=random.randint(0,3600-uptime)
+ api.turn_off()
+ api.wait(startoff+endoff)
+ api.turn_on()
+ wakeat=api.read("clock")
+ wakeuntil=wakeat+uptime
+ # Receive until uptime seconds if elapsed
+ while api.read("clock") < wakeuntil:
+ code, data=api.receivet("wlan0",wakeuntil-api.read("clock"))
+ if code == 0:
+ api.log("Receive "+data)
+ api.log("Was up for {}s".format(api.read("clock")-wakeat))
+ endoff=3600*(i+1)-api.read("clock")
+ api.turn_off()
+ api.wait(endoff)
+ api.turn_on()
+
+
+#+END_SRC
+
+** Simulation Output
+Here is part of the simulation output:
+#+begin_example
+ [t=82626.000,src=n0] Send 10 bytes on wlan0
+ [t=82630.000,src=n0] Was up for 180.0s
+ [t=82630.000,src=n0] Turned off
+ [t=83083.000,src=n1] Turned on
+ [t=83263.000,src=n1] Was up for 180.0s
+ [t=83263.000,src=n1] Turned off
+ [t=85910.000,src=n0] Turned on
+ [t=85910.000,src=n0] Send 10 bytes on wlan0
+ [t=85926.000,src=n0] Send 10 bytes on wlan0
+ [t=85942.000,src=n0] Send 10 bytes on wlan0
+ [t=85958.000,src=n0] Send 10 bytes on wlan0
+ [t=85974.000,src=n0] Send 10 bytes on wlan0
+ [t=85990.000,src=n0] Send 10 bytes on wlan0
+ [t=86006.000,src=n0] Send 10 bytes on wlan0
+ [t=86022.000,src=n0] Send 10 bytes on wlan0
+ [t=86038.000,src=n0] Send 10 bytes on wlan0
+ [t=86054.000,src=n0] Send 10 bytes on wlan0
+ [t=86070.000,src=n0] Send 10 bytes on wlan0
+ [t=86086.000,src=n0] Send 10 bytes on wlan0
+ [t=86090.000,src=n0] Was up for 180.0s
+ [t=86090.000,src=n0] Turned off
+ [t=86400.000,src=n0] Turned on
+ [t=86400.000,src=n1] Turned on
+ [t=86400.000,src=esds] Simulation ends
+#+end_example
+Brackets indicate additional informations related to the message (e.g source and simulated
+time). All the send and received events are reported automatically by esds.
+
+# Local Variables:
+# eval: (setq org-latex-listings 'minted)
+# eval: (setq org-latex-pdf-process
+# '("pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
+# "bibtex %b"
+# "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"
+# "pdflatex -shell-escape -interaction nonstopmode -output-directory %o %f"))
+# End:
diff --git a/manual/manual.pdf b/manual/manual.pdf
new file mode 100644
index 0000000..037f976
--- /dev/null
+++ b/manual/manual.pdf
Binary files differ