diff options
Diffstat (limited to 'src/publisher.c')
| -rw-r--r-- | src/publisher.c | 262 |
1 files changed, 151 insertions, 111 deletions
diff --git a/src/publisher.c b/src/publisher.c index 8c8c2f3..03b3a1a 100644 --- a/src/publisher.c +++ b/src/publisher.c @@ -1,130 +1,170 @@ -#include <zmq.h> +#include "utils.h" #include <assert.h> +#include <libgen.h> +#include <pthread.h> #include <signal.h> +#include <stdio.h> +#include <sys/stat.h> +#include <unistd.h> +#include <zmq.h> -#include "utils.h" - -// Global: -char *__logdir; -char *__key; -char *__interface; -char *__ip; -int __loginterval; -int __port; -unsigned char __stop=0; +#ifndef LOG_DELAY +#define LOG_DELAY 0 +#endif -void publish(void *publisher, char *filepath, char* client, long int interval); +/// @brief Set to non-zero to stop the processes +unsigned char __stop = 0; -void sighandler(int signo){ - if (signo == SIGINT){ +/** + * @brief Stop process properly on SIGINT + * + * @param signo + */ +void sighandler(int signo) { + if (signo == SIGINT) { printf("Stopping...\n"); - __stop=1; + __stop = 1; } } -int main (int argc, char *argv []) -{ - if(argc != 6){ - printf("Usage: %s <abslogdir> <loginterval> <ip> <port> <key>",argv[0]); - exit(1); - } +void *publisher(void *zmq_publisher); - //----- Init global variables - __logdir=argv[1]; - __loginterval=atoi(argv[2]); - __ip=argv[3]; - __port=atoi(argv[4]); - __key=argv[5]; - - //----- Sanity checks - signal(SIGINT,sighandler); - if(__loginterval<MIN_INTERVAL){ - printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL); - exit(2); - } +typedef struct queue { + int size; + char issending; + char msg[ZMQ_MSG_SIZE]; +} queue; +queue queues[MAX_QUEUES]; - //----- Prepare our context and publisher - void *context = zmq_ctx_new (); - void *publisher = zmq_socket (context, ZMQ_PUB); - char bindto[STATIC_LEN]; - sprintf(bindto,"tcp://%s:%d",__ip,__port); - int rc = zmq_connect (publisher, bindto); - if(rc!=0){ - printf("Failed to connect to %s\n",bindto); - exit(1); - } +int main(int argc, char *argv[]) { + if (argc != 6) { + printf("Usage: %s <client> <loginterval> <ip> <port> <key>", + argv[0]); + exit(1); + } + + //----- Init arguments + char *client = argv[1]; + int loginterval = atoi(argv[2]); + char *ip = argv[3]; + int port = atoi(argv[4]); + char *key = argv[5]; + + // __regpower: + char regpower[STATIC_LEN]=""; + strcat(regpower, INA260_SYSFS); + strcat(regpower, "/"); + strcat(regpower, client); + strcat(regpower, "/"); + strcat(regpower, INA260_POWER_REGISTER); - //----- Start publisher - struct dirent *de; // Pointer for directory entry - while(!__stop){ - int interval=INTERVAL(__loginterval); // Current interval - int interval_next=INTERVAL_NEXT(__loginterval); // The next one - DIR *dr = opendir(__logdir); // Open the log directory - if(dr !=NULL){ - while ((de = readdir(dr)) != NULL){ - // Iterate over each loggers directories - if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){ - char *client=de->d_name; // Directory name is the client name - // Build the path for files we are looking for: - char logfile[STATIC_LEN]; - char logfile_next[STATIC_LEN]; - sprintf(logfile,"%s/%s/%ld",__logdir,client,interval); - sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next); - // As long as next logfile is not available, we should wait - // before sending the current one (since loggers are working on it) - printf("Waiting for %s logger measurements...\n",client); - while(!FILE_EXISTS(logfile_next) && (!__stop)){ - sleep(1); - } - // Send current one: - if(FILE_EXISTS(logfile)){ - printf("Publishing %s measurements\n",client); - publish(publisher,logfile,client,interval); - remove(logfile); // Prevent log accumulation - } - } - } - closedir(dr); - } - else { - // If open fails, directory probably do not exists - // yet so wait: - sleep(1); - } + //----- Sanity checks + signal(SIGINT, sighandler); + if (loginterval < MIN_INTERVAL) { + printf("Log interval is too small (min=%ds)\n", MIN_INTERVAL); + exit(2); + } + if (!FILE_EXISTS(regpower)) { + printf("Logger cannot access to %s\n", regpower); + exit(3); + } + + //----- Prepare our context and publisher + void *zmq_context = zmq_ctx_new(); + void *zmq_publisher = zmq_socket(zmq_context, ZMQ_PUB); + char bindto[STATIC_LEN]; + sprintf(bindto, "tcp://%s:%d", ip, port); + int rc = zmq_connect(zmq_publisher, bindto); + if (rc != 0) { + printf("Failed to connect to %s\n", bindto); + exit(1); + } + + //----- Init logging variables + pthread_t zmq_thread; + FILE *regptr, *logptr; + char logfilepath[STATIC_LEN] = ""; + regptr = fopen(regpower, "r"); + char buffer[STATIC_LEN]; + time_t interval; + struct timespec power_ts; + int queue_id = 0; + + // Init queues + for (int i = 0; i < MAX_QUEUES; i++) { + queues[queue_id].issending = 0; + } + pthread_create(&zmq_thread, NULL, publisher, zmq_publisher); + + //----- Start logging + printf("Logger started [client=%s,interval=%ds]\n", client, loginterval); + while (!__stop) { + interval = INTERVAL(loginterval); + // Log current interval + queue_id = (queue_id + 1) >= MAX_QUEUES ? 0 : (queue_id + 1); + // Busy wait: + while (queues[queue_id].issending) { + }; + // Write msg header: + *queues[queue_id].msg = '\0'; + sprintf(queues[queue_id].msg, "%s\n%s\n%s\n%ld\n", ZMQ_TOKEN, key, client, + interval); + queues[queue_id].size = strlen(queues[queue_id].msg); + // Monitor: + while ((TIMESTAMP() - interval) < loginterval) { + // Check if should stop: + if (__stop) + break; + // Read power: + fgets(buffer, STATIC_LEN, regptr); + // Get power measurement timestamp: + clock_gettime(CLOCK_REALTIME, &power_ts); + // Write measurement into msg buffer: + char line[MAX_RECORD_LEN]; + if ((queues[queue_id].size + MAX_RECORD_LEN) > ZMQ_MSG_SIZE) { + printf( + "To many measurements to publish. Please increase ZMQ_MSG_SIZE\n"); + } else { + sprintf(queues[queue_id].msg + queues[queue_id].size, "%ld,%ld,%d\n", + power_ts.tv_sec, power_ts.tv_nsec, atoi(buffer)); + queues[queue_id].size += + strlen(queues[queue_id].msg + queues[queue_id].size); + } + // Reset power register file: + fseek(regptr, 0, SEEK_SET); +#if LOG_DELAY > 0 + usleep(LOG_DELAY * 1000); +#endif } + queues[queue_id].issending = 1; + } - zmq_close (publisher); - zmq_ctx_destroy (context); + //----- Cleaning + fclose(regptr); + pthread_join(zmq_thread, NULL); + zmq_close(zmq_publisher); + zmq_ctx_destroy(zmq_context); - return 0; + return 0; } -void publish(void *publisher, char *filepath, char* client, long int interval){ - // Build message header: - char buffer[ZMQ_MSG_SIZE]; - sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); - int msglen=strlen(buffer); - - char * line = NULL; - size_t len = 0; - ssize_t read; - FILE *fptr; - fptr=fopen(filepath,"r"); - // Put every lines in the buffer and send it - while ((read = getline(&line, &len, fptr)) != -1) { - if((read+msglen) <ZMQ_MSG_SIZE){ - strcat(buffer,line); - msglen+=read; - } else { - // It buffer is full, we send the message and create another one - zmq_send (publisher, buffer, msglen, 0); - // Build new message header: - sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval); - strcat(buffer,line); - msglen=strlen(buffer); - } +void *publisher(void *zmq_publisher) { + int queue_id = 0; + while (!__stop) { + if (queues[queue_id].issending) { + printf("Publishing..."); + zmq_send(zmq_publisher, queues[queue_id].msg, queues[queue_id].size, 0); + queues[queue_id].issending = 0; + printf("done\n"); + } else { +#if LOG_DELAY > 0 + usleep(LOG_DELAY * 1000); +#endif + continue; // Queues are always filled in order by the logger (from 0 to n) } - fclose(fptr); - // Finally send the last message (or the only one) - zmq_send (publisher, buffer, msglen, 0); + queue_id++; + if (queue_id >= MAX_QUEUES) + queue_id = 0; + } + pthread_exit(EXIT_SUCCESS); }
\ No newline at end of file |
