// Simulator for loosely coupled data dissemination between observation nodes,
// built on SimGrid S4U: main() sets up the engine and platform, obs_node() is
// the per-node actor code.
//
// NOTE(review): this copy of the file appears to have lost every span that
// looked like an angle-bracket tag (include targets, template arguments, some
// `<` comparisons). The damaged spots are flagged below; restore them from the
// original before building.

// NOTE(review): the seven directives below have lost their header names.
#include
#include
#include
#include
#include
#include
#include
#include "Inputs.hpp"

/// @brief Path of the platform file generated by Inputs::GeneratePlatform() and loaded by the engine
#define PLATFORM_FILE "platform.xml"

/// @brief Switch the pstate of the current actor's host (0, 1, 2, 3).
/// Presumably these map to off / idle / receive / transmit power profiles in the
/// generated platform -- confirm against Inputs::GeneratePlatform().
/// Each macro already ends with ';', which is why `MODE_OFF()` also works without one.
#define MODE_OFF() simgrid::s4u::this_actor::get_host()->set_pstate(0);
#define MODE_ON() simgrid::s4u::this_actor::get_host()->set_pstate(1);
#define MODE_RX() simgrid::s4u::this_actor::get_host()->set_pstate(2);
#define MODE_TX() simgrid::s4u::this_actor::get_host()->set_pstate(3);

/// @brief Current value of the engine clock
#define CLOCK (simgrid::s4u::Engine::get_clock())

/// @brief C string of the node name; requires a local std::string `selfName` in scope
#define CNAME (selfName.c_str())

/// @brief Run `instruction`, measure how much the clock advanced while it ran, and
/// subtract that from the local variable `uptime` (clamped so it never drops below 0).
/// Requires a local `uptime` in scope. If `instruction` throws, `uptime` is left untouched.
#define TRACK_UPTIME(instruction) \
  { \
    double uptimeTrack=CLOCK; \
    instruction; \
    uptimeTrack=CLOCK-uptimeTrack; \
    uptime-=uptimeTrack; \
    uptime=uptime > 0 ? uptime : 0; \
  }

/// @brief Simulate the latency ourselves before running `instruction`, since the messages
/// themselves are sent instantaneously: sleep for `i.latency` (capped by the remaining
/// `uptime`), charge that sleep to `uptime`, then run `instruction`.
/// Requires locals `i` and `uptime` in scope.
#define SEND(instruction) \
  { \
    TRACK_UPTIME(simgrid::s4u::this_actor::sleep_for(i.latency > uptime ? uptime : i.latency)); \
    instruction; \
  }

/// @brief Try to forward the stored hint (`hint_forward`): tag it as a forward, announce it
/// with a zero-size put on mailbox `m` (third argument TRY_FORWARD_DURING), then put it on
/// the mailbox named "hint_forward" in TX mode. Any exception is caught and only logged.
/// Requires locals `hint_forward`, `m`, `i`, `uptime` and `selfName` in scope.
/// NOTE(review): `CLOCKhint` in the condition looks truncated; the call sites guard with
/// `CLOCK < hint_forward->hint`, so that is presumably what was written here -- confirm.
#define FORWARD_HINT(TRY_FORWARD_DURING) \
  { \
    if(hint_forward!=NULL && CLOCKhint){ \
      hint_forward->HisForward=true; \
      hint_forward->DedicatedMailbox="hint_forward"; \
      try { \
        XBT_INFO("%s try to forward a hint",CNAME); \
        TRACK_UPTIME(m->put(hint_forward,0,TRY_FORWARD_DURING)); \
        simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(hint_forward->DedicatedMailbox); \
        MODE_TX(); \
        SEND(m_ded->put(hint_forward,0,uptime)); \
      } \
      catch(...){ \
        XBT_INFO("%s fail to forward a hint",CNAME); \
      } \
    } \
  }

/// @brief Required by SimGrid
XBT_LOG_NEW_DEFAULT_CATEGORY(simulator, "[DAO] Loosely Coupled DSS");

/// @brief For convenience's sake
typedef unsigned int u32;

