#include #include #include #include // Access to system() #include "utils.h" /// @brief Set to non-zero to stop the processes unsigned char __stop=0; char buffer[ZMQ_MSG_SIZE]; /** * @brief Stop process properly on SIGINT * * @param signo */ void sighandler(int signo){ if (signo == SIGINT){ printf("Stopping...\n"); __stop=1; } } int main (int argc, char *argv []) { if(argc != 3){ printf("Usage: %s ",argv[0]); exit(1); } //----- Arguments int port=atoi(argv[1]); char *cdatadir=argv[2]; //----- Various inits mkdirp(cdatadir); signal(SIGINT,sighandler); //----- Init ZMQ void *context = zmq_ctx_new (); void *subscriber = zmq_socket (context, ZMQ_SUB); char bindto[STATIC_LEN]; sprintf(bindto,"tcp://*:%d",port); int rc = zmq_bind (subscriber, bindto); if(rc!=0){ printf("Failed to bind zmq on %s\n",bindto); exit(1); } rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, ZMQ_TOKEN, strlen(ZMQ_TOKEN)); //----- Listen int size; while(!__stop){ size=zmq_recv (subscriber, buffer, ZMQ_MSG_SIZE-1, 0); if(size<=0) continue; buffer[size < ZMQ_MSG_SIZE ? size : ZMQ_MSG_SIZE - 1] = '\0'; //----- Read buffer char *token = strtok(buffer, "\n"); char key[STATIC_LEN]; char client[STATIC_LEN]; long int interval; FILE *fptr; char path[STATIC_LEN]=""; // Output file path int line=1; while(token != NULL){ if(line==2) strcpy(key,token); else if(line==3) strcpy(client,token); else if(line==4) interval=atoi(token); if(line==4){ double size_mib=size/(1024*1024); printf("Data received: key=%s client=%s interval=%ld msgsize=%.2lfMiB\n",key, client, interval,size_mib); // Create dir if not exists: sprintf(path,"%s/%s/%s/",cdatadir,key,client); if(!DIR_EXISTS(path)){ mkdirp(path); } // Now open output file: sprintf(path,"%s/%s/%s/%ld",cdatadir,key,client,interval); char exists=FILE_EXISTS(path); fptr=fopen(path,"a"); // AT SOME POINT WE MUST CHECK IF WORKED!! OTHERWISE PROBLEM if(!exists){ fwrite(CSV_HEADER"\n", strlen(CSV_HEADER)+1, 1, fptr); } } // Write all the measurements: if(line>4){ fwrite(token, strlen(token), 1, fptr); fwrite("\n",1,1,fptr); } token=strtok(NULL, "\n"); line++; } fclose(fptr); #ifdef RCV_CMD char cmd[STATIC_LEN*2]=""; sprintf(cmd,"%s %s",STRINGIFY(RCV_CMD),path); // Run the command with new file as argument system(cmd); // Allow to run a supplied user command on receive #endif } zmq_close (subscriber); zmq_ctx_destroy (context); return 0; }