Commit b9e0b11e authored by Felix Lange's avatar Felix Lange

p2p: interrupt MsgPipe payload read/write

This is better because protocols might not actually read the payload for
some errors (msg too big, etc.) which can be a pain to test with the old
behaviour.
parent a7bced77
...@@ -185,7 +185,10 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error { ...@@ -185,7 +185,10 @@ func (p *MsgPipeRW) WriteMsg(msg Msg) error {
case p.w <- msg: case p.w <- msg:
if msg.Size > 0 { if msg.Size > 0 {
// wait for payload read or discard // wait for payload read or discard
<-consumed select {
case <-consumed:
case <-p.closing:
}
} }
return nil return nil
case <-p.closing: case <-p.closing:
...@@ -207,8 +210,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) { ...@@ -207,8 +210,8 @@ func (p *MsgPipeRW) ReadMsg() (Msg, error) {
} }
// Close unblocks any pending ReadMsg and WriteMsg calls on both ends // Close unblocks any pending ReadMsg and WriteMsg calls on both ends
// of the pipe. They will return ErrPipeClosed. Note that Close does // of the pipe. They will return ErrPipeClosed. Close also
// not interrupt any reads from a message payload. // interrupts any reads from a message payload.
func (p *MsgPipeRW) Close() error { func (p *MsgPipeRW) Close() error {
if atomic.AddInt32(p.closed, 1) != 1 { if atomic.AddInt32(p.closed, 1) != 1 {
// someone else is already closing // someone else is already closing
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment