Commit f7a71996 authored by Jeffrey Wilcke's avatar Jeffrey Wilcke

core, event/filter, xeth: refactored filter system

Moved the filtering system from `event` to `eth/filters` package and
removed the `core.Filter` object. The `filters.Filter` object now
requires a `common.Database` rather than a `eth.Backend` and invokes the
`core.GetBlockByX` directly rather than thru a "manager".
parent 0d78f962
...@@ -14,16 +14,15 @@ ...@@ -14,16 +14,15 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core package filters
import ( import (
"math" "math"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
) )
type AccountChange struct { type AccountChange struct {
...@@ -32,7 +31,7 @@ type AccountChange struct { ...@@ -32,7 +31,7 @@ type AccountChange struct {
// Filtering interface // Filtering interface
type Filter struct { type Filter struct {
eth Backend db common.Database
earliest int64 earliest int64
latest int64 latest int64
skip int skip int
...@@ -47,8 +46,8 @@ type Filter struct { ...@@ -47,8 +46,8 @@ type Filter struct {
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block
// is interesting or not. // is interesting or not.
func NewFilter(eth Backend) *Filter { func New(db common.Database) *Filter {
return &Filter{eth: eth} return &Filter{db: db}
} }
// Set the earliest and latest block for filtering. // Set the earliest and latest block for filtering.
...@@ -80,7 +79,7 @@ func (self *Filter) SetSkip(skip int) { ...@@ -80,7 +79,7 @@ func (self *Filter) SetSkip(skip int) {
// Run filters logs with the current parameters set // Run filters logs with the current parameters set
func (self *Filter) Find() state.Logs { func (self *Filter) Find() state.Logs {
earliestBlock := self.eth.ChainManager().CurrentBlock() earliestBlock := core.GetCurrentBlock(self.db)
var earliestBlockNo uint64 = uint64(self.earliest) var earliestBlockNo uint64 = uint64(self.earliest)
if self.earliest == -1 { if self.earliest == -1 {
earliestBlockNo = earliestBlock.NumberU64() earliestBlockNo = earliestBlock.NumberU64()
...@@ -92,7 +91,7 @@ func (self *Filter) Find() state.Logs { ...@@ -92,7 +91,7 @@ func (self *Filter) Find() state.Logs {
var ( var (
logs state.Logs logs state.Logs
block = self.eth.ChainManager().GetBlockByNumber(latestBlockNo) block = core.GetBlockByNumber(self.db, latestBlockNo)
) )
done: done:
...@@ -111,17 +110,17 @@ done: ...@@ -111,17 +110,17 @@ done:
// current parameters // current parameters
if self.bloomFilter(block) { if self.bloomFilter(block) {
// Get the logs of the block // Get the logs of the block
unfiltered, err := self.eth.BlockProcessor().GetLogs(block) var (
if err != nil { receipts = core.GetBlockReceipts(self.db, block.Hash())
glog.V(logger.Warn).Infoln("err: filter get logs ", err) unfiltered state.Logs
)
break for _, receipt := range receipts {
unfiltered = append(unfiltered, receipt.Logs()...)
} }
logs = append(logs, self.FilterLogs(unfiltered)...) logs = append(logs, self.FilterLogs(unfiltered)...)
} }
block = self.eth.ChainManager().GetBlock(block.ParentHash()) block = core.GetBlockByHash(self.db, block.ParentHash())
} }
skip := int(math.Min(float64(len(logs)), float64(self.skip))) skip := int(math.Min(float64(len(logs)), float64(self.skip)))
......
...@@ -14,9 +14,9 @@ ...@@ -14,9 +14,9 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package filter // package filters implements an ethereum filtering system for block,
// transactions and log events.
// TODO make use of the generic filtering system package filters
import ( import (
"sync" "sync"
...@@ -26,60 +26,66 @@ import ( ...@@ -26,60 +26,66 @@ import (
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
) )
type FilterManager struct { // FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fires by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
eventMux *event.TypeMux eventMux *event.TypeMux
filterMu sync.RWMutex filterMu sync.RWMutex
filterId int filterId int
filters map[int]*core.Filter filters map[int]*Filter
quit chan struct{} quit chan struct{}
} }
func NewFilterManager(mux *event.TypeMux) *FilterManager { // NewFilterSystem returns a newly allocated filter manager
return &FilterManager{ func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
eventMux: mux, eventMux: mux,
filters: make(map[int]*core.Filter), filters: make(map[int]*Filter),
} }
go fs.filterLoop()
return fs
} }
func (self *FilterManager) Start() { // Stop quits the filter loop required for polling events
go self.filterLoop() func (fs *FilterSystem) Stop() {
} close(fs.quit)
func (self *FilterManager) Stop() {
close(self.quit)
} }
func (self *FilterManager) InstallFilter(filter *core.Filter) (id int) { // Add adds a filter to the filter manager
self.filterMu.Lock() func (fs *FilterSystem) Add(filter *Filter) (id int) {
defer self.filterMu.Unlock() fs.filterMu.Lock()
id = self.filterId defer fs.filterMu.Unlock()
self.filters[id] = filter id = fs.filterId
self.filterId++ fs.filters[id] = filter
fs.filterId++
return id return id
} }
func (self *FilterManager) UninstallFilter(id int) { // Remove removes a filter by filter id
self.filterMu.Lock() func (fs *FilterSystem) Remove(id int) {
defer self.filterMu.Unlock() fs.filterMu.Lock()
if _, ok := self.filters[id]; ok { defer fs.filterMu.Unlock()
delete(self.filters, id) if _, ok := fs.filters[id]; ok {
delete(fs.filters, id)
} }
} }
// GetFilter retrieves a filter installed using InstallFilter. // Get retrieves a filter installed using Add The filter may not be modified.
// The filter may not be modified. func (fs *FilterSystem) Get(id int) *Filter {
func (self *FilterManager) GetFilter(id int) *core.Filter { fs.filterMu.RLock()
self.filterMu.RLock() defer fs.filterMu.RUnlock()
defer self.filterMu.RUnlock() return fs.filters[id]
return self.filters[id]
} }
func (self *FilterManager) filterLoop() { // filterLoop waits for specific events from ethereum and fires their handlers
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
// Subscribe to events // Subscribe to events
events := self.eventMux.Subscribe( events := fs.eventMux.Subscribe(
//core.PendingBlockEvent{}, //core.PendingBlockEvent{},
core.ChainEvent{}, core.ChainEvent{},
core.TxPreEvent{}, core.TxPreEvent{},
...@@ -88,31 +94,31 @@ func (self *FilterManager) filterLoop() { ...@@ -88,31 +94,31 @@ func (self *FilterManager) filterLoop() {
out: out:
for { for {
select { select {
case <-self.quit: case <-fs.quit:
break out break out
case event := <-events.Chan(): case event := <-events.Chan():
switch event := event.(type) { switch event := event.(type) {
case core.ChainEvent: case core.ChainEvent:
self.filterMu.RLock() fs.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range fs.filters {
if filter.BlockCallback != nil { if filter.BlockCallback != nil {
filter.BlockCallback(event.Block, event.Logs) filter.BlockCallback(event.Block, event.Logs)
} }
} }
self.filterMu.RUnlock() fs.filterMu.RUnlock()
case core.TxPreEvent: case core.TxPreEvent:
self.filterMu.RLock() fs.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range fs.filters {
if filter.TransactionCallback != nil { if filter.TransactionCallback != nil {
filter.TransactionCallback(event.Tx) filter.TransactionCallback(event.Tx)
} }
} }
self.filterMu.RUnlock() fs.filterMu.RUnlock()
case state.Logs: case state.Logs:
self.filterMu.RLock() fs.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range fs.filters {
if filter.LogsCallback != nil { if filter.LogsCallback != nil {
msgs := filter.FilterLogs(event) msgs := filter.FilterLogs(event)
if len(msgs) > 0 { if len(msgs) > 0 {
...@@ -120,7 +126,7 @@ out: ...@@ -120,7 +126,7 @@ out:
} }
} }
} }
self.filterMu.RUnlock() fs.filterMu.RUnlock()
} }
} }
} }
......
...@@ -35,7 +35,7 @@ import ( ...@@ -35,7 +35,7 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/event/filter" "github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/miner"
...@@ -75,7 +75,7 @@ type XEth struct { ...@@ -75,7 +75,7 @@ type XEth struct {
whisper *Whisper whisper *Whisper
quit chan struct{} quit chan struct{}
filterManager *filter.FilterManager filterManager *filters.FilterSystem
logMu sync.RWMutex logMu sync.RWMutex
logQueue map[int]*logQueue logQueue map[int]*logQueue
...@@ -111,7 +111,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth { ...@@ -111,7 +111,7 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
backend: ethereum, backend: ethereum,
frontend: frontend, frontend: frontend,
quit: make(chan struct{}), quit: make(chan struct{}),
filterManager: filter.NewFilterManager(ethereum.EventMux()), filterManager: filters.NewFilterSystem(ethereum.EventMux()),
logQueue: make(map[int]*logQueue), logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue), blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue), transactionQueue: make(map[int]*hashQueue),
...@@ -128,7 +128,6 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth { ...@@ -128,7 +128,6 @@ func New(ethereum *eth.Ethereum, frontend Frontend) *XEth {
xeth.state = NewState(xeth, xeth.backend.ChainManager().State()) xeth.state = NewState(xeth, xeth.backend.ChainManager().State())
go xeth.start() go xeth.start()
go xeth.filterManager.Start()
return xeth return xeth
} }
...@@ -142,7 +141,7 @@ done: ...@@ -142,7 +141,7 @@ done:
self.logMu.Lock() self.logMu.Lock()
for id, filter := range self.logQueue { for id, filter := range self.logQueue {
if time.Since(filter.timeout) > filterTickerTime { if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id) self.filterManager.Remove(id)
delete(self.logQueue, id) delete(self.logQueue, id)
} }
} }
...@@ -151,7 +150,7 @@ done: ...@@ -151,7 +150,7 @@ done:
self.blockMu.Lock() self.blockMu.Lock()
for id, filter := range self.blockQueue { for id, filter := range self.blockQueue {
if time.Since(filter.timeout) > filterTickerTime { if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id) self.filterManager.Remove(id)
delete(self.blockQueue, id) delete(self.blockQueue, id)
} }
} }
...@@ -160,7 +159,7 @@ done: ...@@ -160,7 +159,7 @@ done:
self.transactionMu.Lock() self.transactionMu.Lock()
for id, filter := range self.transactionQueue { for id, filter := range self.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime { if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id) self.filterManager.Remove(id)
delete(self.transactionQueue, id) delete(self.transactionQueue, id)
} }
} }
...@@ -504,7 +503,7 @@ func (self *XEth) IsContract(address string) bool { ...@@ -504,7 +503,7 @@ func (self *XEth) IsContract(address string) bool {
} }
func (self *XEth) UninstallFilter(id int) bool { func (self *XEth) UninstallFilter(id int) bool {
defer self.filterManager.UninstallFilter(id) defer self.filterManager.Remove(id)
if _, ok := self.logQueue[id]; ok { if _, ok := self.logQueue[id]; ok {
self.logMu.Lock() self.logMu.Lock()
...@@ -532,8 +531,8 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address [] ...@@ -532,8 +531,8 @@ func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []
self.logMu.Lock() self.logMu.Lock()
defer self.logMu.Unlock() defer self.logMu.Unlock()
filter := core.NewFilter(self.backend) filter := filters.New(self.backend.ChainDb())
id := self.filterManager.InstallFilter(filter) id := self.filterManager.Add(filter)
self.logQueue[id] = &logQueue{timeout: time.Now()} self.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetEarliestBlock(earliest) filter.SetEarliestBlock(earliest)
...@@ -558,8 +557,8 @@ func (self *XEth) NewTransactionFilter() int { ...@@ -558,8 +557,8 @@ func (self *XEth) NewTransactionFilter() int {
self.transactionMu.Lock() self.transactionMu.Lock()
defer self.transactionMu.Unlock() defer self.transactionMu.Unlock()
filter := core.NewFilter(self.backend) filter := filters.New(self.backend.ChainDb())
id := self.filterManager.InstallFilter(filter) id := self.filterManager.Add(filter)
self.transactionQueue[id] = &hashQueue{timeout: time.Now()} self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) { filter.TransactionCallback = func(tx *types.Transaction) {
...@@ -577,8 +576,8 @@ func (self *XEth) NewBlockFilter() int { ...@@ -577,8 +576,8 @@ func (self *XEth) NewBlockFilter() int {
self.blockMu.Lock() self.blockMu.Lock()
defer self.blockMu.Unlock() defer self.blockMu.Unlock()
filter := core.NewFilter(self.backend) filter := filters.New(self.backend.ChainDb())
id := self.filterManager.InstallFilter(filter) id := self.filterManager.Add(filter)
self.blockQueue[id] = &hashQueue{timeout: time.Now()} self.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs state.Logs) { filter.BlockCallback = func(block *types.Block, logs state.Logs) {
...@@ -635,7 +634,7 @@ func (self *XEth) TransactionFilterChanged(id int) []common.Hash { ...@@ -635,7 +634,7 @@ func (self *XEth) TransactionFilterChanged(id int) []common.Hash {
} }
func (self *XEth) Logs(id int) state.Logs { func (self *XEth) Logs(id int) state.Logs {
filter := self.filterManager.GetFilter(id) filter := self.filterManager.Get(id)
if filter != nil { if filter != nil {
return filter.Find() return filter.Find()
} }
...@@ -644,7 +643,7 @@ func (self *XEth) Logs(id int) state.Logs { ...@@ -644,7 +643,7 @@ func (self *XEth) Logs(id int) state.Logs {
} }
func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) state.Logs { func (self *XEth) AllLogs(earliest, latest int64, skip, max int, address []string, topics [][]string) state.Logs {
filter := core.NewFilter(self.backend) filter := filters.New(self.backend.ChainDb())
filter.SetEarliestBlock(earliest) filter.SetEarliestBlock(earliest)
filter.SetLatestBlock(latest) filter.SetLatestBlock(latest)
filter.SetSkip(skip) filter.SetSkip(skip)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment