224 lines
5.3 KiB
Go
Executable File
224 lines
5.3 KiB
Go
Executable File
package bqueue
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// Queuer is an interface for a queue.
|
|
type Queuer[Q Queueable] interface {
|
|
AddItem(Q) error
|
|
AddItems(...Q) error
|
|
Height() uint32
|
|
}
|
|
|
|
//go:generate stringer -type=OperationMode
|
|
|
|
// OperationMode is the mode of operation for the queue.
|
|
// Could be either Blocking or NonBlocking.
|
|
type OperationMode byte
|
|
|
|
const (
|
|
// NonBlocking means that Put will return immediately if the queue is full.
|
|
NonBlocking OperationMode = 0
|
|
// Blocking means that Put will wait until there is enough space in the queue.
|
|
Blocking OperationMode = 1
|
|
)
|
|
|
|
// Queueable is an interface for a queue element.
|
|
type Queueable interface {
|
|
Indexable
|
|
comparable
|
|
}
|
|
|
|
// Indexable is an interface for an element that has an index.
|
|
type Indexable interface {
|
|
GetIndex() uint32
|
|
}
|
|
|
|
// Queue is the queue of queueable elements.
|
|
type Queue[Q Queueable] struct {
|
|
log *zap.Logger
|
|
queueLock sync.RWMutex
|
|
queue []Q
|
|
lastQ uint32
|
|
checkBlocks chan struct{}
|
|
chain Queuer[Q]
|
|
relayF func(Q)
|
|
discarded atomic.Bool
|
|
len int
|
|
lenUpdateF func(int)
|
|
cacheSize int
|
|
mode OperationMode
|
|
nilQ Q
|
|
}
|
|
|
|
// DefaultCacheSize is the default amount of Queueable elements above the current height
|
|
// which are stored in the queue.
|
|
const DefaultCacheSize = 2000
|
|
|
|
func (bq *Queue[Q]) indexToPosition(i uint32) int {
|
|
return int(i) % bq.cacheSize
|
|
}
|
|
|
|
// New creates an instance of Queue that handles Queueable elements.
|
|
func New[Q Queueable](bc Queuer[Q], log *zap.Logger, relayer func(Q), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue[Q] {
|
|
if log == nil {
|
|
return nil
|
|
}
|
|
if cacheSize <= 0 {
|
|
cacheSize = DefaultCacheSize
|
|
}
|
|
var nilQ Q
|
|
return &Queue[Q]{
|
|
log: log,
|
|
queue: make([]Q, cacheSize),
|
|
checkBlocks: make(chan struct{}, 1),
|
|
chain: bc,
|
|
relayF: relayer,
|
|
lenUpdateF: lenMetricsUpdater,
|
|
cacheSize: cacheSize,
|
|
mode: mode,
|
|
nilQ: nilQ,
|
|
}
|
|
}
|
|
|
|
// Run runs the Queue queueing loop. It must be called in a separate routine.
|
|
func (bq *Queue[Q]) Run() {
|
|
var lastHeight = bq.chain.Height()
|
|
for {
|
|
_, ok := <-bq.checkBlocks
|
|
if !ok {
|
|
break
|
|
}
|
|
for {
|
|
h := bq.chain.Height()
|
|
pos := bq.indexToPosition(h + 1)
|
|
bq.queueLock.Lock()
|
|
b := bq.queue[pos]
|
|
// The chain moved forward using elements from other sources (consensus).
|
|
for i := lastHeight; i < h; i++ {
|
|
old := bq.indexToPosition(i + 1)
|
|
if bq.queue[old] != bq.nilQ && bq.queue[old].GetIndex() == i {
|
|
bq.len--
|
|
bq.queue[old] = bq.nilQ
|
|
}
|
|
}
|
|
bq.queueLock.Unlock()
|
|
lastHeight = h
|
|
if b == bq.nilQ {
|
|
break
|
|
}
|
|
|
|
err := bq.chain.AddItem(b)
|
|
if err != nil {
|
|
// The element might already be added by the consensus.
|
|
if bq.chain.Height() < b.GetIndex() {
|
|
bq.log.Warn("queue: failed to add item into the blockchain",
|
|
zap.Uint32("index", b.GetIndex()),
|
|
zap.Uint32("chainHeight", bq.chain.Height()),
|
|
zap.Stringer("mode", bq.mode),
|
|
zap.Error(err))
|
|
}
|
|
} else if bq.relayF != nil {
|
|
bq.relayF(b)
|
|
}
|
|
bq.queueLock.Lock()
|
|
bq.len--
|
|
l := bq.len
|
|
if bq.queue[pos] == b {
|
|
bq.queue[pos] = bq.nilQ
|
|
}
|
|
bq.queueLock.Unlock()
|
|
if bq.lenUpdateF != nil {
|
|
bq.lenUpdateF(l)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Put enqueues Queueable element to be added to the chain.
|
|
func (bq *Queue[Q]) Put(element Q) error {
|
|
h := bq.chain.Height()
|
|
bq.queueLock.Lock()
|
|
defer bq.queueLock.Unlock()
|
|
if bq.discarded.Load() {
|
|
return nil
|
|
}
|
|
// Can easily happen when fetching the same blocks from
|
|
// different peers, thus not considered as error.
|
|
if element.GetIndex() <= h {
|
|
return nil
|
|
}
|
|
if h+uint32(bq.cacheSize) < element.GetIndex() {
|
|
switch bq.mode {
|
|
case NonBlocking:
|
|
return nil
|
|
case Blocking:
|
|
bq.queueLock.Unlock()
|
|
t := time.NewTicker(time.Second)
|
|
for range t.C {
|
|
if bq.discarded.Load() {
|
|
bq.queueLock.Lock()
|
|
return nil
|
|
}
|
|
h = bq.chain.Height()
|
|
if h+uint32(bq.cacheSize) >= element.GetIndex() {
|
|
bq.queueLock.Lock()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
pos := bq.indexToPosition(element.GetIndex())
|
|
// If we already have it, keep the old element, throw away the new one.
|
|
if bq.queue[pos] == bq.nilQ || bq.queue[pos].GetIndex() < element.GetIndex() {
|
|
bq.len++
|
|
bq.queue[pos] = element
|
|
for pos < bq.cacheSize && bq.queue[pos] != bq.nilQ && bq.lastQ+1 == bq.queue[pos].GetIndex() {
|
|
bq.lastQ = bq.queue[pos].GetIndex()
|
|
pos++
|
|
}
|
|
}
|
|
// update metrics
|
|
if bq.lenUpdateF != nil {
|
|
bq.lenUpdateF(bq.len)
|
|
}
|
|
select {
|
|
case bq.checkBlocks <- struct{}{}:
|
|
// ok, signalled to goroutine processing queue
|
|
default:
|
|
// it's already busy processing elements
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// LastQueued returns the index of the last queued element and the queue's capacity
|
|
// left.
|
|
func (bq *Queue[Q]) LastQueued() (uint32, int) {
|
|
bq.queueLock.RLock()
|
|
defer bq.queueLock.RUnlock()
|
|
return bq.lastQ, bq.cacheSize - bq.len
|
|
}
|
|
|
|
// Discard stops the queue and prevents it from accepting more elements to enqueue.
|
|
func (bq *Queue[Q]) Discard() {
|
|
if bq.discarded.CompareAndSwap(false, true) {
|
|
bq.queueLock.Lock()
|
|
close(bq.checkBlocks)
|
|
// Technically we could bq.queue = nil, but this would cost
|
|
// another if in Run().
|
|
clear(bq.queue)
|
|
bq.len = 0
|
|
bq.queueLock.Unlock()
|
|
}
|
|
}
|
|
|
|
// Cap returns capacity of the block queue.
|
|
func (bq *Queue[Q]) Cap() int {
|
|
return bq.cacheSize
|
|
}
|