/// @brief Number of hosts in the platform (set in main from the engine's host count)
u32 nON;
/// @brief Array of nON flags allocated in main; not referenced by the visible part of obs_node
bool *data_ready;

/// @brief NOTE(review): the body of this function is truncated in this copy. Judging by the
/// name it presumably reports whether every entry of `data_ready` is set -- confirm.
bool is_data_rcv_ready(){
  // NOTE(review): damaged line. The loop condition, the rest of this function and the start
  // of what looks like the forward declaration of obs_node (ending in `args);`) are missing.
  for(int i=0;i args);

/**
 * Program entry point.
 * No arguments are required (cf inputs.json)
 */
int main(int argc, char **argv) {

  // Build engine (the energy plugin is initialised before the engine is created)
  sg_host_energy_plugin_init();
  simgrid::s4u::Engine engine(&argc, argv);
  Inputs::GeneratePlatform(PLATFORM_FILE);
  engine.load_platform(PLATFORM_FILE);

  // Headline
  XBT_INFO("-------------------------------------------------");
  XBT_INFO("Sarting loosely coupled data dissemination experiments");
  XBT_INFO("-------------------------------------------------");

  // Init all nodes actors
  nON=simgrid::s4u::Engine::get_instance()->get_host_count();
  data_ready=(bool*)malloc(sizeof(bool)*nON); // never freed in the visible code
  // NOTE(review): damaged lines. The loop condition/body (only the declaration of `args` and
  // the start of a host name built from "on" survive), the rest of main and the header of
  // obs_node are missing. What follows `args) {` is the body of obs_node, which reads the
  // node id from args[0].
  for(u32 i=0;i args;
  std::ostringstream ss;
  ss<< "on" < args) {

  // Init various variables
  std::string selfName = simgrid::s4u::this_actor::get_host()->get_name();
  int id=stoi(args[0]); // parsed but not used in the visible code
  simgrid::s4u::this_actor::get_host()->turn_on();
  Inputs i(selfName); // per-node inputs/event list (declared in Inputs.hpp)
  simgrid::s4u::Mailbox *m = simgrid::s4u::Mailbox::by_name("medium"); // mailbox shared by all nodes
  XBT_INFO("Deploying observation node %s",CNAME);

  // Starting node
  bool isObserver=false;  // true once this node has received the data or finished sending it
  u32 nWakeUp=0;
  u32 nDataRcv=0;
  u32 nSendFail=0;        // only reported at the end; never incremented in the visible code
  u32 nRcvFail=0;
  u32 nSend=0;
  u32 hint_added=0;       // number of hints turned into new wake-up events
  double totalUptime=0;
  Payload *hint_forward=NULL; // last hint kept for forwarding (no delete appears in the visible code)
  bool init_issender=i.is_sender; // remember the initial role; i.is_sender is modified below
  while(i.ShouldContinue()){
    // Start by sleeping until the timestamp of the current event
    XBT_INFO("%s is spleeping",CNAME);
    MODE_OFF();
    simgrid::s4u::this_actor::sleep_until(i.GetTS());
    MODE_ON();
    XBT_INFO("%s wakes up",CNAME);

    // Doing wake up stuff
    double uptime=i.GetDuration();                     // remaining awake time, decremented by TRACK_UPTIME
    double upsince=simgrid::s4u::Engine::get_clock();  // wake-up instant, used for totalUptime
    double upuntil=i.GetTS()+i.GetDuration();          // scheduled end of this wake-up
    bool forward_mode=false;   // receiver/observer: alternate between receiving and forwarding
    bool onlyf=false;          // observer: once set, forward_mode is forced to true on every iteration
    bool sendhint_mode=false;  // sender: alternate between sending data and sending a hint
    while(uptime>0)
    {
      if(i.is_sender){
        // Shorter attempt (0.3) when sending a hint, longer one (1) when sending data
        double try_for=sendhint_mode ? 0.3 : 1;
        try_for=try_for>uptime ? uptime : try_for; // Ensure we do not exceed uptime
        try_for=(i.use_hint && i.HasNext()) ? try_for: uptime; // Without a hint to send, spend all the remaining uptime on the data

        // Send hint
        if(i.use_hint && sendhint_mode && i.HasNext()){
          // The hint describes this node's next event (timestamp and duration)
          Payload *p=new Payload();
          p->DedicatedMailbox="hintmailbox"+selfName;
          p->HasHint=true;
          p->duration=i.GetNextDuration();
          p->hint=i.GetNextTS();
          try {
            // Zero-size announcement on the shared mailbox, then the hint itself on the dedicated one
            TRACK_UPTIME(m->put(p,0,try_for));
            simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
            MODE_TX();
            SEND(m_ded->put(p,i.hint_size,uptime));
            XBT_INFO("%s sent a hint successfully",CNAME);
          }
          catch(...){} // failure is silently ignored (presumably a timeout -- confirm)
        }
        else{ // Send data
          Payload *p=new Payload();
          p->DedicatedMailbox="datamailbox"+selfName;
          p->HasData=true;
          p->HasHint=false;
          if(i.use_hint && i.HasNext()){
            // Piggyback the hint for the next event on the data
            p->HasHint=true;
            p->duration=i.GetNextDuration();
            p->hint=i.GetNextTS();
          }
          try {
            // Zero-size announcement on the shared mailbox, then the data on the dedicated one
            TRACK_UPTIME(m->put(p,0,try_for));
            simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
            MODE_TX();
            if(i.extended){
              // Extended mode: no limit on the put, so the transfer may run past the uptime
              SEND(m_ded->put(p,i.data_size));
            }
            else{
              SEND(m_ded->put(p,i.data_size,uptime));
            }
            XBT_INFO("%s sent data successfully",CNAME);
            nSend++;
            // Stay a sender until the data has been sent n_nodes-1 times, then become an observer
            i.is_sender=(nSend<(i.n_nodes-1));
            isObserver=!i.is_sender;
          }
          catch(...){} // failure is silently ignored; nSendFail is not updated
        }
        sendhint_mode=!sendhint_mode; // Alternate between data and hint on each iteration
      }
      else if(!isObserver){
        // Node that has not received the data yet: forward a hint for at most 0.3
        // and try to receive for at most 1 (both capped by the remaining uptime)
        double try_for=forward_mode ? 0.3 : 1;
        try_for=try_for>uptime ? uptime : try_for; // Ensure we do not exceed uptime

        // Forward hint mode
        if(forward_mode){
          if(hint_forward!=NULL && CLOCK < hint_forward->hint){
            FORWARD_HINT(try_for); // Try forwarding for at most try_for, then switch back to receiving mode
          }
        }
        else { // Receiving mode
          // NOTE(review): the results of get() are assigned to Payload* below; a template
          // argument may have been lost from these calls in this copy -- confirm.
          Payload *p; // Received data
          Payload *hint; // To save the received hint (only read when hintReceived is true)
          bool hintReceived=false; // In case of error during data rx, tells whether the *hint Payload object can be used

          try {
            // Get the instantaneous message
            do {
              TRACK_UPTIME(p=m->get(try_for));
              if(p->HisForward){
                // Only fetch a forwarded hint that differs from the one already stored
                if(hint_forward==NULL || (hint_forward !=NULL && p->hint!=hint_forward->hint)){
                  simgrid::s4u::Mailbox *m_ded=simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
                  MODE_RX();
                  TRACK_UPTIME(p=m_ded->get(uptime));
                  MODE_ON();
                  XBT_INFO("%s received a forwarded hint successfully",CNAME);
                  if(CLOCK < p->hint){
                    // The hinted event is still in the future: schedule a wake-up for it
                    i.AddEvent(p->hint, p->duration);
                    hint_forward=new Payload(*p);
                    hint_added++;
                  }
                }
              }
            } while(p->HisForward); // keep polling until the message is not a forward
            simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
            // Start receiving data
            MODE_RX();
            if(p->HasHint && !p->HasData){
              // Hint-only message
              TRACK_UPTIME(p=m_ded->get(uptime));
              XBT_INFO("%s received a hint successfully",CNAME);
              hint=new Payload(*p); // Save hint
              hint_forward=new Payload(*p); // Enable hint forwarding
              hintReceived=true;
              if(CLOCK < p->hint){
                i.AddEvent(p->hint, p->duration);
                hint_forward=new Payload(*p); // second copy; the one made above is not released
                hint_added++;
              }
            }
            else {
              // Data message; these gets are not wrapped in TRACK_UPTIME,
              // uptime is recomputed from the clock at the end of the loop
              if(i.extended){
                p=m_ded->get(); // Fetch data until it is fully sent
              }
              else
                p=m_ded->get(uptime); // Fetch data until it is fully sent or the uptime expires
              // If we reach here, data has been received successfully
              XBT_INFO("%s received data successfully",CNAME);
              if(p->HasHint){
                XBT_INFO("%s received a hint along with data successfully",CNAME);
                hint=new Payload(*p); // Save hint
                hint_forward=new Payload(*p); // Enable hint forwarding
                hintReceived=true;
              }
              nDataRcv++;
              isObserver=true;
              i.is_sender=false;
            }
          }catch(...){
            XBT_INFO("%s could not receive any data",CNAME);
            nRcvFail++;
            if(hintReceived){
              i.AddEvent(hint->hint, hint->duration); // Add the hint to the event list
              hint_added++;
            }
          }
        }
        forward_mode=!forward_mode; // Toggle mode (go back and forth between receiving and forwarding)
      }
      else {
        // Observer: stays on; with hints enabled it listens for hints and forwards them
        XBT_INFO("%s is observing his environment...",CNAME);
        MODE_ON();
        if(i.use_hint){
          double try_for=forward_mode ? 1 : 1; // both branches are 1
          try_for=try_for>uptime ? uptime : try_for; // Ensure we do not exceed uptime
          if(onlyf)
            forward_mode=true; // cancels the toggle below once a non-forward message has been seen
          if(forward_mode && hint_forward!=NULL && CLOCK < hint_forward->hint){
            FORWARD_HINT(try_for);
          }
          else {
            Payload *p;
            try {
              do {
                TRACK_UPTIME(p=m->get(try_for));
              } while(p->HisForward); // Ignore forwarded hint
              simgrid::s4u::Mailbox *m_ded= simgrid::s4u::Mailbox::by_name(p->DedicatedMailbox);
              // Start receiving hint from sender
              onlyf=true;
              MODE_RX();
              if(p->HasHint){
                TRACK_UPTIME(p=m_ded->get(uptime));
                XBT_INFO("%s received a hint successfully",CNAME);
                hint_forward=new Payload(*p); // Enable hint forwarding
                //simgrid::s4u::this_actor::sleep_for(uptime); // Now sleep until the end
              }
              else {
                simgrid::s4u::this_actor::sleep_for(try_for);
              }
            }
            catch(...){
              // failure is silently ignored
            }
          }
          forward_mode=!forward_mode;
        }
        else {
          // No hints: just stay on for the rest of the uptime
          simgrid::s4u::this_actor::sleep_for(uptime);
        }
      }
      uptime=upuntil-CLOCK; // Recompute from the clock; note that uptime can be < 0 in extended mode
    }
    // Load next event
    i.GotoNextEvent();
    nWakeUp++; // Increase the number of wake-ups
    totalUptime+=CLOCK-upsince;
  }
  // Done: switch the host off and print the summary line ([LOG2PARSE] marks it for log parsing)
  MODE_OFF()
  XBT_INFO("Observation node %s finished [LOG2PARSE](node:%s|isSender:%d|nSend:%d|nWakeUp:%d|nDataRcv:%d|nSendFail:%d|nRcvFail:%d|totalUptime:%f|seed:%d|hint_added:%d)",CNAME,CNAME,init_issender,nSend,nWakeUp,nDataRcv,nSendFail,nRcvFail,totalUptime,i.seed,hint_added);
}