1
0
mirror of https://github.com/bgp/bgpq4.git synced 2024-05-11 05:55:05 +00:00

pipelining now counts buffer size.

This commit is contained in:
Alexandre Snarskii
2015-06-23 14:56:05 +03:00
parent 19fd9cb752
commit 9eb665209b
3 changed files with 41 additions and 10 deletions

View File

@ -23,6 +23,7 @@ typedef enum {
struct bgpq_prequest {
struct bgpq_prequest* next;
char request[128];
int size;
int (*callback)(char*, void*);
void *udata;
};
@ -46,6 +47,8 @@ struct bgpq_expander {
char* match;
char* server;
unsigned maxlen;
int socksize;
int qsize;
};

View File

@ -31,7 +31,7 @@ bgpq_expander_init(struct bgpq_expander* b, int af)
if(!b) return 0;
memset(b,0,sizeof(struct bgpq_expander));
b->tree=sx_radix_tree_new(af);
if(!b->tree) goto fixups;
@ -258,9 +258,11 @@ bgpq_pipeline_dequeue_ripe(FILE* f, struct bgpq_expander* b)
/* end of object */
struct bgpq_prequest* p=b->firstpipe;
b->firstpipe=b->firstpipe->next;
b->qsize-=p->size;
free(p);
b->piped--;
if(!b->piped) {
b->lastpipe=NULL;
return 0;
};
};
@ -362,12 +364,14 @@ bgpq_expand_ripe(FILE* f, int (*callback)(char*, void*), void* udata,
return 0;
};
int bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b);
int
bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata,
char* fmt, ...)
{
char request[128];
int ret;
int ret, rlen;
struct bgpq_prequest* bp=NULL;
struct bgpq_expander* d=(struct bgpq_expander*)udata;
va_list ap;
@ -375,7 +379,19 @@ bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata,
vsnprintf(request,sizeof(request),fmt,ap);
va_end(ap);
SX_DEBUG(debug_expander,"expander: sending '%s'\n", request);
rlen=strlen(request);
if(rlen+d->qsize >= d->socksize) {
SX_DEBUG(debug_expander, "looks like socket buffer shortage, "
"queued %i of %i, dequeueing\n", d->qsize, d->socksize);
if (d->family==AF_INET6) {
bgpq_pipeline_dequeue_ripe(f, d);
} else {
bgpq_pipeline_dequeue(f, d);
};
};
SX_DEBUG(debug_expander,"expander: sending '%s' (queued %i of %i)\n",
request, d->qsize, d->socksize);
bp=malloc(sizeof(struct bgpq_prequest));
if(!bp) {
@ -387,7 +403,7 @@ bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata,
ret=fwrite(request,1,strlen(request),f);
if(ret!=strlen(request)) {
if(ret!=rlen) {
sx_report(SX_FATAL,"Partial write to radb, only %i bytes written: %s\n",
ret,strerror(errno));
exit(1);
@ -396,6 +412,8 @@ bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata,
strlcpy(bp->request,request,sizeof(bp->request));
bp->callback=callback;
bp->udata=udata;
bp->size=rlen;
d->qsize+=rlen;
if(d->lastpipe) {
d->lastpipe->next=bp;
@ -484,9 +502,10 @@ bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b)
pipe=b->firstpipe;
b->firstpipe=b->firstpipe->next;
b->piped--;
b->qsize-=pipe->size;
free(pipe);
};
b->lastpipe=NULL;
return 0;
};
@ -544,7 +563,7 @@ bgpq_expand_radb(FILE* f, int (*callback)(char*, void*), void* udata,
};
SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, '%s'\n",
(unsigned long)strlen(recvbuffer),recvbuffer);
for(c=recvbuffer; c<recvbuffer+togot;) {
size_t spn=strcspn(c," \n");
if(spn) c[spn]=0;
@ -609,7 +628,16 @@ bgpq_expand(struct bgpq_expander* b)
fd=-1;
continue;
};
sx_maxsockbuf(fd,SO_SNDBUF);
err=sx_maxsockbuf(fd,SO_SNDBUF);
if(err>0) {
SX_DEBUG(debug_expander, "Acquired sendbuf of %i bytes\n", err);
b->socksize=err;
} else {
shutdown(fd, SHUT_RDWR);
close(fd);
fd=-1;
continue;
};
f=fdopen(fd,"a+");
if(!f) {
shutdown(fd,SHUT_RDWR);
@ -628,7 +656,7 @@ bgpq_expand(struct bgpq_expander* b)
" error: %s\n", b->server, strerror(errno));
exit(1);
};
if((ret=fwrite("!!\n",1,3,f))!=3) {
sx_report(SX_ERROR,"Partial fwrite to radb: %i bytes, %s\n",
ret, strerror(errno));
@ -719,7 +747,7 @@ bgpq_expand(struct bgpq_expander* b)
};
};
};
fwrite("!q\n",1,3,f);
fclose(f);
return 1;

View File

@ -85,5 +85,5 @@ sx_maxsockbuf(int s, int dir)
voptval, iterations);
*/
};
return 0;
return voptval;
};