summaryrefslogtreecommitdiff
path: root/src/publisher.c
blob: 8c8c2f34811cb17f129e5a75826a1d9dfcf60fc8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <zmq.h>
#include <assert.h>
#include <signal.h>

#include "utils.h"

// Global:
char *__logdir;
char *__key;
char *__interface;
char *__ip;
int __loginterval;
int __port;
unsigned char __stop=0;

void publish(void *publisher, char *filepath, char* client, long int interval);

void sighandler(int signo){
  if (signo == SIGINT){
    printf("Stopping...\n");
    __stop=1;
  }
}

int main (int argc, char *argv [])
{
    if(argc != 6){
        printf("Usage: %s <abslogdir> <loginterval> <ip> <port> <key>",argv[0]);
        exit(1);
    }

    //----- Init global variables
    __logdir=argv[1];
    __loginterval=atoi(argv[2]);
    __ip=argv[3];
    __port=atoi(argv[4]);
    __key=argv[5];

    //----- Sanity checks
    signal(SIGINT,sighandler);
    if(__loginterval<MIN_INTERVAL){
        printf("Log interval is too small (min=%ds)\n",MIN_INTERVAL);
        exit(2);
    }

    //----- Prepare our context and publisher
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    char bindto[STATIC_LEN];
    sprintf(bindto,"tcp://%s:%d",__ip,__port);
    int rc = zmq_connect (publisher, bindto);
    if(rc!=0){
        printf("Failed to connect to %s\n",bindto);
        exit(1);
    }

    //----- Start publisher
    struct dirent *de;  // Pointer for directory entry
    while(!__stop){
        int interval=INTERVAL(__loginterval); // Current interval
        int interval_next=INTERVAL_NEXT(__loginterval); // The next one
        DIR *dr = opendir(__logdir); // Open the log directory
        if(dr !=NULL){
            while ((de = readdir(dr)) != NULL){
                // Iterate over each loggers directories
                if(strcmp(de->d_name,".") && strcmp(de->d_name,"..")){
                    char *client=de->d_name; // Directory name is the client name
                    // Build the path for files we are looking for:
                    char logfile[STATIC_LEN];
                    char logfile_next[STATIC_LEN];
                    sprintf(logfile,"%s/%s/%ld",__logdir,client,interval);
                    sprintf(logfile_next,"%s/%s/%ld",__logdir,client,interval_next);
                    // As long as next logfile is not available, we should wait
                    // before sending the current one (since loggers are working on it)
                    printf("Waiting for %s logger measurements...\n",client);
                    while(!FILE_EXISTS(logfile_next) && (!__stop)){
                        sleep(1);
                    }
                    // Send current one:
                    if(FILE_EXISTS(logfile)){
                        printf("Publishing %s measurements\n",client);
                        publish(publisher,logfile,client,interval);
                        remove(logfile); // Prevent log accumulation
                    }
                }
            }
            closedir(dr);
        }
        else {
            // If open fails, directory probably do not exists
            // yet so wait:
            sleep(1);
        }
    }

    zmq_close (publisher);
    zmq_ctx_destroy (context);

    return 0;
}

void publish(void *publisher, char *filepath, char* client, long int interval){
    // Build message header:
    char buffer[ZMQ_MSG_SIZE];
    sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
    int msglen=strlen(buffer);

    char * line = NULL;
    size_t len = 0;
    ssize_t read;
    FILE *fptr;
    fptr=fopen(filepath,"r");
    // Put every lines in the buffer and send it
    while ((read = getline(&line, &len, fptr)) != -1) {
        if((read+msglen) <ZMQ_MSG_SIZE){
            strcat(buffer,line);
            msglen+=read;
        } else {
            // It buffer is full, we send the message and create another one
            zmq_send (publisher, buffer, msglen, 0);
            // Build new message header:
            sprintf(buffer,"%s\n%s\n%s\n%ld\n",ZMQ_TOKEN,__key,client,interval);
            strcat(buffer,line);
            msglen=strlen(buffer);
        }
    }
    fclose(fptr);
    // Finally send the last message (or the only one)
    zmq_send (publisher, buffer, msglen, 0);
}