Commit de2a7bb7 authored by gary rong's avatar gary rong Committed by Péter Szilágyi

eth/downloader: wait for all fetcher goroutines to exit before terminating (#16509)

parent 6b2b328c
...@@ -135,9 +135,10 @@ type Downloader struct { ...@@ -135,9 +135,10 @@ type Downloader struct {
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
// Cancellation and termination // Cancellation and termination
cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop) cancelPeer string // Identifier of the peer currently being used as the master (cancel on drop)
cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelCh chan struct{} // Channel to cancel mid-flight syncs
cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers cancelLock sync.RWMutex // Lock to protect the cancel channel and peer in delivers
cancelWg sync.WaitGroup // Make sure all fetcher goroutines have exited.
quitCh chan struct{} // Quit channel to signal termination quitCh chan struct{} // Quit channel to signal termination
quitLock sync.RWMutex // Lock to prevent double closes quitLock sync.RWMutex // Lock to prevent double closes
...@@ -476,12 +477,11 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I ...@@ -476,12 +477,11 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I
// spawnSync runs d.process and all given fetcher functions to completion in // spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears. // separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error { func (d *Downloader) spawnSync(fetchers []func() error) error {
var wg sync.WaitGroup
errc := make(chan error, len(fetchers)) errc := make(chan error, len(fetchers))
wg.Add(len(fetchers)) d.cancelWg.Add(len(fetchers))
for _, fn := range fetchers { for _, fn := range fetchers {
fn := fn fn := fn
go func() { defer wg.Done(); errc <- fn() }() go func() { defer d.cancelWg.Done(); errc <- fn() }()
} }
// Wait for the first error, then terminate the others. // Wait for the first error, then terminate the others.
var err error var err error
...@@ -498,12 +498,10 @@ func (d *Downloader) spawnSync(fetchers []func() error) error { ...@@ -498,12 +498,10 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
} }
d.queue.Close() d.queue.Close()
d.Cancel() d.Cancel()
wg.Wait()
return err return err
} }
// Cancel cancels all of the operations and resets the queue. It returns true // Cancel cancels all of the operations and resets the queue.
// if the cancel operation was completed.
func (d *Downloader) Cancel() { func (d *Downloader) Cancel() {
// Close the current cancel channel // Close the current cancel channel
d.cancelLock.Lock() d.cancelLock.Lock()
...@@ -516,6 +514,7 @@ func (d *Downloader) Cancel() { ...@@ -516,6 +514,7 @@ func (d *Downloader) Cancel() {
} }
} }
d.cancelLock.Unlock() d.cancelLock.Unlock()
d.cancelWg.Wait()
} }
// Terminate interrupts the downloader, canceling all pending operations. // Terminate interrupts the downloader, canceling all pending operations.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment