mirror of
https://github.com/netsampler/goflow2.git
synced 2024-05-06 15:54:52 +00:00
utils: copy payload before accepting another UDP packet
In d1e1ace3186d ("Allow Flow Routines to be cancellable (#40)"), the payload was passed to another goroutine and erased by the next packet to be received if the goroutine did not process it fast enough. Make a copy before passing it to the goroutine to fix that.
This commit is contained in:
@ -155,6 +155,7 @@ func UDPStoppableRoutine(stopCh <-chan struct{}, name string, decodeFunc decoder
|
|||||||
type udpData struct {
|
type udpData struct {
|
||||||
size int
|
size int
|
||||||
pktAddr *net.UDPAddr
|
pktAddr *net.UDPAddr
|
||||||
|
payload []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
stopped := atomic.Value{}
|
stopped := atomic.Value{}
|
||||||
@ -165,6 +166,8 @@ func UDPStoppableRoutine(stopCh <-chan struct{}, name string, decodeFunc decoder
|
|||||||
u := udpData{}
|
u := udpData{}
|
||||||
u.size, u.pktAddr, _ = udpconn.ReadFromUDP(payload)
|
u.size, u.pktAddr, _ = udpconn.ReadFromUDP(payload)
|
||||||
if stopped.Load() == false {
|
if stopped.Load() == false {
|
||||||
|
u.payload = make([]byte, u.size)
|
||||||
|
copy(u.payload, payload[0:u.size])
|
||||||
udpDataCh <- u
|
udpDataCh <- u
|
||||||
} else {
|
} else {
|
||||||
return
|
return
|
||||||
@ -174,7 +177,7 @@ func UDPStoppableRoutine(stopCh <-chan struct{}, name string, decodeFunc decoder
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case u := <-udpDataCh:
|
case u := <-udpDataCh:
|
||||||
process(u.size, payload, u.pktAddr, processor, localIP, addrUDP, name)
|
process(u.size, u.payload, u.pktAddr, processor, localIP, addrUDP, name)
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
stopped.Store(true)
|
stopped.Store(true)
|
||||||
udpconn.Close()
|
udpconn.Close()
|
||||||
@ -185,13 +188,10 @@ func UDPStoppableRoutine(stopCh <-chan struct{}, name string, decodeFunc decoder
|
|||||||
}
|
}
|
||||||
|
|
||||||
func process(size int, payload []byte, pktAddr *net.UDPAddr, processor decoder.Processor, localIP string, addrUDP net.UDPAddr, name string) {
|
func process(size int, payload []byte, pktAddr *net.UDPAddr, processor decoder.Processor, localIP string, addrUDP net.UDPAddr, name string) {
|
||||||
payloadCut := make([]byte, size)
|
|
||||||
copy(payloadCut, payload[0:size])
|
|
||||||
|
|
||||||
baseMessage := BaseMessage{
|
baseMessage := BaseMessage{
|
||||||
Src: pktAddr.IP,
|
Src: pktAddr.IP,
|
||||||
Port: pktAddr.Port,
|
Port: pktAddr.Port,
|
||||||
Payload: payloadCut,
|
Payload: payload,
|
||||||
}
|
}
|
||||||
processor.ProcessMessage(baseMessage)
|
processor.ProcessMessage(baseMessage)
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user