#include #include #include #include "utils.h" // Global: char *__logdir; char *__key; char *__interface; char *__ip; int __loginterval; int __port; unsigned char __stop=0; void publish(void *publisher, char *filepath, char* client, long int interval); void sighandler(int signo){ if (signo == SIGINT){ printf("Stopping...\n"); __stop=1; } } int main (int argc, char *argv []) { if(argc != 6){ printf("Usage: %s ",argv[0]); exit(1); } //----- Init global variables __logdir=argv[1]; __loginterval=atoi(argv[2]); __ip=argv[3]; __port=atoi(argv[4]); __key=argv[5]; //----- Sanity checks signal(SIGINT,sighandler); if(__logintervald_name,".") && strcmp(de->d_name,"..")){ char *client=de->d_name; // Directory name is the client name // Build the path for files we are looking for: char logfile[STATIC_LEN]; char logfile_next[STATIC_LEN]; sprintf(logfile,"%s/%s/%ld",__logdir,client,interval); sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next); // As long as next logfile is not available, we should wait // before sending the current one (since loggers are working on it) printf("Waiting for %s logger measurements...\n",client); while(!FILE_EXISTS(logfile_next) && (!__stop)){ sleep(1); } // Send current one: if(FILE_EXISTS(logfile)){ printf("Publishing %s measurements\n",client); publish(publisher,logfile,client,interval); remove(logfile); // Prevent log accumulation } } } closedir(dr); } else { // If open fails, directory probably do not exists // yet so wait: sleep(1); } } zmq_close (publisher); zmq_ctx_destroy (context); return 0; } void publish(void *publisher, char *filepath, char* client, long int interval){ // Build message header: char buffer[ZMQ_MSG_SIZE]; sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); int msglen=strlen(buffer); char * line = NULL; size_t len = 0; ssize_t read; FILE *fptr; fptr=fopen(filepath,"r"); // Put every lines in the buffer and send it while ((read = getline(&line, &len, fptr)) != -1) { if((read+msglen)