tutus-chain/pkg/rpcclient/wsclient.go

1130 lines
35 KiB
Go
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rpcclient
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"git.marketally.com/tutus-one/tutus-chain/pkg/core/block"
"git.marketally.com/tutus-one/tutus-chain/pkg/core/state"
"git.marketally.com/tutus-one/tutus-chain/pkg/core/transaction"
"git.marketally.com/tutus-one/tutus-chain/pkg/tutusrpc"
"git.marketally.com/tutus-one/tutus-chain/pkg/tutusrpc/result"
"git.marketally.com/tutus-one/tutus-chain/pkg/tutusrpc/rpcevent"
)
// WSClient is a websocket-enabled RPC client that can be used with appropriate
// servers. It's supposed to be faster than Client because it has persistent
// connection to the server and at the same time it exposes some functionality
// that is only provided via websockets (like event subscription mechanism).
// WSClient is thread-safe and can be used from multiple goroutines to perform
// RPC requests.
//
// It exposes a set of Receive* methods with the same behaviour pattern that
// is caused by the fact that the client itself receives every message from the
// server via a single channel. This includes any subscriptions and any replies
// to ordinary requests at the same. The client then routes these messages to
// channels provided on subscription (passed to Receive*) or to the respective
// receivers (API callers) if it's an ordinary JSON-RPC reply. While synchronous
// API users are blocked during their calls and wake up on reply, subscription
// channels must be read from to avoid blocking the client. Failure to do so
// will make WSClient wait for the channel reader to get the event and while
// it waits every other messages (subscription-related or request replies)
// will be blocked. This also means that subscription channel must be properly
// drained after unsubscription. If CloseNotificationChannelIfFull option is on
// then the receiver channel will be closed immediately in case if a subsequent
// notification can't be sent to it, which means WSClient's operations are
// unblocking in this mode. No unsubscription is performed in this case, so it's
// still the user responsibility to unsubscribe.
//
// All Receive* methods provide notifications ordering and persistence guarantees.
// See https://git.marketally.com/tutus-one/tutus-chain/blob/master/docs/notifications.md#ordering-and-persistence-guarantees
// for more details on this topic.
//
// Any received subscription items (blocks/transactions/notifications) are passed
// via pointers for efficiency, but the actual structures MUST NOT be changed, as
// it may affect the functionality of other notification receivers. If multiple
// subscriptions share the same receiver channel, then matching notification is
// only sent once per channel. The receiver channel will be closed by the WSClient
// immediately after MissedEvent is received from the server; no unsubscription
// is performed in this case, so it's the user responsibility to unsubscribe. It
// will also be closed on disconnection from server or on situation when it's
// impossible to send a subsequent notification to the subscriber's channel and
// CloseNotificationChannelIfFull option is on.
type WSClient struct {
Client
ws *websocket.Conn
wsOpts WSOptions
readerDone chan struct{}
writerDone chan struct{}
requests chan *tutusrpc.Request
shutdown chan struct{}
closeCalled atomic.Bool
closeErrLock sync.RWMutex
closeErr error
subscriptionsLock sync.RWMutex
subscriptions map[string]notificationReceiver
// receivers is a mapping from receiver channel to a set of corresponding subscription IDs.
// It must be accessed with subscriptionsLock taken. Its keys must be used to deliver
// notifications, if channel is not in the receivers list and corresponding subscription
// still exists, notification must not be sent.
receivers map[any][]string
// subscriptionsOrderLock manages sequential order of "subscribe" and "unsubscribe" WS
// requests processing in order to avoid server-side subscription ID conflicts.
subscriptionsOrderLock sync.Mutex
respLock sync.RWMutex
respChannels map[uint64]chan *tutusrpc.Response
}
// WSOptions defines options for the web-socket RPC client. It contains a
// set of options for the underlying standard RPC client as far as
// WSClient-specific options. See Options documentation for more details.
type WSOptions struct {
Options
// CloseNotificationChannelIfFull allows WSClient to close a subscriber's
// receive channel in case if the channel isn't read properly and no more
// events can be pushed to it. This option, if set, allows to avoid WSClient
// blocking on a subsequent notification dispatch. However, if enabled, the
// corresponding subscription is kept even after receiver's channel closing,
// thus it's still the caller's duty to call Unsubscribe() for this
// subscription.
CloseNotificationChannelIfFull bool
}
// notificationReceiver is an interface aimed to provide WS subscriber functionality
// for different types of subscriptions.
type notificationReceiver interface {
// Comparator provides notification filtering functionality.
rpcevent.Comparator
// Receiver returns notification receiver channel.
Receiver() any
// TrySend checks whether notification passes receiver filter and sends it
// to the underlying channel if so. It is performed under subscriptions lock
// taken. nonBlocking denotes whether the receiving operation shouldn't block
// the client's operation. It returns whether notification matches the filter
// and whether the receiver channel is overflown.
TrySend(ntf Notification, nonBlocking bool) (bool, bool)
// Close closes underlying receiver channel.
Close()
}
// blockReceiver stores information about block events subscriber.
type blockReceiver struct {
filter *tutusrpc.BlockFilter
ch chan<- *block.Block
}
// EventID implements tutusrpc.Comparator interface.
func (r *blockReceiver) EventID() tutusrpc.EventID {
return tutusrpc.BlockEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *blockReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *blockReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *blockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Block):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Block)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *blockReceiver) Close() {
close(r.ch)
}
// headerOfAddedBlockReceiver stores information about header of added block events subscriber.
type headerOfAddedBlockReceiver struct {
filter *tutusrpc.BlockFilter
ch chan<- *block.Header
}
// EventID implements tutusrpc.Comparator interface.
func (r *headerOfAddedBlockReceiver) EventID() tutusrpc.EventID {
return tutusrpc.HeaderOfAddedBlockEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *headerOfAddedBlockReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*block.Header):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*block.Header)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *headerOfAddedBlockReceiver) Close() {
close(r.ch)
}
// txReceiver stores information about transaction events subscriber.
type txReceiver struct {
filter *tutusrpc.TxFilter
ch chan<- *transaction.Transaction
}
// EventID implements tutusrpc.Comparator interface.
func (r *txReceiver) EventID() tutusrpc.EventID {
return tutusrpc.TransactionEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *txReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *txReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *txReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*transaction.Transaction):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*transaction.Transaction)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *txReceiver) Close() {
close(r.ch)
}
// executionNotificationReceiver stores information about execution notifications subscriber.
type executionNotificationReceiver struct {
filter *tutusrpc.NotificationFilter
ch chan<- *state.ContainedNotificationEvent
}
// EventID implements tutusrpc.Comparator interface.
func (r *executionNotificationReceiver) EventID() tutusrpc.EventID {
return tutusrpc.NotificationEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *executionNotificationReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionNotificationReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionNotificationReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*state.ContainedNotificationEvent):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*state.ContainedNotificationEvent)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *executionNotificationReceiver) Close() {
close(r.ch)
}
// executionReceiver stores information about application execution results subscriber.
type executionReceiver struct {
filter *tutusrpc.ExecutionFilter
ch chan<- *state.AppExecResult
}
// EventID implements tutusrpc.Comparator interface.
func (r *executionReceiver) EventID() tutusrpc.EventID {
return tutusrpc.ExecutionEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *executionReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *executionReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *executionReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*state.AppExecResult):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*state.AppExecResult)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *executionReceiver) Close() {
close(r.ch)
}
// notaryRequestReceiver stores information about notary requests subscriber.
type notaryRequestReceiver struct {
filter *tutusrpc.NotaryRequestFilter
ch chan<- *result.NotaryRequestEvent
}
// EventID implements tutusrpc.Comparator interface.
func (r *notaryRequestReceiver) EventID() tutusrpc.EventID {
return tutusrpc.NotaryRequestEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *notaryRequestReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *notaryRequestReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *notaryRequestReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*result.NotaryRequestEvent):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*result.NotaryRequestEvent)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *notaryRequestReceiver) Close() {
close(r.ch)
}
// mempoolEventReceiver stores information about mempool events subscriber.
type mempoolEventReceiver struct {
filter *tutusrpc.MempoolEventFilter
ch chan<- *result.MempoolEvent
}
// EventID implements tutusrpc.Comparator interface.
func (r *mempoolEventReceiver) EventID() tutusrpc.EventID {
return tutusrpc.MempoolEventID
}
// Filter implements tutusrpc.Comparator interface.
func (r *mempoolEventReceiver) Filter() tutusrpc.SubscriptionFilter {
if r.filter == nil {
return nil
}
return *r.filter
}
// Receiver implements notificationReceiver interface.
func (r *mempoolEventReceiver) Receiver() any {
return r.ch
}
// TrySend implements notificationReceiver interface.
func (r *mempoolEventReceiver) TrySend(ntf Notification, nonBlocking bool) (bool, bool) {
if rpcevent.Matches(r, ntf) {
if nonBlocking {
select {
case r.ch <- ntf.Value.(*result.MempoolEvent):
default:
return true, true
}
} else {
r.ch <- ntf.Value.(*result.MempoolEvent)
}
return true, false
}
return false, false
}
// Close implements notificationReceiver interface.
func (r *mempoolEventReceiver) Close() {
close(r.ch)
}
// Notification represents a server-generated notification for client subscriptions.
// Value can be one of *block.Block, *state.AppExecResult, *state.ContainedNotificationEvent
// *transaction.Transaction or *subscriptions.NotaryRequestEvent based on Type.
type Notification struct {
Type tutusrpc.EventID
Value any
}
// EventID implements Container interface and returns notification ID.
func (n Notification) EventID() tutusrpc.EventID {
return n.Type
}
// EventPayload implements Container interface and returns notification
// object.
func (n Notification) EventPayload() any {
return n.Value
}
// requestResponse is a combined type for request and response since we can get
// any of them here.
type requestResponse struct {
tutusrpc.Response
Method string `json:"method"`
RawParams []json.RawMessage `json:"params,omitzero"`
}
const (
// Message limit for receiving side.
wsReadLimit = 10 * 1024 * 1024
// Disconnection timeout.
wsPongLimit = 60 * time.Second
// Ping period for connection liveness check.
wsPingPeriod = wsPongLimit / 2
// Write deadline.
wsWriteLimit = wsPingPeriod / 2
)
// ErrNilNotificationReceiver is returned when notification receiver channel is nil.
var ErrNilNotificationReceiver = errors.New("nil notification receiver")
// ErrWSConnLost is a WSClient-specific error that will be returned for any
// requests after disconnection (including intentional ones via
// (*WSClient).Close). Use errors.Is(err, ErrConnClosedByUser) check to
// distinguish the latter case from a side reason of lost connection.
var ErrWSConnLost = errors.New("connection lost")
// ErrConnClosedByUser is a WSClient error returned iff RPC request is sent after
// the user calls (*WSClient).Close method by himself. Note that this error won't
// be returned from (*WSClient).GetError method.
var ErrConnClosedByUser = errors.New("closed by user")
// NewWS returns a new WSClient ready to use (with established websocket
// connection). You need to use websocket URL for it like `ws://1.2.3.4/ws`.
// You should call Init method to initialize the network magic the client is
// operating on.
func NewWS(ctx context.Context, endpoint string, opts WSOptions) (*WSClient, error) {
dialer := websocket.Dialer{HandshakeTimeout: opts.DialTimeout}
ws, resp, err := dialer.DialContext(ctx, endpoint, nil)
if resp != nil && resp.Body != nil { // Can be non-nil even with error returned.
defer resp.Body.Close() // Not exactly required by websocket, but let's do this for bodyclose checker.
}
if err != nil {
if resp != nil && resp.Body != nil {
var srvErr tutusrpc.HeaderAndError
dec := json.NewDecoder(resp.Body)
decErr := dec.Decode(&srvErr)
if decErr == nil && srvErr.Error != nil {
err = srvErr.Error
}
}
return nil, err
}
wsc := &WSClient{
Client: Client{},
ws: ws,
wsOpts: opts,
shutdown: make(chan struct{}),
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
respChannels: make(map[uint64]chan *tutusrpc.Response),
requests: make(chan *tutusrpc.Request),
subscriptions: make(map[string]notificationReceiver),
receivers: make(map[any][]string),
}
err = initClient(ctx, &wsc.Client, endpoint, opts.Options)
if err != nil {
return nil, err
}
wsc.cli = nil
go wsc.wsReader()
go wsc.wsWriter()
wsc.requestF = wsc.makeWsRequest
return wsc, nil
}
// Close closes connection to the remote side rendering this client instance
// unusable.
func (c *WSClient) Close() {
if c.closeCalled.CompareAndSwap(false, true) {
c.setCloseErr(ErrConnClosedByUser)
// Closing shutdown channel sends a signal to wsWriter to break out of the
// loop. In doing so it does ws.Close() closing the network connection
// which in turn makes wsReader receive an err from ws.ReadJSON() and also
// break out of the loop closing c.done channel in its shutdown sequence.
close(c.shutdown)
// Call to cancel will send signal to all users of Context().
c.ctxCancel()
}
<-c.readerDone
}
func (c *WSClient) wsReader() {
c.ws.SetReadLimit(wsReadLimit)
c.ws.SetPongHandler(func(string) error {
err := c.ws.SetReadDeadline(time.Now().Add(wsPongLimit))
if err != nil {
c.setCloseErr(fmt.Errorf("failed to set pong read deadline: %w", err))
}
return err
})
var connCloseErr error
readloop:
for {
rr := new(requestResponse)
err := c.ws.SetReadDeadline(time.Now().Add(wsPongLimit))
if err != nil {
connCloseErr = fmt.Errorf("failed to set response read deadline: %w", err)
break readloop
}
err = c.ws.ReadJSON(rr)
if err != nil {
// Timeout/connection loss/malformed response.
connCloseErr = fmt.Errorf("failed to read JSON response (timeout/connection loss/malformed response): %w", err)
break readloop
}
if rr.ID == nil && rr.Method != "" {
event, err := tutusrpc.GetEventIDFromString(rr.Method)
if err != nil {
// Bad event received.
connCloseErr = fmt.Errorf("failed to perse event ID from string %s: %w", rr.Method, err)
break readloop
}
if event != tutusrpc.MissedEventID && len(rr.RawParams) != 1 {
// Bad event received.
connCloseErr = fmt.Errorf("bad event received: %s / %d", event, len(rr.RawParams))
break readloop
}
ntf := Notification{Type: event}
switch event {
case tutusrpc.BlockEventID:
sr, err := c.stateRootInHeader()
if err != nil {
// Client is not initialized.
connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
break readloop
}
ntf.Value = block.New(sr)
case tutusrpc.TransactionEventID:
ntf.Value = &transaction.Transaction{}
case tutusrpc.NotificationEventID:
ntf.Value = new(state.ContainedNotificationEvent)
case tutusrpc.ExecutionEventID:
ntf.Value = new(state.AppExecResult)
case tutusrpc.NotaryRequestEventID:
ntf.Value = new(result.NotaryRequestEvent)
case tutusrpc.HeaderOfAddedBlockEventID:
sr, err := c.stateRootInHeader()
if err != nil {
// Client is not initialized.
connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err)
break readloop
}
ntf.Value = &block.New(sr).Header
case tutusrpc.MempoolEventID:
ntf.Value = new(result.MempoolEvent)
case tutusrpc.MissedEventID:
// No value.
default:
// Bad event received.
connCloseErr = fmt.Errorf("unknown event received: %d", event)
break readloop
}
if event != tutusrpc.MissedEventID {
err = json.Unmarshal(rr.RawParams[0], ntf.Value)
if err != nil {
// Bad event received.
connCloseErr = fmt.Errorf("failed to unmarshal event of type %s from JSON: %w", event, err)
break readloop
}
}
c.notifySubscribers(ntf)
} else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) {
id, err := strconv.ParseUint(string(rr.ID), 10, 64)
if err != nil {
connCloseErr = fmt.Errorf("failed to retrieve response ID from string %s: %w", string(rr.ID), err)
break readloop // Malformed response (invalid response ID).
}
ch := c.getResponseChannel(id)
if ch == nil {
connCloseErr = fmt.Errorf("unknown response channel for response %d", id)
break readloop // Unknown response (unexpected response ID).
}
select {
case <-c.writerDone:
break readloop
case <-c.shutdown:
break readloop
case ch <- &rr.Response:
}
} else {
// Malformed response, neither valid request, nor valid response.
connCloseErr = fmt.Errorf("malformed response")
break readloop
}
}
if connCloseErr != nil {
c.setCloseErr(connCloseErr)
}
close(c.readerDone)
c.respLock.Lock()
for _, ch := range c.respChannels {
close(ch)
}
c.respChannels = nil
c.respLock.Unlock()
c.subscriptionsLock.Lock()
for rcvrCh, ids := range c.receivers {
c.dropSubCh(rcvrCh, ids[0], true)
}
c.subscriptionsLock.Unlock()
c.ctxCancel()
}
// dropSubCh closes corresponding subscriber's channel and removes it from the
// receivers map. The channel is still being kept in
// the subscribers map as technically the server-side subscription still exists
// and the user is responsible for unsubscription. This method must be called
// under subscriptionsLock taken. It's the caller's duty to ensure dropSubCh
// will be called once per channel, otherwise panic will occur.
func (c *WSClient) dropSubCh(rcvrCh any, id string, ignoreCloseNotificationChannelIfFull bool) {
if ignoreCloseNotificationChannelIfFull || c.wsOpts.CloseNotificationChannelIfFull {
c.subscriptions[id].Close()
delete(c.receivers, rcvrCh)
}
}
func (c *WSClient) wsWriter() {
pingTicker := time.NewTicker(wsPingPeriod)
defer c.ws.Close()
defer close(c.writerDone)
var connCloseErr error
writeloop:
for {
select {
case <-c.shutdown:
return
case <-c.readerDone:
return
case req, ok := <-c.requests:
if !ok {
return
}
if err := c.ws.SetWriteDeadline(time.Now().Add(c.opts.RequestTimeout)); err != nil {
connCloseErr = fmt.Errorf("failed to set request write deadline: %w", err)
break writeloop
}
if err := c.ws.WriteJSON(req); err != nil {
connCloseErr = fmt.Errorf("failed to write JSON request (%s / %d): %w", req.Method, len(req.Params), err)
break writeloop
}
case <-pingTicker.C:
if err := c.ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)); err != nil {
connCloseErr = fmt.Errorf("failed to set ping write deadline: %w", err)
break writeloop
}
if err := c.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
connCloseErr = fmt.Errorf("failed to write ping message: %w", err)
break writeloop
}
}
}
if connCloseErr != nil {
c.setCloseErr(connCloseErr)
}
}
func (c *WSClient) notifySubscribers(ntf Notification) {
if ntf.Type == tutusrpc.MissedEventID {
c.subscriptionsLock.Lock()
for rcvr, ids := range c.receivers {
c.subscriptions[ids[0]].Close()
delete(c.receivers, rcvr)
}
c.subscriptionsLock.Unlock()
return
}
c.subscriptionsLock.Lock()
for rcvrCh, ids := range c.receivers {
for _, id := range ids {
ok, dropCh := c.subscriptions[id].TrySend(ntf, c.wsOpts.CloseNotificationChannelIfFull)
if dropCh {
c.dropSubCh(rcvrCh, id, false)
break // strictly single drop per channel
}
if ok {
break // strictly one notification per channel
}
}
}
c.subscriptionsLock.Unlock()
}
func (c *WSClient) unregisterRespChannel(id uint64) {
c.respLock.Lock()
defer c.respLock.Unlock()
if ch, ok := c.respChannels[id]; ok {
delete(c.respChannels, id)
close(ch)
}
}
func (c *WSClient) getResponseChannel(id uint64) chan *tutusrpc.Response {
c.respLock.RLock()
defer c.respLock.RUnlock()
return c.respChannels[id]
}
// closeErrOrConnLost returns ErrWSConnLost with details (set by wsReader or
// wsWriter) if available.
func (c *WSClient) closeErrOrConnLost() error {
var (
err = ErrWSConnLost
closeErr = c.getErrorOrClosedByUser()
)
if closeErr != nil {
err = fmt.Errorf("%w: %w", err, closeErr)
}
return err
}
func (c *WSClient) makeWsRequest(r *tutusrpc.Request) (*tutusrpc.Response, error) {
ch := make(chan *tutusrpc.Response)
c.respLock.Lock()
select {
case <-c.readerDone:
c.respLock.Unlock()
return nil, fmt.Errorf("before registering response channel: %w", c.closeErrOrConnLost())
default:
c.respChannels[r.ID] = ch
c.respLock.Unlock()
}
select {
case <-c.readerDone:
return nil, fmt.Errorf("before sending the request: %w", c.closeErrOrConnLost())
case <-c.writerDone:
return nil, fmt.Errorf("before sending the request: %w", c.closeErrOrConnLost())
case c.requests <- r:
}
select {
case <-c.readerDone:
return nil, fmt.Errorf("waiting for the response: %w", c.closeErrOrConnLost())
case <-c.writerDone:
return nil, fmt.Errorf("waiting for the response: %w", c.closeErrOrConnLost())
case resp, ok := <-ch:
if !ok {
return nil, fmt.Errorf("waiting for the response: %w", c.closeErrOrConnLost())
}
c.unregisterRespChannel(r.ID)
return resp, nil
}
}
func (c *WSClient) performSubscription(params []any, rcvr notificationReceiver) (string, error) {
var resp string
if flt := rcvr.Filter(); flt != nil {
if err := flt.IsValid(); err != nil {
return "", err
}
}
// Protect from concurrent subscribe/ubsubscribe requests, ref. #3093.
c.subscriptionsOrderLock.Lock()
defer c.subscriptionsOrderLock.Unlock()
if err := c.performRequest("subscribe", params, &resp); err != nil {
return "", err
}
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
c.subscriptions[resp] = rcvr
ch := rcvr.Receiver()
c.receivers[ch] = append(c.receivers[ch], resp)
return resp, nil
}
// ReceiveBlocks registers provided channel as a receiver for the new block events.
// Events can be filtered by the given BlockFilter, nil value doesn't add any filter.
// See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceiveBlocks(flt *tutusrpc.BlockFilter, rcvr chan<- *block.Block) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
if !c.cache.initDone {
return "", errNetworkNotInitialized
}
params := []any{"block_added"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &blockReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveHeadersOfAddedBlocks registers provided channel as a receiver for new
// block's header events. Events can be filtered by the given [tutusrpc.BlockFilter],
// nil value doesn't add any filter. See WSClient comments for generic
// Receive* behaviour details.
func (c *WSClient) ReceiveHeadersOfAddedBlocks(flt *tutusrpc.BlockFilter, rcvr chan<- *block.Header) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
if !c.cache.initDone {
return "", errNetworkNotInitialized
}
params := []any{"header_of_added_block"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &headerOfAddedBlockReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveTransactions registers provided channel as a receiver for new transaction
// events. Events can be filtered by the given TxFilter, nil value doesn't add any
// filter. See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceiveTransactions(flt *tutusrpc.TxFilter, rcvr chan<- *transaction.Transaction) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"transaction_added"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &txReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveExecutionNotifications registers provided channel as a receiver for execution
// events. Events can be filtered by the given NotificationFilter, nil value doesn't add
// any filter. See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceiveExecutionNotifications(flt *tutusrpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"notification_from_execution"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &executionNotificationReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveExecutions registers provided channel as a receiver for
// application execution result events generated during transaction execution.
// Events can be filtered by the given ExecutionFilter, nil value doesn't add any filter.
// See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceiveExecutions(flt *tutusrpc.ExecutionFilter, rcvr chan<- *state.AppExecResult) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"transaction_executed"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &executionReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveNotaryRequests registers provided channel as a receiver for notary request
// payload addition or removal events. Events can be filtered by the given NotaryRequestFilter
// where sender corresponds to notary request sender (the second fallback transaction
// signer), signer corresponds to main transaction signers and type corresponds to the
// [mempoolevent.Type] and denotes whether notary request was added to or removed from
// the notary request pool. nil value doesn't add any filter. See WSClient comments
// for generic Receive* behaviour details.
func (c *WSClient) ReceiveNotaryRequests(flt *tutusrpc.NotaryRequestFilter, rcvr chan<- *result.NotaryRequestEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"notary_request_event"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &notaryRequestReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// ReceiveMempoolEvents registers the provided channel as a receiver for mempool
// transaction addition or removal events. Events can be filtered by the given
// MempoolEventFilter where sender corresponds to the transactions sender, signer
// corresponds to one of the transactions signers, and type corresponds to the
// [mempoolevent.Type] and denotes whether the transaction was added to or removed
// from the mempool. See WSClient comments for generic Receive* behaviour details.
func (c *WSClient) ReceiveMempoolEvents(flt *tutusrpc.MempoolEventFilter, rcvr chan<- *result.MempoolEvent) (string, error) {
if rcvr == nil {
return "", ErrNilNotificationReceiver
}
params := []any{"mempool_event"}
if flt != nil {
flt = flt.Copy()
params = append(params, *flt)
}
r := &mempoolEventReceiver{
filter: flt,
ch: rcvr,
}
return c.performSubscription(params, r)
}
// Unsubscribe removes subscription for the given event stream. It will return an
// error in case if there's no subscription with the provided ID. Call to Unsubscribe
// doesn't block notifications receive process for given subscriber, thus, ensure
// that subscriber channel is properly drained while unsubscription is being
// performed. Failing to do so will cause WSClient to block even regular requests.
// You may probably need to run unsubscription process in a separate
// routine (in parallel with notification receiver routine) to avoid Client's
// notification dispatcher blocking.
func (c *WSClient) Unsubscribe(id string) error {
return c.performUnsubscription(id)
}
// UnsubscribeAll removes all active subscriptions of the current client. It copies
// the list of subscribers in order not to hold the lock for the whole execution
// time and tries to unsubscribe from us many feeds as possible returning the
// chunk of unsubscription errors afterwards. Call to UnsubscribeAll doesn't block
// notifications receive process for given subscribers, thus, ensure that subscribers
// channels are properly drained while unsubscription is being performed. Failing to
// do so will cause WSClient to block even regular requests. You may probably need
// to run unsubscription process in a separate routine (in parallel with notification
// receiver routines) to avoid Client's notification dispatcher blocking.
func (c *WSClient) UnsubscribeAll() error {
c.subscriptionsLock.Lock()
subs := make([]string, 0, len(c.subscriptions))
for id := range c.subscriptions {
subs = append(subs, id)
}
c.subscriptionsLock.Unlock()
var resErr error
for _, id := range subs {
err := c.performUnsubscription(id)
if err != nil {
errFmt := "failed to unsubscribe from feed %d: %w"
errArgs := []any{err}
if resErr != nil {
errFmt = "%w; " + errFmt
errArgs = append([]any{resErr}, errArgs...)
}
resErr = fmt.Errorf(errFmt, errArgs...)
}
}
return resErr
}
// performUnsubscription is internal method that removes subscription with the given
// ID from the list of subscriptions and receivers. It takes the subscriptions lock
// after WS RPC unsubscription request is completed. Until then the subscriber channel
// may still receive WS notifications.
func (c *WSClient) performUnsubscription(id string) error {
// Protect from concurrent subscribe/ubsubscribe requests, ref. #3093.
c.subscriptionsOrderLock.Lock()
defer c.subscriptionsOrderLock.Unlock()
c.subscriptionsLock.RLock()
rcvr, ok := c.subscriptions[id]
c.subscriptionsLock.RUnlock()
if !ok {
return errors.New("no subscription with this ID")
}
var resp bool
if err := c.performRequest("unsubscribe", []any{id}, &resp); err != nil {
return err
}
if !resp {
return errors.New("unsubscribe method returned false result")
}
c.subscriptionsLock.Lock()
defer c.subscriptionsLock.Unlock()
// Rely on fact that rcvr is still in the c.subscriptions map since only
// performUnsubscription (protected by subscriptionsOrderLock) is authorized
// to remove rcvr from this map, although the rcvr channel itself may be
// closed by this moment by notifySubscribers due to channel overflow or
// missed event receival.
ch := rcvr.Receiver()
ids := c.receivers[ch]
for i, rcvrID := range ids {
if rcvrID == id {
ids = append(ids[:i], ids[i+1:]...)
break
}
}
if len(ids) == 0 {
delete(c.receivers, ch)
} else {
c.receivers[ch] = ids
}
delete(c.subscriptions, id)
return nil
}
// setCloseErr is a thread-safe method setting closeErr in case if it's not yet set.
func (c *WSClient) setCloseErr(err error) {
c.closeErrLock.Lock()
defer c.closeErrLock.Unlock()
if c.closeErr == nil {
c.closeErr = err
}
}
// GetError returns the reason of WS connection closing. It returns nil in case if connection
// was closed by the user via Close() method.
func (c *WSClient) GetError() error {
err := c.getErrorOrClosedByUser()
if err != nil && errors.Is(err, ErrConnClosedByUser) {
return nil
}
return err
}
// getErrorOrClosedByUser returns the reason of WS connection closing. An error is returned
// even if connection was closed by the user via Close() method.
func (c *WSClient) getErrorOrClosedByUser() error {
c.closeErrLock.RLock()
defer c.closeErrLock.RUnlock()
return c.closeErr
}
// Context returns WSClient Cancel context that will be terminated on Client shutdown.
func (c *WSClient) Context() context.Context {
return c.ctx
}