package statefetcher import ( "bytes" "context" "errors" "fmt" "io" "net/url" "strconv" "sync" "sync/atomic" "github.com/tutus-one/tutus-chain/pkg/config" "github.com/tutus-one/tutus-chain/pkg/core/storage" gio "github.com/tutus-one/tutus-chain/pkg/io" "github.com/tutus-one/tutus-chain/pkg/services/helpers/neofs" "github.com/tutus-one/tutus-chain/pkg/util" "github.com/nspcc-dev/neofs-sdk-go/client" "github.com/nspcc-dev/neofs-sdk-go/container" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" ) // Ledger is the interface for statefetcher. type Ledger interface { GetConfig() config.Blockchain HeaderHeight() uint32 AddContractStorageItems(kv []storage.KeyValue, syncHeight uint32, expectedRoot util.Uint256) error GetLastStoredKey() []byte } // Service fetches contract storage state from NeoFS. type Service struct { neofs.BasicService containerMagic int isActive atomic.Bool isShutdown atomic.Bool cfg config.NeoFSStateFetcher stateSyncInterval uint32 lock sync.RWMutex lastStateObjectIndex uint32 lastStateOID oid.ID chain Ledger log *zap.Logger quit chan bool quitOnce sync.Once runToExiter chan struct{} exiterToShutdown chan struct{} shutdownCallback func() } // New creates a new Service instance. func New(chain Ledger, cfg config.NeoFSStateFetcher, stateSyncInterval int, logger *zap.Logger, shutdownCallback func()) (*Service, error) { if !cfg.Enabled { return &Service{}, nil } if cfg.Timeout <= 0 { cfg.Timeout = neofs.DefaultTimeout } if cfg.StateAttribute == "" { cfg.StateAttribute = neofs.DefaultStateAttribute } if cfg.KeyValueBatchSize <= 0 { cfg.KeyValueBatchSize = neofs.DefaultKVBatchSize } basic, err := neofs.NewBasicService(cfg.NeoFSService) if err != nil { return nil, fmt.Errorf("failed to create service: %w", err) } s := &Service{ BasicService: basic, log: logger, cfg: cfg, chain: chain, shutdownCallback: shutdownCallback, stateSyncInterval: uint32(stateSyncInterval), quit: make(chan bool), runToExiter: make(chan struct{}), exiterToShutdown: make(chan struct{}), } if s.stateSyncInterval == 0 { s.stateSyncInterval = config.DefaultStateSyncInterval } var containerObj container.Container s.Ctx, s.CtxCancel = context.WithCancel(context.Background()) if err = s.Pool.Dial(context.Background()); err != nil { s.isActive.CompareAndSwap(true, false) return nil, fmt.Errorf("failed to dial NeoFS pool: %w", err) } err = s.Retry(func() error { containerObj, err = s.Pool.ContainerGet(s.Ctx, s.ContainerID, client.PrmContainerGet{}) return err }) if err != nil { s.isActive.CompareAndSwap(true, false) return nil, fmt.Errorf("failed to get container: %w", err) } containerMagic := containerObj.Attribute("Magic") if containerMagic != strconv.Itoa(int(s.chain.GetConfig().Magic)) { s.isActive.CompareAndSwap(true, false) return nil, fmt.Errorf("container magic mismatch: expected %d, got %s", s.chain.GetConfig().Magic, containerMagic) } s.containerMagic, err = strconv.Atoi(containerMagic) return s, nil } // LatestStateObjectHeight returns the height of the most recent state object found in the container. func (s *Service) LatestStateObjectHeight(h ...uint32) (uint32, error) { s.lock.RLock() if s.lastStateObjectIndex != 0 { idx := s.lastStateObjectIndex s.lock.RUnlock() return idx, nil } s.lock.RUnlock() var height uint32 if len(h) > 0 { height = h[0] } filters := object.NewSearchFilters() filters.AddFilter(s.cfg.StateAttribute, fmt.Sprintf("%d", height), object.MatchNumGE) ctx, cancel := context.WithTimeout(s.Ctx, s.cfg.Timeout) defer cancel() results, errs := neofs.ObjectSearch(ctx, s.Pool, s.Account.PrivateKey(), s.ContainerID, filters, []string{s.cfg.StateAttribute}) var ( lastItem *client.SearchResultItem lastFoundIdx uint64 ) loop: for { select { case item, ok := <-results: if !ok { break loop } lastItem = &item case err := <-errs: if err != nil && !neofs.IsContextCanceledErr(err) { s.isActive.CompareAndSwap(true, false) return 0, fmt.Errorf("failed to search state object at height %d: %w", height, err) } break loop } } lastFoundIdx, err := strconv.ParseUint(lastItem.Attributes[0], 10, 32) if err != nil || lastFoundIdx == 0 { s.isActive.CompareAndSwap(true, false) return 0, fmt.Errorf("failed to parse state object index: %w", err) } s.lock.Lock() s.lastStateObjectIndex = uint32(lastFoundIdx) s.lastStateOID = lastItem.ID s.lock.Unlock() return s.lastStateObjectIndex, nil } // Start begins state fetching. func (s *Service) Start() error { if s.IsShutdown() { return errors.New("service is already shut down") } if !s.isActive.CompareAndSwap(false, true) { return nil } s.log.Info("starting NeoFS StateFetcher service") go s.exiter() go s.run() return nil } func (s *Service) stopService(force bool) { s.quitOnce.Do(func() { s.quit <- force close(s.quit) }) } // Shutdown requests graceful shutdown of the service. func (s *Service) Shutdown() { if !s.IsActive() || s.IsShutdown() { return } s.stopService(true) <-s.exiterToShutdown } // exiter is a routine that is listening to a quitting signal and manages graceful // Service shutdown process. func (s *Service) exiter() { if !s.isActive.Load() { return } // Closing signal may come from anyone, but only once. force := <-s.quit s.log.Info("shutting down NeoFS StateFetcher service", zap.Bool("force", force)) s.isActive.Store(false) s.isShutdown.Store(true) // Cansel all pending OIDs/blocks downloads in case if shutdown requested by user // or caused by downloading error. if force { s.CtxCancel() } // Wait for the run() to finish. <-s.runToExiter // Everything is done, release resources, turn off the activity marker and let // the server know about it. _ = s.Pool.Close() _ = s.log.Sync() if s.shutdownCallback != nil { s.shutdownCallback() } // Notify Shutdown routine in case if it's user-triggered shutdown. close(s.exiterToShutdown) } func (s *Service) run() { defer close(s.runToExiter) var ( syncHeight uint32 expectedRoot util.Uint256 ) s.lock.RLock() isZero := s.lastStateOID.IsZero() s.lock.RUnlock() if isZero { _, err := s.LatestStateObjectHeight(s.chain.HeaderHeight() - 1) if err != nil { s.log.Error("failed to get state object", zap.Error(err)) s.stopService(true) return } } s.lock.RLock() oidStr := s.lastStateOID.String() s.lock.RUnlock() reader, err := s.objectGet(s.Ctx, oidStr) if err != nil { s.log.Error("failed to get state object", zap.Error(err), zap.String("oid", s.lastStateOID.String())) s.stopService(true) return } defer func() { if err = reader.Close(); err != nil { s.log.Warn("failed to close reader", zap.Error(err)) } }() batches := make(chan []storage.KeyValue, 2) go func() { defer close(batches) br := gio.NewBinReaderFromIO(reader) version := br.ReadB() if version != 0 || br.Err != nil { s.log.Error("invalid state object version", zap.Uint8("version", version), zap.Error(br.Err)) return } magic := br.ReadU32LE() if magic != uint32(s.containerMagic) || br.Err != nil { s.log.Error("invalid state object magic", zap.Uint32("magic", magic)) return } syncHeight = br.ReadU32LE() br.ReadBytes(expectedRoot[:]) if br.Err != nil { s.log.Error("failed to read state root", zap.Error(br.Err)) return } s.log.Info("contract storage state object found", zap.String("root", expectedRoot.StringLE()), zap.Uint32("height", syncHeight)) var ( lastKey = s.chain.GetLastStoredKey() skip = len(lastKey) > 0 batch = make([]storage.KeyValue, 0, s.cfg.KeyValueBatchSize) ) for { select { case <-s.Ctx.Done(): return default: } key := br.ReadVarBytes() if errors.Is(br.Err, io.EOF) { // Flush remainder. if len(batch) > 0 { batches <- batch } return } if br.Err != nil { s.log.Error("failed to read key", zap.Error(br.Err)) return } value := br.ReadVarBytes() if br.Err != nil { s.log.Error("failed to read value", zap.Error(br.Err)) return } if skip { if bytes.Equal(key, lastKey) { skip = false } continue } batch = append(batch, storage.KeyValue{Key: key, Value: value}) if len(batch) >= s.cfg.KeyValueBatchSize { batches <- batch batch = make([]storage.KeyValue, 0, s.cfg.KeyValueBatchSize) } } }() for { select { case <-s.Ctx.Done(): s.stopService(false) return case batch, ok := <-batches: if !ok { s.stopService(false) return } if len(batch) == 0 { continue } if err = s.chain.AddContractStorageItems(batch, syncHeight, expectedRoot); err != nil { s.log.Error("failed to add storage batch", zap.Error(err)) s.stopService(true) return } } } } func (s *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, s.cfg.ContainerID, oid)) if err != nil { return nil, err } var rc io.ReadCloser err = s.Retry(func() error { rc, err = neofs.GetWithClient(ctx, s.Pool, s.Account.PrivateKey(), u, false) return err }) return rc, err } // IsActive checks if the service is running. func (s *Service) IsActive() bool { return s.isActive.Load() && !s.isShutdown.Load() } // IsShutdown checks if the service is fully shut down. func (s *Service) IsShutdown() bool { return s.isShutdown.Load() }