mirror of
https://github.com/bgp/bgpq4.git
synced 2024-05-11 05:55:05 +00:00
new pipelining... still buggy
This commit is contained in:
21
bgpq3.h
21
bgpq3.h
@ -1,10 +1,12 @@
|
||||
#ifndef BGPQ3_H_
|
||||
#define BGPQ3_H_
|
||||
|
||||
#include <sys/queue.h>
|
||||
|
||||
#include "sx_prefix.h"
|
||||
#include "sx_slentry.h"
|
||||
|
||||
typedef enum {
|
||||
typedef enum {
|
||||
V_CISCO = 0,
|
||||
V_JUNIPER,
|
||||
V_CISCO_XR,
|
||||
@ -13,7 +15,7 @@ typedef enum {
|
||||
V_FORMAT
|
||||
} bgpq_vendor_t;
|
||||
|
||||
typedef enum {
|
||||
typedef enum {
|
||||
T_NONE = 0,
|
||||
T_ASPATH,
|
||||
T_OASPATH,
|
||||
@ -21,15 +23,17 @@ typedef enum {
|
||||
T_EACL
|
||||
} bgpq_gen_t;
|
||||
|
||||
struct bgpq_prequest {
|
||||
struct bgpq_prequest* next;
|
||||
char request[128];
|
||||
int size;
|
||||
struct bgpq_request {
|
||||
STAILQ_ENTRY(bgpq_request) next;
|
||||
char* request;
|
||||
int size, offset;
|
||||
int (*callback)(char*, void*, char*);
|
||||
void *udata;
|
||||
char* response;
|
||||
int rsize, roffset;
|
||||
};
|
||||
|
||||
struct bgpq_expander {
|
||||
struct bgpq_expander {
|
||||
struct sx_radix_tree* tree;
|
||||
struct sx_slentry* macroses;
|
||||
struct sx_slentry* rsets;
|
||||
@ -52,8 +56,7 @@ struct bgpq_expander {
|
||||
char* server;
|
||||
char* format;
|
||||
unsigned maxlen;
|
||||
int socksize;
|
||||
int qsize;
|
||||
STAILQ_HEAD(bgpq_requests, bgpq_request) wq, rq;
|
||||
};
|
||||
|
||||
|
||||
|
426
bgpq_expander.c
426
bgpq_expander.c
@ -6,6 +6,7 @@
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include <assert.h>
|
||||
#include <fcntl.h>
|
||||
#include <ctype.h>
|
||||
#include <errno.h>
|
||||
#include <netdb.h>
|
||||
@ -28,11 +29,11 @@ int expand_special_asn=0;
|
||||
struct limited_req {
|
||||
int cdepth;
|
||||
struct bgpq_expander* b;
|
||||
FILE* f;
|
||||
int fd;
|
||||
};
|
||||
|
||||
struct limited_req*
|
||||
lreq_alloc(struct bgpq_expander* b, int cdepth, FILE* f)
|
||||
lreq_alloc(struct bgpq_expander* b, int cdepth, int fd)
|
||||
{
|
||||
struct limited_req* lr = malloc(sizeof(struct limited_req));
|
||||
if (!lr)
|
||||
@ -40,7 +41,7 @@ lreq_alloc(struct bgpq_expander* b, int cdepth, FILE* f)
|
||||
memset(lr, 0, sizeof(struct limited_req));
|
||||
lr->cdepth=cdepth;
|
||||
lr->b=b;
|
||||
lr->f=f;
|
||||
lr->fd=fd;
|
||||
return lr;
|
||||
};
|
||||
|
||||
@ -69,6 +70,9 @@ bgpq_expander_init(struct bgpq_expander* b, int af)
|
||||
b->identify=1;
|
||||
b->server="whois.radb.net";
|
||||
|
||||
STAILQ_INIT(&b->wq);
|
||||
STAILQ_INIT(&b->rq);
|
||||
|
||||
return 1;
|
||||
fixups:
|
||||
/* if(b->tree) XXXXXXXXXXXXX sx_radix_tree_destroy(b->tree); */
|
||||
@ -238,9 +242,9 @@ bgpq_expanded_macro(char* as, void* udata, char* request)
|
||||
return 1;
|
||||
};
|
||||
|
||||
int bgpq_pipeline(struct bgpq_expander* b, FILE* f,
|
||||
int bgpq_pipeline(struct bgpq_expander* b, int fd,
|
||||
int (*callback)(char*, void*,char*), void* udata, char* fmt, ...);
|
||||
int bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata,
|
||||
int bgpq_expand_irrd(int fd, int (*callback)(char*, void*,char*), void* udata,
|
||||
char* fmt, ...);
|
||||
|
||||
int
|
||||
@ -259,13 +263,13 @@ addasset:
|
||||
already=already->next;
|
||||
};
|
||||
if(lr->cdepth + 1 < lr->b->maxdepth) {
|
||||
struct limited_req* lr1 = lreq_alloc(lr->b,lr->cdepth+1,lr->f);
|
||||
struct limited_req* lr1 = lreq_alloc(lr->b,lr->cdepth+1,lr->fd);
|
||||
bgpq_expander_add_already(lr->b,as);
|
||||
if (pipelining) {
|
||||
bgpq_pipeline(lr->b,lr->f,bgpq_expanded_macro_limit,lr1,
|
||||
bgpq_pipeline(lr->b,lr->fd,bgpq_expanded_macro_limit,lr1,
|
||||
"!i%s\n",as);
|
||||
} else {
|
||||
bgpq_expand_irrd(lr->f,bgpq_expanded_macro_limit,lr1,
|
||||
bgpq_expand_irrd(lr->fd,bgpq_expanded_macro_limit,lr1,
|
||||
"!i%s\n",as);
|
||||
};
|
||||
} else {
|
||||
@ -317,178 +321,234 @@ bgpq_expanded_v6prefix(char* prefix, void* udata, char* request)
|
||||
return 1;
|
||||
};
|
||||
|
||||
int bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b);
|
||||
int bgpq_pipeline_dequeue(int fd, struct bgpq_expander* b);
|
||||
|
||||
static struct bgpq_request*
|
||||
bgpq_request_alloc(char* request, int (*callback)(char*, void*, char*),
|
||||
void* udata)
|
||||
{
|
||||
struct bgpq_request* bp = malloc(sizeof(struct bgpq_request));
|
||||
if (!bp)
|
||||
return NULL;
|
||||
memset(bp, 0, sizeof(struct bgpq_request));
|
||||
bp->request = strdup(request);
|
||||
bp->offset = 0;
|
||||
bp->size = strlen(bp->request);
|
||||
bp->callback = callback;
|
||||
bp->udata = udata;
|
||||
return bp;
|
||||
};
|
||||
|
||||
int
|
||||
bgpq_pipeline(struct bgpq_expander* b, FILE* f,
|
||||
bgpq_pipeline(struct bgpq_expander* b, int fd,
|
||||
int (*callback)(char*, void*,char*), void* udata, char* fmt, ...)
|
||||
{
|
||||
char request[128];
|
||||
int ret, rlen;
|
||||
struct bgpq_prequest* bp=NULL;
|
||||
struct bgpq_request* bp=NULL;
|
||||
va_list ap;
|
||||
va_start(ap,fmt);
|
||||
vsnprintf(request,sizeof(request),fmt,ap);
|
||||
va_end(ap);
|
||||
|
||||
rlen=strlen(request);
|
||||
if(rlen+b->qsize >= b->socksize) {
|
||||
SX_DEBUG(debug_expander, "looks like socket buffer shortage, "
|
||||
"queued %i of %i, dequeueing\n", b->qsize, b->socksize);
|
||||
bgpq_pipeline_dequeue(f, b);
|
||||
};
|
||||
|
||||
SX_DEBUG(debug_expander,"expander: sending '%s' (queued %i of %i)\n",
|
||||
request, b->qsize, b->socksize);
|
||||
SX_DEBUG(debug_expander,"expander: sending %s", request);
|
||||
|
||||
bp=malloc(sizeof(struct bgpq_prequest));
|
||||
bp = bgpq_request_alloc(request, callback, udata);
|
||||
if(!bp) {
|
||||
sx_report(SX_FATAL,"Unable to allocate %lu bytes: %s\n",
|
||||
(unsigned long)sizeof(struct bgpq_prequest),strerror(errno));
|
||||
(unsigned long)sizeof(struct bgpq_request),strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
memset(bp,0,sizeof(struct bgpq_prequest));
|
||||
|
||||
ret=fwrite(request,1,strlen(request),f);
|
||||
|
||||
if(ret!=rlen) {
|
||||
sx_report(SX_FATAL,"Partial write to IRRd, only %i bytes written: %s\n",
|
||||
ret,strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
|
||||
ret=fflush(f);
|
||||
if(ret) {
|
||||
sx_report(SX_FATAL, "fflush(%i) error: %s\n", fileno(f),
|
||||
strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
|
||||
strlcpy(bp->request,request,sizeof(bp->request));
|
||||
bp->callback=callback;
|
||||
bp->udata=udata;
|
||||
bp->size=rlen;
|
||||
b->qsize+=rlen;
|
||||
|
||||
if(b->lastpipe) {
|
||||
b->lastpipe->next=bp;
|
||||
b->lastpipe=bp;
|
||||
} else {
|
||||
b->firstpipe=b->lastpipe=bp;
|
||||
};
|
||||
b->piped++;
|
||||
if (STAILQ_EMPTY(&b->wq)) {
|
||||
ret=write(fd, request, bp->size);
|
||||
if (ret < 0)
|
||||
sx_report(SX_FATAL, "Error writing request: %s\n", strerror(errno));
|
||||
if (ret == bp->size) {
|
||||
STAILQ_INSERT_TAIL(&b->rq, bp, next);
|
||||
} else {
|
||||
bp->offset=ret;
|
||||
STAILQ_INSERT_TAIL(&b->wq, bp, next);
|
||||
};
|
||||
} else
|
||||
STAILQ_INSERT_TAIL(&b->wq, bp, next);
|
||||
|
||||
return 0;
|
||||
};
|
||||
|
||||
int
|
||||
bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b)
|
||||
static void
|
||||
bgpq_write(struct bgpq_expander* b, int fd)
|
||||
{
|
||||
while(b->piped>0) {
|
||||
char response[256];
|
||||
struct bgpq_prequest* pipe;
|
||||
memset(response,0,sizeof(response));
|
||||
SX_DEBUG(debug_expander>2, "waiting for answer to %s",
|
||||
b->firstpipe->request);
|
||||
if(!fgets(response,sizeof(response),f)) {
|
||||
if(ferror(f)) {
|
||||
sx_report(SX_FATAL,"Error reading data from IRRd: %s (dequeue)"
|
||||
"\n", strerror(errno));
|
||||
} else {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue)\n");
|
||||
};
|
||||
exit(1);
|
||||
};
|
||||
while(!STAILQ_EMPTY(&b->wq)) {
|
||||
struct bgpq_request* req = STAILQ_FIRST(&b->wq);
|
||||
int ret = write(fd, req->request+req->offset, req->size-req->offset);
|
||||
if (ret < 0)
|
||||
sx_report(SX_FATAL, "error writing data: %s\n", strerror(errno));
|
||||
|
||||
if (ret == req->size - req->offset) {
|
||||
/* this request was dequeued */
|
||||
STAILQ_REMOVE_HEAD(&b->wq, next);
|
||||
STAILQ_INSERT_TAIL(&b->rq, req, next);
|
||||
} else {
|
||||
req->offset += ret;
|
||||
break;
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
int
|
||||
bgpq_read(struct bgpq_expander* b, int fd)
|
||||
{
|
||||
static char response[256];
|
||||
static int off = 0;
|
||||
|
||||
if (!STAILQ_EMPTY(&b->wq))
|
||||
bgpq_write(b, fd);
|
||||
|
||||
while(!STAILQ_EMPTY(&b->rq)) {
|
||||
struct bgpq_request* req = STAILQ_FIRST(&b->rq);
|
||||
SX_DEBUG(debug_expander>2, "waiting for answer to %s, init %i '%.*s'\n",
|
||||
req->request, off, off, response);
|
||||
int ret = 0;
|
||||
|
||||
if (strchr(response, '\n'))
|
||||
goto have;
|
||||
repeat:
|
||||
ret = read(fd, response+off, sizeof(response)-off);
|
||||
if (ret < 0) {
|
||||
if (errno == EAGAIN)
|
||||
goto repeat;
|
||||
sx_report(SX_FATAL,"Error reading data from IRRd: %s (dequeue)\n",
|
||||
strerror(errno));
|
||||
} else if (ret == 0) {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue)\n");
|
||||
};
|
||||
off += ret;
|
||||
|
||||
if (!strchr(response, '\n'))
|
||||
goto repeat;
|
||||
have:
|
||||
SX_DEBUG(debug_expander>5, "got response of %.*s\n", off, response);
|
||||
if(response[0]=='A') {
|
||||
char* eon, *c;
|
||||
char* eon, *c, *cres;
|
||||
unsigned long togot=strtoul(response+1,&eon,10);
|
||||
char* recvbuffer=malloc(togot+256);
|
||||
char* recvbuffer=malloc(togot);
|
||||
int offset = 0;
|
||||
memset(recvbuffer,0,togot+256);
|
||||
memset(recvbuffer,0,togot);
|
||||
|
||||
if(!eon || *eon!='\n') {
|
||||
sx_report(SX_ERROR,"A-code finished with wrong char '%c'(%s)\n",
|
||||
eon?*eon:'0',response);
|
||||
exit(1);
|
||||
};
|
||||
if(fgets(recvbuffer,togot+256,f)==NULL) {
|
||||
if(ferror(f)) {
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue, "
|
||||
"result)\n", strerror(errno));
|
||||
} else {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue, result)\n");
|
||||
};
|
||||
exit(1);
|
||||
|
||||
offset = off - ((eon+1) - response);
|
||||
memcpy(recvbuffer, eon+1, off - ((eon+1) - response));
|
||||
|
||||
SX_DEBUG(debug_expander>5,
|
||||
"starting read with ready '%.*s', waiting for %lu\n",
|
||||
offset, recvbuffer, togot-offset);
|
||||
if (togot < offset) {
|
||||
memmove(response, recvbuffer+togot, offset-togot);
|
||||
off = offset-togot;
|
||||
goto have3;
|
||||
} else if (togot == offset) {
|
||||
goto have2;
|
||||
};
|
||||
if(strlen(recvbuffer) != togot) {
|
||||
sx_report(SX_FATAL, "expected %lu, got %lu\n",
|
||||
reread:
|
||||
|
||||
ret = read(fd, recvbuffer+offset, togot-offset);
|
||||
if (ret < 0) {
|
||||
if (errno == EAGAIN)
|
||||
goto reread;
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue, result)\n",
|
||||
strerror(errno));
|
||||
} else if (ret == 0) {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue, result)\n");
|
||||
};
|
||||
SX_DEBUG(debug_expander>5,
|
||||
"Read1: got '%.*s'\n", ret, recvbuffer+offset);
|
||||
offset+=ret;
|
||||
if(offset < togot) {
|
||||
sx_report(SX_NOTICE, "expected %lu, got %lu\n",
|
||||
togot, strlen(recvbuffer));
|
||||
exit(1);
|
||||
goto reread;
|
||||
};
|
||||
if(fgets(response,sizeof(response),f)==NULL) {
|
||||
if(ferror(f)) {
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue,final)"
|
||||
")\n", strerror(errno));
|
||||
} else {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue,final)\n");
|
||||
};
|
||||
exit(1);
|
||||
|
||||
have2:
|
||||
off=0;
|
||||
memset(response, 0, sizeof(response));
|
||||
reread2:
|
||||
ret = read(fd, response+off, sizeof(response) - off);
|
||||
if (ret < 0) {
|
||||
if (errno == EAGAIN)
|
||||
goto reread2;
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue,final)\n",
|
||||
strerror(errno));
|
||||
} else if (ret == 0) {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue,final)\n");
|
||||
};
|
||||
SX_DEBUG(debug_expander>5,
|
||||
"Read2: got '%.*s'\n", ret, response+off);
|
||||
off+=ret;
|
||||
|
||||
have3:
|
||||
if (!(cres = strchr(response, '\n')))
|
||||
goto reread2;
|
||||
|
||||
SX_DEBUG(debug_expander>=3,"Got %s (%lu bytes of %lu) in response "
|
||||
"to %sfinal code: %s",recvbuffer,strlen(recvbuffer),togot,
|
||||
b->firstpipe->request,response);
|
||||
"to %sfinal code: %.*s",recvbuffer,strlen(recvbuffer),togot,
|
||||
req->request,off,response);
|
||||
|
||||
for(c=recvbuffer; c<recvbuffer+togot;) {
|
||||
size_t spn=strcspn(c," \n");
|
||||
if(spn) c[spn]=0;
|
||||
if(c[0]==0) break;
|
||||
if(b->firstpipe->callback) {
|
||||
b->firstpipe->callback(c,b->firstpipe->udata,
|
||||
b->firstpipe->request);
|
||||
};
|
||||
req->callback(c, req->udata, req->request);
|
||||
c+=spn+1;
|
||||
};
|
||||
assert(c == recvbuffer+togot);
|
||||
memmove(response, cres+1, off-((cres+1)-response));
|
||||
off -= (cres+1)-response;
|
||||
memset(response+off, 0, sizeof(response) - off);
|
||||
SX_DEBUG(debug_expander>5,
|
||||
"fixed response of %i, %.*s\n", off, off, response);
|
||||
memset(recvbuffer,0,togot+2);
|
||||
free(recvbuffer);
|
||||
} else if(response[0]=='C') {
|
||||
/* No data */
|
||||
SX_DEBUG(debug_expander>2,"No data expanding %s\n",
|
||||
b->firstpipe->request);
|
||||
SX_DEBUG(debug_expander,"No data expanding %s\n", req->request);
|
||||
} else if(response[0]=='D') {
|
||||
/* .... */
|
||||
SX_DEBUG(debug_expander>2,"Key not found expanding %s\n",
|
||||
b->firstpipe->request);
|
||||
SX_DEBUG(debug_expander,"Key not found expanding %s\n",
|
||||
req->request);
|
||||
} else if(response[0]=='E') {
|
||||
sx_report(SX_ERROR, "Multiple keys expanding %s: %s\n",
|
||||
b->firstpipe->request, response);
|
||||
sx_report(SX_ERROR, "Multiple keys expanding %s: %s\n",
|
||||
req->request, response);
|
||||
} else if(response[0]=='F') {
|
||||
sx_report(SX_ERROR, "Error expanding %s: %s\n",
|
||||
b->firstpipe->request, response);
|
||||
sx_report(SX_ERROR, "Error expanding %s: %s\n",
|
||||
req->request, response);
|
||||
} else {
|
||||
sx_report(SX_ERROR,"Wrong reply: %s to %s\n", response,
|
||||
b->firstpipe->request);
|
||||
req->request);
|
||||
exit(1);
|
||||
};
|
||||
|
||||
pipe=b->firstpipe;
|
||||
b->firstpipe=b->firstpipe->next;
|
||||
STAILQ_REMOVE_HEAD(&b->rq, next);
|
||||
b->piped--;
|
||||
b->qsize-=pipe->size;
|
||||
free(pipe);
|
||||
/* XXXXXX - free(req) */
|
||||
};
|
||||
b->lastpipe=NULL;
|
||||
assert(b->firstpipe == NULL);
|
||||
return 0;
|
||||
};
|
||||
|
||||
int
|
||||
bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata,
|
||||
bgpq_expand_irrd(int fd, int (*callback)(char*, void*,char*), void* udata,
|
||||
char* fmt, ...)
|
||||
{
|
||||
char request[128], response[128];
|
||||
va_list ap;
|
||||
int ret;
|
||||
int ret, off = 0;
|
||||
|
||||
va_start(ap,fmt);
|
||||
vsnprintf(request,sizeof(request),fmt,ap);
|
||||
@ -496,28 +556,36 @@ bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata,
|
||||
|
||||
SX_DEBUG(debug_expander,"expander: sending '%s'\n", request);
|
||||
|
||||
ret=fwrite(request,1,strlen(request),f);
|
||||
ret=write(fd, request, strlen(request));
|
||||
if(ret!=strlen(request)) {
|
||||
sx_report(SX_FATAL,"Partial write to IRRd, only %i bytes written: %s\n",
|
||||
ret,strerror(errno));
|
||||
ret, strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
memset(response,0,sizeof(response));
|
||||
if(!fgets(response,sizeof(response),f)) {
|
||||
if(ferror(f)) {
|
||||
sx_report(SX_FATAL,"Error reading data from IRRd: %s (expand)"
|
||||
"\n", strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
sx_report(SX_FATAL,"EOF from IRRd (expand)\n");
|
||||
|
||||
repeat:
|
||||
ret = read(fd, response+off, sizeof(response)-off);
|
||||
if (ret < 0) {
|
||||
sx_report(SX_ERROR, "Error reading IRRd: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
} else if (ret == 0) {
|
||||
sx_report(SX_FATAL, "EOF reading IRRd\n");
|
||||
exit(1);
|
||||
};
|
||||
|
||||
if (!strchr(response, '\n')) {
|
||||
off += ret;
|
||||
goto repeat;
|
||||
};
|
||||
|
||||
SX_DEBUG(debug_expander>2,"expander: initially got %lu bytes, '%s'\n",
|
||||
(unsigned long)strlen(response),response);
|
||||
if(response[0]=='A') {
|
||||
char* eon, *c;
|
||||
long togot=strtoul(response+1,&eon,10);
|
||||
char recvbuffer[togot+1];
|
||||
int recvoff = 0;
|
||||
|
||||
if(eon && *eon!='\n') {
|
||||
sx_report(SX_ERROR,"A-code finised with wrong char '%c' (%s)\n",
|
||||
@ -525,24 +593,30 @@ bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata,
|
||||
exit(1);
|
||||
};
|
||||
|
||||
if(fgets(recvbuffer,togot+1,f)==NULL) {
|
||||
if(feof(f)) {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n");
|
||||
} else {
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (expand,result)\n",
|
||||
strerror(errno));
|
||||
};
|
||||
exit(1);
|
||||
};
|
||||
if(fgets(response,sizeof(response),f)==NULL) {
|
||||
if(ferror(f)) {
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue,final)",
|
||||
strerror(errno));
|
||||
} else {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (dequeue,final)\n");
|
||||
};
|
||||
reread:
|
||||
ret = read(fd, recvbuffer+recvoff, togot+1-recvoff);
|
||||
if (ret == 0) {
|
||||
sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n");
|
||||
} else if (ret < 0) {
|
||||
sx_report(SX_FATAL,"Error reading IRRd: %s (expand,result)\n",
|
||||
strerror(errno));
|
||||
};
|
||||
recvoff += ret;
|
||||
if (recvoff < togot+1)
|
||||
goto reread;
|
||||
|
||||
memset(response, 0, sizeof(response));
|
||||
off = 0;
|
||||
ret = read(fd, response+off, sizeof(response)-off);
|
||||
|
||||
if (ret < 0) {
|
||||
sx_report(SX_FATAL, "error reading IRRd: %s\n", strerror(errno));
|
||||
exit(1);
|
||||
} else if (ret == 0) {
|
||||
sx_report(SX_FATAL, "eof reading IRRd\n");
|
||||
exit(1);
|
||||
};
|
||||
|
||||
SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, '%s'\n",
|
||||
(unsigned long)strlen(recvbuffer),recvbuffer);
|
||||
|
||||
@ -574,7 +648,6 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
int fd=-1, err, ret;
|
||||
struct sx_slentry* mc;
|
||||
struct addrinfo hints, *res=NULL, *rp;
|
||||
FILE* f=NULL;
|
||||
memset(&hints,0,sizeof(struct addrinfo));
|
||||
|
||||
hints.ai_socktype=SOCK_STREAM;
|
||||
@ -604,34 +677,25 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
err=sx_maxsockbuf(fd,SO_SNDBUF);
|
||||
if(err>0) {
|
||||
SX_DEBUG(debug_expander, "Acquired sendbuf of %i bytes\n", err);
|
||||
b->socksize=err;
|
||||
} else {
|
||||
shutdown(fd, SHUT_RDWR);
|
||||
close(fd);
|
||||
fd=-1;
|
||||
continue;
|
||||
};
|
||||
f=fdopen(fd,"a+");
|
||||
if(!f) {
|
||||
shutdown(fd,SHUT_RDWR);
|
||||
close(fd);
|
||||
fd=-1;
|
||||
f=NULL;
|
||||
continue;
|
||||
};
|
||||
break;
|
||||
};
|
||||
freeaddrinfo(res);
|
||||
|
||||
if(!f) {
|
||||
if(fd == -1) {
|
||||
/* all our attempts to connect failed */
|
||||
sx_report(SX_ERROR,"All attempts to connect %s failed, last"
|
||||
" error: %s\n", b->server, strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
|
||||
if((ret=fwrite("!!\n",1,3,f))!=3) {
|
||||
sx_report(SX_ERROR,"Partial fwrite to IRRd: %i bytes, %s\n",
|
||||
if((ret=write(fd, "!!\n", 3))!=3) {
|
||||
sx_report(SX_ERROR,"Partial write to IRRd: %i bytes, %s\n",
|
||||
ret, strerror(errno));
|
||||
exit(1);
|
||||
};
|
||||
@ -640,8 +704,9 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
char sources[128];
|
||||
snprintf(sources,sizeof(sources),"!s%s\n", b->sources);
|
||||
SX_DEBUG(debug_expander,"Requesting sources %s", sources);
|
||||
fwrite(sources,strlen(sources),1,f);
|
||||
fgets(sources,sizeof(sources),f);
|
||||
write(fd, sources, strlen(sources));
|
||||
memset(sources, 0, sizeof(sources));
|
||||
read(fd, sources, sizeof(sources));
|
||||
SX_DEBUG(debug_expander,"Got answer %s", sources);
|
||||
if(sources[0]!='C') {
|
||||
sx_report(SX_ERROR, "Invalid source(s) '%s': %s\n", b->sources,
|
||||
@ -653,15 +718,17 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
if(b->identify) {
|
||||
char ident[128];
|
||||
snprintf(ident,sizeof(ident),"!n" PACKAGE_STRING "\n");
|
||||
fwrite(ident,strlen(ident),1,f);
|
||||
fgets(ident,sizeof(ident),f);
|
||||
write(fd, ident, strlen(ident));
|
||||
read(fd, ident, sizeof(ident));
|
||||
};
|
||||
|
||||
fcntl(fd, F_SETFL, O_NONBLOCK|(fcntl(fd, F_GETFL)));
|
||||
|
||||
for(mc=b->macroses;mc;mc=mc->next) {
|
||||
if (!b->maxdepth) {
|
||||
bgpq_expand_irrd(f,bgpq_expanded_macro,b,"!i%s,1\n",mc->text);
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_macro,b,"!i%s,1\n",mc->text);
|
||||
} else {
|
||||
struct limited_req* lr = lreq_alloc(b, 0, f);
|
||||
struct limited_req* lr = lreq_alloc(b, 0, fd);
|
||||
bgpq_expander_add_already(b,mc->text);
|
||||
if (!lr) {
|
||||
sx_report(SX_FATAL, "Unable to allocate memory: %s\n",
|
||||
@ -669,26 +736,29 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
exit(1);
|
||||
};
|
||||
if (pipelining) {
|
||||
bgpq_pipeline(b,f,bgpq_expanded_macro_limit,lr,"!i%s\n",
|
||||
bgpq_pipeline(b,fd,bgpq_expanded_macro_limit,lr,"!i%s\n",
|
||||
mc->text);
|
||||
} else {
|
||||
bgpq_expand_irrd(f,bgpq_expanded_macro_limit,lr,"!i%s\n",
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_macro_limit,lr,"!i%s\n",
|
||||
mc->text);
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
if(pipelining && b->firstpipe) {
|
||||
bgpq_pipeline_dequeue(f,b);
|
||||
if(pipelining) {
|
||||
if(!STAILQ_EMPTY(&b->wq))
|
||||
bgpq_write(b, fd);
|
||||
if (!STAILQ_EMPTY(&b->rq))
|
||||
bgpq_read(b, fd);
|
||||
};
|
||||
|
||||
if(b->generation>=T_PREFIXLIST) {
|
||||
unsigned i, j, k;
|
||||
for(mc=b->rsets;mc;mc=mc->next) {
|
||||
if(b->family==AF_INET) {
|
||||
bgpq_expand_irrd(f,bgpq_expanded_prefix,b,"!i%s,1\n",mc->text);
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_prefix,b,"!i%s,1\n",mc->text);
|
||||
} else {
|
||||
bgpq_expand_irrd(f,bgpq_expanded_v6prefix,b,"!i%s,1\n",
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_v6prefix,b,"!i%s,1\n",
|
||||
mc->text);
|
||||
};
|
||||
};
|
||||
@ -700,33 +770,33 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
if(b->family==AF_INET6) {
|
||||
if(!pipelining) {
|
||||
if(k>0)
|
||||
bgpq_expand_irrd(f,bgpq_expanded_v6prefix,b,
|
||||
"!6as%u.%u\r\n", k, i*8+j);
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_v6prefix,
|
||||
b,"!6as%u.%u\r\n", k, i*8+j);
|
||||
else
|
||||
bgpq_expand_irrd(f,bgpq_expanded_v6prefix,b,
|
||||
"!6as%u\r\n", i*8+j);
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_v6prefix,
|
||||
b,"!6as%u\r\n", i*8+j);
|
||||
} else {
|
||||
if(k>0)
|
||||
bgpq_pipeline(b,f,bgpq_expanded_v6prefix,b,
|
||||
bgpq_pipeline(b,fd,bgpq_expanded_v6prefix,b,
|
||||
"!6as%u.%u\r\n", k, i*8+j);
|
||||
else
|
||||
bgpq_pipeline(b,f,bgpq_expanded_v6prefix,b,
|
||||
bgpq_pipeline(b,fd,bgpq_expanded_v6prefix,b,
|
||||
"!6as%u\r\n", i*8+j);
|
||||
};
|
||||
} else {
|
||||
if(!pipelining) {
|
||||
if(k>0)
|
||||
bgpq_expand_irrd(f,bgpq_expanded_prefix,b,
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_prefix,b,
|
||||
"!gas%u.%u\n", k, i*8+j);
|
||||
else
|
||||
bgpq_expand_irrd(f,bgpq_expanded_prefix,b,
|
||||
bgpq_expand_irrd(fd,bgpq_expanded_prefix,b,
|
||||
"!gas%u\n", i*8+j);
|
||||
} else {
|
||||
if(k>0)
|
||||
bgpq_pipeline(b,f,bgpq_expanded_prefix,b,
|
||||
bgpq_pipeline(b,fd,bgpq_expanded_prefix,b,
|
||||
"!gas%u.%u\n", k, i*8+j);
|
||||
else
|
||||
bgpq_pipeline(b,f,bgpq_expanded_prefix,b,
|
||||
bgpq_pipeline(b,fd,bgpq_expanded_prefix,b,
|
||||
"!gas%u\n", i*8+j);
|
||||
};
|
||||
};
|
||||
@ -734,13 +804,17 @@ bgpq_expand(struct bgpq_expander* b)
|
||||
};
|
||||
};
|
||||
};
|
||||
if(pipelining && b->firstpipe) {
|
||||
bgpq_pipeline_dequeue(f,b);
|
||||
if(pipelining) {
|
||||
if(!STAILQ_EMPTY(&b->wq))
|
||||
bgpq_write(b, fd);
|
||||
if (!STAILQ_EMPTY(&b->rq))
|
||||
bgpq_read(b, fd);
|
||||
};
|
||||
};
|
||||
|
||||
fwrite("!q\n",1,3,f);
|
||||
fclose(f);
|
||||
write(fd, "!q\n",3);
|
||||
shutdown(fd, SHUT_RDWR);
|
||||
close(fd);
|
||||
return 1;
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user