lazyqueue.go 5.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package prque

import (
	"container/heap"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
)

// LazyQueue is a priority queue data structure where priorities can change over
// time and are only evaluated on demand.
// Two callbacks are required:
// - priority evaluates the actual priority of an item
// - maxPriority gives an upper estimate for the priority in any moment between
//   now and the given absolute time
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
	clock mclock.Clock
	// Items are stored in one of two internal queues ordered by estimated max
	// priority until the next and the next-after-next refresh. Update and Refresh
	// always places items in queue[1].
	queue       [2]*sstack
	popQueue    *sstack
	period      time.Duration
	maxUntil    mclock.AbsTime
	indexOffset int
	setIndex    SetIndexCallback
	priority    PriorityCallback
	maxPriority MaxPriorityCallback
}

type (
	PriorityCallback    func(data interface{}, now mclock.AbsTime) int64   // actual priority callback
	MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
)

// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
	q := &LazyQueue{
		popQueue:    newSstack(nil),
		setIndex:    setIndex,
		priority:    priority,
		maxPriority: maxPriority,
		clock:       clock,
		period:      refreshPeriod}
	q.Reset()
	q.Refresh()
	return q
}

// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
	q.queue[0] = newSstack(q.setIndex0)
	q.queue[1] = newSstack(q.setIndex1)
}

// Refresh should be called at least with the frequency specified by the refreshPeriod parameter
func (q *LazyQueue) Refresh() {
	q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period)
	for q.queue[0].Len() != 0 {
		q.Push(heap.Pop(q.queue[0]).(*item).value)
	}
	q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
	q.indexOffset = 1 - q.indexOffset
	q.maxUntil += mclock.AbsTime(q.period)
}

// Push adds an item to the queue
func (q *LazyQueue) Push(data interface{}) {
	heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
}

// Update updates the upper priority estimate for the item with the given queue index
func (q *LazyQueue) Update(index int) {
	q.Push(q.Remove(index))
}

// Pop removes and returns the item with the greatest actual priority
func (q *LazyQueue) Pop() (interface{}, int64) {
	var (
		resData interface{}
		resPri  int64
	)
	q.MultiPop(func(data interface{}, priority int64) bool {
		resData = data
		resPri = priority
		return false
	})
	return resData, resPri
}

// peekIndex returns the index of the internal queue where the item with the
// highest estimated priority is or -1 if both are empty
func (q *LazyQueue) peekIndex() int {
	if q.queue[0].Len() != 0 {
		if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
			return 1
		}
		return 0
	}
	if q.queue[1].Len() != 0 {
		return 1
	}
	return -1
}

// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
	now := q.clock.Now()
	nextIndex := q.peekIndex()
	for nextIndex != -1 {
		data := heap.Pop(q.queue[nextIndex]).(*item).value
		heap.Push(q.popQueue, &item{data, q.priority(data, now)})
		nextIndex = q.peekIndex()
		for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
			i := heap.Pop(q.popQueue).(*item)
			if !callback(i.value, i.priority) {
				for q.popQueue.Len() != 0 {
					q.Push(heap.Pop(q.popQueue).(*item).value)
				}
				return
			}
		}
	}
}

// PopItem pops the item from the queue only, dropping the associated priority value.
func (q *LazyQueue) PopItem() interface{} {
	i, _ := q.Pop()
	return i
}

// Remove removes removes the item with the given index.
func (q *LazyQueue) Remove(index int) interface{} {
	if index < 0 {
		return nil
	}
	return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
}

// Empty checks whether the priority queue is empty.
func (q *LazyQueue) Empty() bool {
	return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
}

// Size returns the number of items in the priority queue.
func (q *LazyQueue) Size() int {
	return q.queue[0].Len() + q.queue[1].Len()
}

// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex0(data interface{}, index int) {
	if index == -1 {
		q.setIndex(data, -1)
	} else {
		q.setIndex(data, index+index)
	}
}

// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex1(data interface{}, index int) {
	q.setIndex(data, index+index+1)
}