diff --git a/bgpq3.h b/bgpq3.h index 8f1e599..72965eb 100644 --- a/bgpq3.h +++ b/bgpq3.h @@ -1,10 +1,12 @@ #ifndef BGPQ3_H_ #define BGPQ3_H_ +#include + #include "sx_prefix.h" #include "sx_slentry.h" -typedef enum { +typedef enum { V_CISCO = 0, V_JUNIPER, V_CISCO_XR, @@ -13,7 +15,7 @@ typedef enum { V_FORMAT } bgpq_vendor_t; -typedef enum { +typedef enum { T_NONE = 0, T_ASPATH, T_OASPATH, @@ -21,15 +23,17 @@ typedef enum { T_EACL } bgpq_gen_t; -struct bgpq_prequest { - struct bgpq_prequest* next; - char request[128]; - int size; +struct bgpq_request { + STAILQ_ENTRY(bgpq_request) next; + char* request; + int size, offset; int (*callback)(char*, void*, char*); void *udata; + char* response; + int rsize, roffset; }; -struct bgpq_expander { +struct bgpq_expander { struct sx_radix_tree* tree; struct sx_slentry* macroses; struct sx_slentry* rsets; @@ -52,8 +56,7 @@ struct bgpq_expander { char* server; char* format; unsigned maxlen; - int socksize; - int qsize; + STAILQ_HEAD(bgpq_requests, bgpq_request) wq, rq; }; diff --git a/bgpq_expander.c b/bgpq_expander.c index ca7e10c..5377257 100644 --- a/bgpq_expander.c +++ b/bgpq_expander.c @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -28,11 +29,11 @@ int expand_special_asn=0; struct limited_req { int cdepth; struct bgpq_expander* b; - FILE* f; + int fd; }; struct limited_req* -lreq_alloc(struct bgpq_expander* b, int cdepth, FILE* f) +lreq_alloc(struct bgpq_expander* b, int cdepth, int fd) { struct limited_req* lr = malloc(sizeof(struct limited_req)); if (!lr) @@ -40,7 +41,7 @@ lreq_alloc(struct bgpq_expander* b, int cdepth, FILE* f) memset(lr, 0, sizeof(struct limited_req)); lr->cdepth=cdepth; lr->b=b; - lr->f=f; + lr->fd=fd; return lr; }; @@ -69,6 +70,9 @@ bgpq_expander_init(struct bgpq_expander* b, int af) b->identify=1; b->server="whois.radb.net"; + STAILQ_INIT(&b->wq); + STAILQ_INIT(&b->rq); + return 1; fixups: /* if(b->tree) XXXXXXXXXXXXX sx_radix_tree_destroy(b->tree); */ @@ -238,9 +242,9 @@ bgpq_expanded_macro(char* as, void* udata, char* request) return 1; }; -int bgpq_pipeline(struct bgpq_expander* b, FILE* f, +int bgpq_pipeline(struct bgpq_expander* b, int fd, int (*callback)(char*, void*,char*), void* udata, char* fmt, ...); -int bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata, +int bgpq_expand_irrd(int fd, int (*callback)(char*, void*,char*), void* udata, char* fmt, ...); int @@ -259,13 +263,13 @@ addasset: already=already->next; }; if(lr->cdepth + 1 < lr->b->maxdepth) { - struct limited_req* lr1 = lreq_alloc(lr->b,lr->cdepth+1,lr->f); + struct limited_req* lr1 = lreq_alloc(lr->b,lr->cdepth+1,lr->fd); bgpq_expander_add_already(lr->b,as); if (pipelining) { - bgpq_pipeline(lr->b,lr->f,bgpq_expanded_macro_limit,lr1, + bgpq_pipeline(lr->b,lr->fd,bgpq_expanded_macro_limit,lr1, "!i%s\n",as); } else { - bgpq_expand_irrd(lr->f,bgpq_expanded_macro_limit,lr1, + bgpq_expand_irrd(lr->fd,bgpq_expanded_macro_limit,lr1, "!i%s\n",as); }; } else { @@ -317,178 +321,234 @@ bgpq_expanded_v6prefix(char* prefix, void* udata, char* request) return 1; }; -int bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b); +int bgpq_pipeline_dequeue(int fd, struct bgpq_expander* b); + +static struct bgpq_request* +bgpq_request_alloc(char* request, int (*callback)(char*, void*, char*), + void* udata) +{ + struct bgpq_request* bp = malloc(sizeof(struct bgpq_request)); + if (!bp) + return NULL; + memset(bp, 0, sizeof(struct bgpq_request)); + bp->request = strdup(request); + bp->offset = 0; + bp->size = strlen(bp->request); + bp->callback = callback; + bp->udata = udata; + return bp; +}; int -bgpq_pipeline(struct bgpq_expander* b, FILE* f, +bgpq_pipeline(struct bgpq_expander* b, int fd, int (*callback)(char*, void*,char*), void* udata, char* fmt, ...) { char request[128]; int ret, rlen; - struct bgpq_prequest* bp=NULL; + struct bgpq_request* bp=NULL; va_list ap; va_start(ap,fmt); vsnprintf(request,sizeof(request),fmt,ap); va_end(ap); rlen=strlen(request); - if(rlen+b->qsize >= b->socksize) { - SX_DEBUG(debug_expander, "looks like socket buffer shortage, " - "queued %i of %i, dequeueing\n", b->qsize, b->socksize); - bgpq_pipeline_dequeue(f, b); - }; - SX_DEBUG(debug_expander,"expander: sending '%s' (queued %i of %i)\n", - request, b->qsize, b->socksize); + SX_DEBUG(debug_expander,"expander: sending %s", request); - bp=malloc(sizeof(struct bgpq_prequest)); + bp = bgpq_request_alloc(request, callback, udata); if(!bp) { sx_report(SX_FATAL,"Unable to allocate %lu bytes: %s\n", - (unsigned long)sizeof(struct bgpq_prequest),strerror(errno)); + (unsigned long)sizeof(struct bgpq_request),strerror(errno)); exit(1); }; - memset(bp,0,sizeof(struct bgpq_prequest)); - - ret=fwrite(request,1,strlen(request),f); - - if(ret!=rlen) { - sx_report(SX_FATAL,"Partial write to IRRd, only %i bytes written: %s\n", - ret,strerror(errno)); - exit(1); - }; - - ret=fflush(f); - if(ret) { - sx_report(SX_FATAL, "fflush(%i) error: %s\n", fileno(f), - strerror(errno)); - exit(1); - }; - - strlcpy(bp->request,request,sizeof(bp->request)); - bp->callback=callback; - bp->udata=udata; - bp->size=rlen; - b->qsize+=rlen; - - if(b->lastpipe) { - b->lastpipe->next=bp; - b->lastpipe=bp; - } else { - b->firstpipe=b->lastpipe=bp; - }; - b->piped++; + if (STAILQ_EMPTY(&b->wq)) { + ret=write(fd, request, bp->size); + if (ret < 0) + sx_report(SX_FATAL, "Error writing request: %s\n", strerror(errno)); + if (ret == bp->size) { + STAILQ_INSERT_TAIL(&b->rq, bp, next); + } else { + bp->offset=ret; + STAILQ_INSERT_TAIL(&b->wq, bp, next); + }; + } else + STAILQ_INSERT_TAIL(&b->wq, bp, next); return 0; }; -int -bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b) +static void +bgpq_write(struct bgpq_expander* b, int fd) { - while(b->piped>0) { - char response[256]; - struct bgpq_prequest* pipe; - memset(response,0,sizeof(response)); - SX_DEBUG(debug_expander>2, "waiting for answer to %s", - b->firstpipe->request); - if(!fgets(response,sizeof(response),f)) { - if(ferror(f)) { - sx_report(SX_FATAL,"Error reading data from IRRd: %s (dequeue)" - "\n", strerror(errno)); - } else { - sx_report(SX_FATAL,"EOF from IRRd (dequeue)\n"); - }; - exit(1); - }; + while(!STAILQ_EMPTY(&b->wq)) { + struct bgpq_request* req = STAILQ_FIRST(&b->wq); + int ret = write(fd, req->request+req->offset, req->size-req->offset); + if (ret < 0) + sx_report(SX_FATAL, "error writing data: %s\n", strerror(errno)); + if (ret == req->size - req->offset) { + /* this request was dequeued */ + STAILQ_REMOVE_HEAD(&b->wq, next); + STAILQ_INSERT_TAIL(&b->rq, req, next); + } else { + req->offset += ret; + break; + }; + }; +}; + +int +bgpq_read(struct bgpq_expander* b, int fd) +{ + static char response[256]; + static int off = 0; + + if (!STAILQ_EMPTY(&b->wq)) + bgpq_write(b, fd); + + while(!STAILQ_EMPTY(&b->rq)) { + struct bgpq_request* req = STAILQ_FIRST(&b->rq); + SX_DEBUG(debug_expander>2, "waiting for answer to %s, init %i '%.*s'\n", + req->request, off, off, response); + int ret = 0; + + if (strchr(response, '\n')) + goto have; +repeat: + ret = read(fd, response+off, sizeof(response)-off); + if (ret < 0) { + if (errno == EAGAIN) + goto repeat; + sx_report(SX_FATAL,"Error reading data from IRRd: %s (dequeue)\n", + strerror(errno)); + } else if (ret == 0) { + sx_report(SX_FATAL,"EOF from IRRd (dequeue)\n"); + }; + off += ret; + + if (!strchr(response, '\n')) + goto repeat; +have: + SX_DEBUG(debug_expander>5, "got response of %.*s\n", off, response); if(response[0]=='A') { - char* eon, *c; + char* eon, *c, *cres; unsigned long togot=strtoul(response+1,&eon,10); - char* recvbuffer=malloc(togot+256); + char* recvbuffer=malloc(togot); int offset = 0; - memset(recvbuffer,0,togot+256); + memset(recvbuffer,0,togot); if(!eon || *eon!='\n') { sx_report(SX_ERROR,"A-code finished with wrong char '%c'(%s)\n", eon?*eon:'0',response); exit(1); }; - if(fgets(recvbuffer,togot+256,f)==NULL) { - if(ferror(f)) { - sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue, " - "result)\n", strerror(errno)); - } else { - sx_report(SX_FATAL,"EOF from IRRd (dequeue, result)\n"); - }; - exit(1); + + offset = off - ((eon+1) - response); + memcpy(recvbuffer, eon+1, off - ((eon+1) - response)); + + SX_DEBUG(debug_expander>5, + "starting read with ready '%.*s', waiting for %lu\n", + offset, recvbuffer, togot-offset); + if (togot < offset) { + memmove(response, recvbuffer+togot, offset-togot); + off = offset-togot; + goto have3; + } else if (togot == offset) { + goto have2; }; - if(strlen(recvbuffer) != togot) { - sx_report(SX_FATAL, "expected %lu, got %lu\n", +reread: + + ret = read(fd, recvbuffer+offset, togot-offset); + if (ret < 0) { + if (errno == EAGAIN) + goto reread; + sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue, result)\n", + strerror(errno)); + } else if (ret == 0) { + sx_report(SX_FATAL,"EOF from IRRd (dequeue, result)\n"); + }; + SX_DEBUG(debug_expander>5, + "Read1: got '%.*s'\n", ret, recvbuffer+offset); + offset+=ret; + if(offset < togot) { + sx_report(SX_NOTICE, "expected %lu, got %lu\n", togot, strlen(recvbuffer)); - exit(1); + goto reread; }; - if(fgets(response,sizeof(response),f)==NULL) { - if(ferror(f)) { - sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue,final)" - ")\n", strerror(errno)); - } else { - sx_report(SX_FATAL,"EOF from IRRd (dequeue,final)\n"); - }; - exit(1); + +have2: + off=0; + memset(response, 0, sizeof(response)); +reread2: + ret = read(fd, response+off, sizeof(response) - off); + if (ret < 0) { + if (errno == EAGAIN) + goto reread2; + sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue,final)\n", + strerror(errno)); + } else if (ret == 0) { + sx_report(SX_FATAL,"EOF from IRRd (dequeue,final)\n"); }; + SX_DEBUG(debug_expander>5, + "Read2: got '%.*s'\n", ret, response+off); + off+=ret; + +have3: + if (!(cres = strchr(response, '\n'))) + goto reread2; + SX_DEBUG(debug_expander>=3,"Got %s (%lu bytes of %lu) in response " - "to %sfinal code: %s",recvbuffer,strlen(recvbuffer),togot, - b->firstpipe->request,response); + "to %sfinal code: %.*s",recvbuffer,strlen(recvbuffer),togot, + req->request,off,response); for(c=recvbuffer; cfirstpipe->callback) { - b->firstpipe->callback(c,b->firstpipe->udata, - b->firstpipe->request); - }; + req->callback(c, req->udata, req->request); c+=spn+1; }; + assert(c == recvbuffer+togot); + memmove(response, cres+1, off-((cres+1)-response)); + off -= (cres+1)-response; + memset(response+off, 0, sizeof(response) - off); + SX_DEBUG(debug_expander>5, + "fixed response of %i, %.*s\n", off, off, response); memset(recvbuffer,0,togot+2); free(recvbuffer); } else if(response[0]=='C') { /* No data */ - SX_DEBUG(debug_expander>2,"No data expanding %s\n", - b->firstpipe->request); + SX_DEBUG(debug_expander,"No data expanding %s\n", req->request); } else if(response[0]=='D') { /* .... */ - SX_DEBUG(debug_expander>2,"Key not found expanding %s\n", - b->firstpipe->request); + SX_DEBUG(debug_expander,"Key not found expanding %s\n", + req->request); } else if(response[0]=='E') { - sx_report(SX_ERROR, "Multiple keys expanding %s: %s\n", - b->firstpipe->request, response); + sx_report(SX_ERROR, "Multiple keys expanding %s: %s\n", + req->request, response); } else if(response[0]=='F') { - sx_report(SX_ERROR, "Error expanding %s: %s\n", - b->firstpipe->request, response); + sx_report(SX_ERROR, "Error expanding %s: %s\n", + req->request, response); } else { sx_report(SX_ERROR,"Wrong reply: %s to %s\n", response, - b->firstpipe->request); + req->request); exit(1); }; - pipe=b->firstpipe; - b->firstpipe=b->firstpipe->next; + STAILQ_REMOVE_HEAD(&b->rq, next); b->piped--; - b->qsize-=pipe->size; - free(pipe); + /* XXXXXX - free(req) */ }; - b->lastpipe=NULL; - assert(b->firstpipe == NULL); return 0; }; int -bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata, +bgpq_expand_irrd(int fd, int (*callback)(char*, void*,char*), void* udata, char* fmt, ...) { char request[128], response[128]; va_list ap; - int ret; + int ret, off = 0; va_start(ap,fmt); vsnprintf(request,sizeof(request),fmt,ap); @@ -496,28 +556,36 @@ bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata, SX_DEBUG(debug_expander,"expander: sending '%s'\n", request); - ret=fwrite(request,1,strlen(request),f); + ret=write(fd, request, strlen(request)); if(ret!=strlen(request)) { sx_report(SX_FATAL,"Partial write to IRRd, only %i bytes written: %s\n", - ret,strerror(errno)); + ret, strerror(errno)); exit(1); }; memset(response,0,sizeof(response)); - if(!fgets(response,sizeof(response),f)) { - if(ferror(f)) { - sx_report(SX_FATAL,"Error reading data from IRRd: %s (expand)" - "\n", strerror(errno)); - exit(1); - }; - sx_report(SX_FATAL,"EOF from IRRd (expand)\n"); + +repeat: + ret = read(fd, response+off, sizeof(response)-off); + if (ret < 0) { + sx_report(SX_ERROR, "Error reading IRRd: %s\n", strerror(errno)); + exit(1); + } else if (ret == 0) { + sx_report(SX_FATAL, "EOF reading IRRd\n"); exit(1); }; + + if (!strchr(response, '\n')) { + off += ret; + goto repeat; + }; + SX_DEBUG(debug_expander>2,"expander: initially got %lu bytes, '%s'\n", (unsigned long)strlen(response),response); if(response[0]=='A') { char* eon, *c; long togot=strtoul(response+1,&eon,10); char recvbuffer[togot+1]; + int recvoff = 0; if(eon && *eon!='\n') { sx_report(SX_ERROR,"A-code finised with wrong char '%c' (%s)\n", @@ -525,24 +593,30 @@ bgpq_expand_irrd(FILE* f, int (*callback)(char*, void*,char*), void* udata, exit(1); }; - if(fgets(recvbuffer,togot+1,f)==NULL) { - if(feof(f)) { - sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n"); - } else { - sx_report(SX_FATAL,"Error reading IRRd: %s (expand,result)\n", - strerror(errno)); - }; - exit(1); - }; - if(fgets(response,sizeof(response),f)==NULL) { - if(ferror(f)) { - sx_report(SX_FATAL,"Error reading IRRd: %s (dequeue,final)", - strerror(errno)); - } else { - sx_report(SX_FATAL,"EOF from IRRd (dequeue,final)\n"); - }; +reread: + ret = read(fd, recvbuffer+recvoff, togot+1-recvoff); + if (ret == 0) { + sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n"); + } else if (ret < 0) { + sx_report(SX_FATAL,"Error reading IRRd: %s (expand,result)\n", + strerror(errno)); + }; + recvoff += ret; + if (recvoff < togot+1) + goto reread; + + memset(response, 0, sizeof(response)); + off = 0; + ret = read(fd, response+off, sizeof(response)-off); + + if (ret < 0) { + sx_report(SX_FATAL, "error reading IRRd: %s\n", strerror(errno)); + exit(1); + } else if (ret == 0) { + sx_report(SX_FATAL, "eof reading IRRd\n"); exit(1); }; + SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, '%s'\n", (unsigned long)strlen(recvbuffer),recvbuffer); @@ -574,7 +648,6 @@ bgpq_expand(struct bgpq_expander* b) int fd=-1, err, ret; struct sx_slentry* mc; struct addrinfo hints, *res=NULL, *rp; - FILE* f=NULL; memset(&hints,0,sizeof(struct addrinfo)); hints.ai_socktype=SOCK_STREAM; @@ -604,34 +677,25 @@ bgpq_expand(struct bgpq_expander* b) err=sx_maxsockbuf(fd,SO_SNDBUF); if(err>0) { SX_DEBUG(debug_expander, "Acquired sendbuf of %i bytes\n", err); - b->socksize=err; } else { shutdown(fd, SHUT_RDWR); close(fd); fd=-1; continue; }; - f=fdopen(fd,"a+"); - if(!f) { - shutdown(fd,SHUT_RDWR); - close(fd); - fd=-1; - f=NULL; - continue; - }; break; }; freeaddrinfo(res); - if(!f) { + if(fd == -1) { /* all our attempts to connect failed */ sx_report(SX_ERROR,"All attempts to connect %s failed, last" " error: %s\n", b->server, strerror(errno)); exit(1); }; - if((ret=fwrite("!!\n",1,3,f))!=3) { - sx_report(SX_ERROR,"Partial fwrite to IRRd: %i bytes, %s\n", + if((ret=write(fd, "!!\n", 3))!=3) { + sx_report(SX_ERROR,"Partial write to IRRd: %i bytes, %s\n", ret, strerror(errno)); exit(1); }; @@ -640,8 +704,9 @@ bgpq_expand(struct bgpq_expander* b) char sources[128]; snprintf(sources,sizeof(sources),"!s%s\n", b->sources); SX_DEBUG(debug_expander,"Requesting sources %s", sources); - fwrite(sources,strlen(sources),1,f); - fgets(sources,sizeof(sources),f); + write(fd, sources, strlen(sources)); + memset(sources, 0, sizeof(sources)); + read(fd, sources, sizeof(sources)); SX_DEBUG(debug_expander,"Got answer %s", sources); if(sources[0]!='C') { sx_report(SX_ERROR, "Invalid source(s) '%s': %s\n", b->sources, @@ -653,15 +718,17 @@ bgpq_expand(struct bgpq_expander* b) if(b->identify) { char ident[128]; snprintf(ident,sizeof(ident),"!n" PACKAGE_STRING "\n"); - fwrite(ident,strlen(ident),1,f); - fgets(ident,sizeof(ident),f); + write(fd, ident, strlen(ident)); + read(fd, ident, sizeof(ident)); }; + fcntl(fd, F_SETFL, O_NONBLOCK|(fcntl(fd, F_GETFL))); + for(mc=b->macroses;mc;mc=mc->next) { if (!b->maxdepth) { - bgpq_expand_irrd(f,bgpq_expanded_macro,b,"!i%s,1\n",mc->text); + bgpq_expand_irrd(fd,bgpq_expanded_macro,b,"!i%s,1\n",mc->text); } else { - struct limited_req* lr = lreq_alloc(b, 0, f); + struct limited_req* lr = lreq_alloc(b, 0, fd); bgpq_expander_add_already(b,mc->text); if (!lr) { sx_report(SX_FATAL, "Unable to allocate memory: %s\n", @@ -669,26 +736,29 @@ bgpq_expand(struct bgpq_expander* b) exit(1); }; if (pipelining) { - bgpq_pipeline(b,f,bgpq_expanded_macro_limit,lr,"!i%s\n", + bgpq_pipeline(b,fd,bgpq_expanded_macro_limit,lr,"!i%s\n", mc->text); } else { - bgpq_expand_irrd(f,bgpq_expanded_macro_limit,lr,"!i%s\n", + bgpq_expand_irrd(fd,bgpq_expanded_macro_limit,lr,"!i%s\n", mc->text); }; }; }; - if(pipelining && b->firstpipe) { - bgpq_pipeline_dequeue(f,b); + if(pipelining) { + if(!STAILQ_EMPTY(&b->wq)) + bgpq_write(b, fd); + if (!STAILQ_EMPTY(&b->rq)) + bgpq_read(b, fd); }; if(b->generation>=T_PREFIXLIST) { unsigned i, j, k; for(mc=b->rsets;mc;mc=mc->next) { if(b->family==AF_INET) { - bgpq_expand_irrd(f,bgpq_expanded_prefix,b,"!i%s,1\n",mc->text); + bgpq_expand_irrd(fd,bgpq_expanded_prefix,b,"!i%s,1\n",mc->text); } else { - bgpq_expand_irrd(f,bgpq_expanded_v6prefix,b,"!i%s,1\n", + bgpq_expand_irrd(fd,bgpq_expanded_v6prefix,b,"!i%s,1\n", mc->text); }; }; @@ -700,33 +770,33 @@ bgpq_expand(struct bgpq_expander* b) if(b->family==AF_INET6) { if(!pipelining) { if(k>0) - bgpq_expand_irrd(f,bgpq_expanded_v6prefix,b, - "!6as%u.%u\r\n", k, i*8+j); + bgpq_expand_irrd(fd,bgpq_expanded_v6prefix, + b,"!6as%u.%u\r\n", k, i*8+j); else - bgpq_expand_irrd(f,bgpq_expanded_v6prefix,b, - "!6as%u\r\n", i*8+j); + bgpq_expand_irrd(fd,bgpq_expanded_v6prefix, + b,"!6as%u\r\n", i*8+j); } else { if(k>0) - bgpq_pipeline(b,f,bgpq_expanded_v6prefix,b, + bgpq_pipeline(b,fd,bgpq_expanded_v6prefix,b, "!6as%u.%u\r\n", k, i*8+j); else - bgpq_pipeline(b,f,bgpq_expanded_v6prefix,b, + bgpq_pipeline(b,fd,bgpq_expanded_v6prefix,b, "!6as%u\r\n", i*8+j); }; } else { if(!pipelining) { if(k>0) - bgpq_expand_irrd(f,bgpq_expanded_prefix,b, + bgpq_expand_irrd(fd,bgpq_expanded_prefix,b, "!gas%u.%u\n", k, i*8+j); else - bgpq_expand_irrd(f,bgpq_expanded_prefix,b, + bgpq_expand_irrd(fd,bgpq_expanded_prefix,b, "!gas%u\n", i*8+j); } else { if(k>0) - bgpq_pipeline(b,f,bgpq_expanded_prefix,b, + bgpq_pipeline(b,fd,bgpq_expanded_prefix,b, "!gas%u.%u\n", k, i*8+j); else - bgpq_pipeline(b,f,bgpq_expanded_prefix,b, + bgpq_pipeline(b,fd,bgpq_expanded_prefix,b, "!gas%u\n", i*8+j); }; }; @@ -734,13 +804,17 @@ bgpq_expand(struct bgpq_expander* b) }; }; }; - if(pipelining && b->firstpipe) { - bgpq_pipeline_dequeue(f,b); + if(pipelining) { + if(!STAILQ_EMPTY(&b->wq)) + bgpq_write(b, fd); + if (!STAILQ_EMPTY(&b->rq)) + bgpq_read(b, fd); }; }; - fwrite("!q\n",1,3,f); - fclose(f); + write(fd, "!q\n",3); + shutdown(fd, SHUT_RDWR); + close(fd); return 1; };