package bqueue import ( "sync" "sync/atomic" "time" "go.uber.org/zap" ) // Queuer is an interface for a queue. type Queuer[Q Queueable] interface { AddItem(Q) error AddItems(...Q) error Height() uint32 } //go:generate stringer -type=OperationMode // OperationMode is the mode of operation for the queue. // Could be either Blocking or NonBlocking. type OperationMode byte const ( // NonBlocking means that Put will return immediately if the queue is full. NonBlocking OperationMode = 0 // Blocking means that Put will wait until there is enough space in the queue. Blocking OperationMode = 1 ) // Queueable is an interface for a queue element. type Queueable interface { Indexable comparable } // Indexable is an interface for an element that has an index. type Indexable interface { GetIndex() uint32 } // Queue is the queue of queueable elements. type Queue[Q Queueable] struct { log *zap.Logger queueLock sync.RWMutex queue []Q lastQ uint32 checkBlocks chan struct{} chain Queuer[Q] relayF func(Q) discarded atomic.Bool len int lenUpdateF func(int) cacheSize int mode OperationMode nilQ Q } // DefaultCacheSize is the default amount of Queueable elements above the current height // which are stored in the queue. const DefaultCacheSize = 2000 func (bq *Queue[Q]) indexToPosition(i uint32) int { return int(i) % bq.cacheSize } // New creates an instance of Queue that handles Queueable elements. func New[Q Queueable](bc Queuer[Q], log *zap.Logger, relayer func(Q), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue[Q] { if log == nil { return nil } if cacheSize <= 0 { cacheSize = DefaultCacheSize } var nilQ Q return &Queue[Q]{ log: log, queue: make([]Q, cacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: relayer, lenUpdateF: lenMetricsUpdater, cacheSize: cacheSize, mode: mode, nilQ: nilQ, } } // Run runs the Queue queueing loop. It must be called in a separate routine. func (bq *Queue[Q]) Run() { var lastHeight = bq.chain.Height() for { _, ok := <-bq.checkBlocks if !ok { break } for { h := bq.chain.Height() pos := bq.indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] // The chain moved forward using elements from other sources (consensus). for i := lastHeight; i < h; i++ { old := bq.indexToPosition(i + 1) if bq.queue[old] != bq.nilQ && bq.queue[old].GetIndex() == i { bq.len-- bq.queue[old] = bq.nilQ } } bq.queueLock.Unlock() lastHeight = h if b == bq.nilQ { break } err := bq.chain.AddItem(b) if err != nil { // The element might already be added by the consensus. if bq.chain.Height() < b.GetIndex() { bq.log.Warn("queue: failed to add item into the blockchain", zap.Uint32("index", b.GetIndex()), zap.Uint32("chainHeight", bq.chain.Height()), zap.Stringer("mode", bq.mode), zap.Error(err)) } } else if bq.relayF != nil { bq.relayF(b) } bq.queueLock.Lock() bq.len-- l := bq.len if bq.queue[pos] == b { bq.queue[pos] = bq.nilQ } bq.queueLock.Unlock() if bq.lenUpdateF != nil { bq.lenUpdateF(l) } } } } // Put enqueues Queueable element to be added to the chain. func (bq *Queue[Q]) Put(element Q) error { h := bq.chain.Height() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { return nil } // Can easily happen when fetching the same blocks from // different peers, thus not considered as error. if element.GetIndex() <= h { return nil } if h+uint32(bq.cacheSize) < element.GetIndex() { switch bq.mode { case NonBlocking: return nil case Blocking: bq.queueLock.Unlock() t := time.NewTicker(time.Second) for range t.C { if bq.discarded.Load() { bq.queueLock.Lock() return nil } h = bq.chain.Height() if h+uint32(bq.cacheSize) >= element.GetIndex() { bq.queueLock.Lock() break } } } } pos := bq.indexToPosition(element.GetIndex()) // If we already have it, keep the old element, throw away the new one. if bq.queue[pos] == bq.nilQ || bq.queue[pos].GetIndex() < element.GetIndex() { bq.len++ bq.queue[pos] = element for pos < bq.cacheSize && bq.queue[pos] != bq.nilQ && bq.lastQ+1 == bq.queue[pos].GetIndex() { bq.lastQ = bq.queue[pos].GetIndex() pos++ } } // update metrics if bq.lenUpdateF != nil { bq.lenUpdateF(bq.len) } select { case bq.checkBlocks <- struct{}{}: // ok, signalled to goroutine processing queue default: // it's already busy processing elements } return nil } // LastQueued returns the index of the last queued element and the queue's capacity // left. func (bq *Queue[Q]) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() return bq.lastQ, bq.cacheSize - bq.len } // Discard stops the queue and prevents it from accepting more elements to enqueue. func (bq *Queue[Q]) Discard() { if bq.discarded.CompareAndSwap(false, true) { bq.queueLock.Lock() close(bq.checkBlocks) // Technically we could bq.queue = nil, but this would cost // another if in Run(). clear(bq.queue) bq.len = 0 bq.queueLock.Unlock() } } // Cap returns capacity of the block queue. func (bq *Queue[Q]) Cap() int { return bq.cacheSize }