diff --git a/docs/storage.md b/docs/storage.md new file mode 100644 index 0000000..18a3b98 --- /dev/null +++ b/docs/storage.md @@ -0,0 +1,319 @@ +# Tutus Storage Interface + +## Overview + +The Tutus Storage Interface provides a pluggable architecture for sovereign data storage in government blockchain deployments. Different nations have different data residency requirements, and this interface allows each deployment to choose storage backends that comply with their legal framework. + +## Architecture + +``` +┌─────────────────────────────────────────────────────────┐ +│ Tutus Blockchain │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ Block Store │ │ State Store │ │ Oracle Data │ │ +│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ +│ │ │ │ │ +│ ┌──────┴────────────────┴────────────────┴──────┐ │ +│ │ Storage Interface (pkg/storage) │ │ +│ └──────┬────────────────┬────────────────┬──────┘ │ +└─────────┼────────────────┼────────────────┼─────────────┘ + │ │ │ + ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐ + │ NeoFS │ │ Local │ │ S3 │ + │ Adapter │ │ Adapter │ │ Adapter │ + └─────────────┘ └─────────────┘ └─────────────┘ + │ │ │ + ┌──────┴──────┐ ┌──────┴──────┐ ┌──────┴──────┐ + │ Private │ │ Government │ │ Sovereign │ + │ NeoFS Net │ │ NAS │ │ Cloud │ + └─────────────┘ └─────────────┘ └─────────────┘ +``` + +## Data Sovereignty Compliance + +### Why This Matters + +Governments deploying Tutus must comply with national data residency laws: + +| Jurisdiction | Law | Requirement | +|-------------|-----|-------------| +| European Union | GDPR | EU citizen data must remain in EU | +| China | Cybersecurity Law | Critical data must be stored domestically | +| Russia | Federal Law 242-FZ | Personal data of citizens stored in Russia | +| India | DPDP Act 2023 | Certain data categories require local storage | +| Brazil | LGPD | Similar to GDPR for Brazilian citizens | + +### Compliance Strategy + +1. **Choose Compliant Storage**: Select storage providers that operate within your jurisdiction +2. **Private Networks**: For NeoFS, deploy a private network within national borders +3. **Sovereign Clouds**: Use government-approved cloud providers +4. **Network Isolation**: Configure firewalls to prevent cross-border data transfer +5. **Encryption**: Enable encryption at rest and in transit + +## Supported Providers + +### NeoFS (Decentralized Storage) + +Best for: +- Distributed government networks +- Multi-region deployments within a country +- High availability requirements + +```yaml +storage: + provider: neofs + neofs: + endpoints: + - "grpc://neofs-node1.gov.example:8080" + - "grpc://neofs-node2.gov.example:8080" + container: "7s23kG4xLmNpQ..." + timeout: 30s + key: "/etc/tutus/neofs.key" +``` + +**Sovereign Deployment**: Run your own NeoFS network within national infrastructure. Do not use the public NeoFS network for government data. + +### Local Filesystem + +Best for: +- Single-node deployments +- Development and testing +- Air-gapped networks +- NAS/SAN-backed storage + +```yaml +storage: + provider: local + local: + path: /var/lib/tutus/storage + max_size: 1099511627776 # 1TB + compression: true +``` + +### S3-Compatible Storage + +Best for: +- Sovereign cloud providers +- Existing government cloud infrastructure +- AWS GovCloud, Azure Government, etc. + +```yaml +storage: + provider: s3 + s3: + endpoint: "s3.sovereign-cloud.gov.example" + region: "national-1" + bucket: "tutus-blocks" + access_key: "${S3_ACCESS_KEY}" + secret_key: "${S3_SECRET_KEY}" + use_path_style: true +``` + +## Interface Reference + +### Provider Interface + +The core interface that all storage backends must implement: + +```go +type Provider interface { + // Name returns the provider identifier + Name() string + + // Put stores data and returns a unique identifier + Put(ctx context.Context, data io.Reader, opts PutOptions) (ObjectID, error) + + // Get retrieves data by its identifier + Get(ctx context.Context, id ObjectID) (io.ReadCloser, error) + + // Delete removes an object from storage + Delete(ctx context.Context, id ObjectID) error + + // Exists checks if an object exists + Exists(ctx context.Context, id ObjectID) (bool, error) + + // List returns objects matching the given prefix + List(ctx context.Context, prefix string, opts ListOptions) ([]ObjectInfo, error) + + // Head retrieves object metadata without content + Head(ctx context.Context, id ObjectID) (ObjectInfo, error) + + // Close releases resources + Close() error +} +``` + +### BlockStorage Interface + +Extended interface for blockchain-specific operations: + +```go +type BlockStorage interface { + Provider + + // PutBlock stores a block with its index + PutBlock(ctx context.Context, index uint32, data []byte) (ObjectID, error) + + // GetBlock retrieves a block by its index + GetBlock(ctx context.Context, index uint32) ([]byte, error) + + // GetBlockRange retrieves multiple blocks efficiently + GetBlockRange(ctx context.Context, start, end uint32) ([][]byte, error) + + // GetLatestBlockIndex returns the highest block index + GetLatestBlockIndex(ctx context.Context) (uint32, error) +} +``` + +### StateStorage Interface + +Extended interface for state snapshot operations: + +```go +type StateStorage interface { + Provider + + // PutState stores a state snapshot at a specific height + PutState(ctx context.Context, height uint32, data io.Reader) (ObjectID, error) + + // GetState retrieves the state snapshot for a given height + GetState(ctx context.Context, height uint32) (io.ReadCloser, error) + + // GetLatestState returns the most recent state snapshot + GetLatestState(ctx context.Context) (height uint32, data io.ReadCloser, err error) +} +``` + +## Usage Examples + +### Basic Usage + +```go +import ( + "github.com/tutus-one/tutus-chain/pkg/storage" + "github.com/tutus-one/tutus-chain/pkg/storage/local" +) + +// Create provider +provider, err := local.New(local.Config{ + Path: "/var/lib/tutus/storage", +}) +if err != nil { + log.Fatal(err) +} +defer provider.Close() + +// Store data +id, err := provider.Put(ctx, strings.NewReader("block data"), storage.PutOptions{ + ContentType: "application/octet-stream", + Attributes: map[string]string{ + "block_index": "12345", + }, +}) + +// Retrieve data +reader, err := provider.Get(ctx, id) +if err != nil { + log.Fatal(err) +} +defer reader.Close() +data, _ := io.ReadAll(reader) +``` + +### Registry Pattern + +```go +// Create registry with multiple providers +registry := storage.NewRegistry() +registry.Register(neofsAdapter) +registry.Register(localAdapter) + +// Get provider by name +provider, ok := registry.Get("neofs") +if !ok { + // Fallback to local + provider, _ = registry.Get("local") +} +``` + +## Implementing Custom Providers + +To add support for a new storage backend: + +1. Create a new package under `pkg/storage/` +2. Implement the `storage.Provider` interface +3. Optionally implement `BlockStorage` or `StateStorage` +4. Add configuration to `storage.Config` + +Example structure: + +``` +pkg/storage/ +├── storage.go # Core interfaces +├── config.go # Configuration types +├── errors.go # Error definitions +├── neofs/ # NeoFS adapter +│ └── adapter.go +├── local/ # Local filesystem adapter +│ └── adapter.go +└── mycloud/ # Custom provider + └── adapter.go +``` + +## Security Considerations + +1. **Encryption at Rest**: Enable storage-level encryption for sensitive data +2. **Access Control**: Use provider-specific ACLs to restrict access +3. **Audit Logging**: Enable logging for all storage operations +4. **Key Management**: Use HSMs or secure key storage for encryption keys +5. **Network Security**: Encrypt all traffic between nodes and storage + +## Performance Tuning + +### NeoFS +- Increase `timeout` for large objects +- Use multiple endpoints for load balancing +- Configure appropriate `session_expiration_duration` + +### Local Storage +- Use SSDs for best performance +- Enable compression for storage efficiency +- Set appropriate `max_size` to prevent disk exhaustion + +### S3 +- Enable transfer acceleration where available +- Use regional endpoints for lower latency +- Configure appropriate retry policies + +## Monitoring + +Key metrics to monitor: + +| Metric | Description | +|--------|-------------| +| `storage_put_duration_seconds` | Time to store objects | +| `storage_get_duration_seconds` | Time to retrieve objects | +| `storage_errors_total` | Total storage errors by type | +| `storage_objects_total` | Total objects stored | +| `storage_bytes_total` | Total bytes stored | + +## Troubleshooting + +### Common Issues + +**ErrNotFound** +- Object was deleted or never stored +- Check object ID format + +**ErrConnectionFailed** +- Storage backend is unreachable +- Check network connectivity and firewall rules + +**ErrQuotaExceeded** +- Storage limits reached +- Increase quota or clean up old data + +**ErrTimeout** +- Operation took too long +- Increase timeout or check network latency diff --git a/pkg/storage/config.go b/pkg/storage/config.go new file mode 100644 index 0000000..301b52b --- /dev/null +++ b/pkg/storage/config.go @@ -0,0 +1,106 @@ +package storage + +import "time" + +// Config defines the storage configuration for a Tutus node. +type Config struct { + // Provider specifies the active storage backend + Provider string `yaml:"provider"` + + // NeoFS configuration (when Provider = "neofs") + NeoFS NeoFSConfig `yaml:"neofs"` + + // Local configuration (when Provider = "local") + Local LocalConfig `yaml:"local"` + + // S3 configuration (when Provider = "s3") + S3 S3Config `yaml:"s3"` +} + +// NeoFSConfig configures the NeoFS storage adapter. +type NeoFSConfig struct { + // Endpoints is a list of NeoFS node addresses + Endpoints []string `yaml:"endpoints"` + + // Container is the NeoFS container ID for block storage + Container string `yaml:"container"` + + // Timeout for NeoFS operations + Timeout time.Duration `yaml:"timeout"` + + // Key is the path to the wallet key file for authentication + Key string `yaml:"key"` + + // DialTimeout for initial connection + DialTimeout time.Duration `yaml:"dial_timeout"` + + // StreamTimeout for streaming operations + StreamTimeout time.Duration `yaml:"stream_timeout"` + + // HealthcheckTimeout for node health verification + HealthcheckTimeout time.Duration `yaml:"healthcheck_timeout"` + + // RebalanceInterval for endpoint rotation + RebalanceInterval time.Duration `yaml:"rebalance_interval"` + + // SessionExpirationDuration for session tokens + SessionExpirationDuration uint64 `yaml:"session_expiration_duration"` +} + +// LocalConfig configures local filesystem storage. +type LocalConfig struct { + // Path is the base directory for storage + Path string `yaml:"path"` + + // MaxSize is the maximum storage size in bytes (0 = unlimited) + MaxSize int64 `yaml:"max_size"` + + // Compression enables data compression + Compression bool `yaml:"compression"` +} + +// S3Config configures S3-compatible storage. +type S3Config struct { + // Endpoint is the S3 API endpoint (empty for AWS) + Endpoint string `yaml:"endpoint"` + + // Region is the AWS region or equivalent + Region string `yaml:"region"` + + // Bucket is the S3 bucket name + Bucket string `yaml:"bucket"` + + // AccessKey for authentication + AccessKey string `yaml:"access_key"` + + // SecretKey for authentication + SecretKey string `yaml:"secret_key"` + + // UsePathStyle forces path-style URLs (required for some providers) + UsePathStyle bool `yaml:"use_path_style"` + + // DisableSSL for non-TLS endpoints (not recommended for production) + DisableSSL bool `yaml:"disable_ssl"` +} + +// DefaultConfig returns a configuration with sensible defaults. +func DefaultConfig() Config { + return Config{ + Provider: "local", + NeoFS: NeoFSConfig{ + Timeout: 30 * time.Second, + DialTimeout: 5 * time.Second, + StreamTimeout: 60 * time.Second, + HealthcheckTimeout: 10 * time.Second, + RebalanceInterval: 60 * time.Second, + SessionExpirationDuration: 28800, // 8 hours in blocks + }, + Local: LocalConfig{ + Path: "./storage", + Compression: true, + }, + S3: S3Config{ + Region: "us-east-1", + }, + } +} diff --git a/pkg/storage/doc.go b/pkg/storage/doc.go new file mode 100644 index 0000000..16deb99 --- /dev/null +++ b/pkg/storage/doc.go @@ -0,0 +1,92 @@ +// Package storage provides a unified interface for sovereign data storage +// in Tutus blockchain deployments. +// +// # Overview +// +// Tutus is designed for government blockchain deployments where data sovereignty +// is critical. Different nations have different data residency requirements: +// +// - GDPR requires EU citizen data to remain in the EU +// - China's Cybersecurity Law requires data localization +// - Russia's Federal Law 242-FZ mandates domestic storage +// - Many nations have sector-specific requirements (healthcare, finance) +// +// The storage package provides a pluggable architecture that allows each +// government deployment to choose storage backends that comply with their +// legal requirements. +// +// # Supported Providers +// +// The following storage providers are available: +// +// - neofs: NeoFS decentralized storage (can be private or public) +// - local: Local filesystem (for single-node or NAS deployments) +// - s3: S3-compatible storage (AWS, Azure, GCP, sovereign clouds) +// +// Additional providers can be implemented by satisfying the Provider interface. +// +// # Usage +// +// Basic usage with the registry: +// +// registry := storage.NewRegistry() +// registry.Register(neofs.New(cfg.NeoFS)) +// registry.Register(local.New(cfg.Local)) +// +// provider, ok := registry.Get("neofs") +// if !ok { +// log.Fatal("provider not found") +// } +// +// id, err := provider.Put(ctx, data, storage.PutOptions{}) +// if err != nil { +// log.Fatal(err) +// } +// +// # Block Storage +// +// For blockchain-specific operations, use the BlockStorage interface: +// +// bs := neofs.NewBlockStorage(cfg) +// id, err := bs.PutBlock(ctx, blockIndex, blockData) +// block, err := bs.GetBlock(ctx, blockIndex) +// +// # State Storage +// +// For state snapshots, use the StateStorage interface: +// +// ss := local.NewStateStorage(cfg) +// id, err := ss.PutState(ctx, height, stateData) +// state, err := ss.GetState(ctx, height) +// +// # Configuration +// +// Storage is configured in the node configuration file: +// +// storage: +// provider: neofs +// neofs: +// endpoints: +// - "grpc://neofs.example.gov:8080" +// container: "7s23kG4..." +// timeout: 30s +// local: +// path: /var/lib/tutus/storage +// s3: +// endpoint: "s3.sovereign-cloud.gov" +// bucket: "tutus-blocks" +// region: "national-1" +// +// # Data Sovereignty +// +// When deploying Tutus for a government: +// +// 1. Assess data residency requirements for the jurisdiction +// 2. Choose storage providers that comply (sovereign cloud, on-premises, etc.) +// 3. Configure network policies to prevent cross-border data transfer +// 4. Enable encryption at rest and in transit +// 5. Implement access controls per government security standards +// +// For NeoFS in sovereign deployments, run a private NeoFS network within +// the nation's infrastructure rather than using the public network. +package storage diff --git a/pkg/storage/errors.go b/pkg/storage/errors.go new file mode 100644 index 0000000..c08ce54 --- /dev/null +++ b/pkg/storage/errors.go @@ -0,0 +1,33 @@ +package storage + +import "errors" + +var ( + // ErrNotFound is returned when an object doesn't exist. + ErrNotFound = errors.New("storage: object not found") + + // ErrAlreadyExists is returned when an object already exists + // and overwrite is not permitted. + ErrAlreadyExists = errors.New("storage: object already exists") + + // ErrAccessDenied is returned when the operation is not permitted. + ErrAccessDenied = errors.New("storage: access denied") + + // ErrInvalidID is returned when an object ID is malformed. + ErrInvalidID = errors.New("storage: invalid object ID") + + // ErrProviderNotFound is returned when a storage provider is not registered. + ErrProviderNotFound = errors.New("storage: provider not found") + + // ErrConnectionFailed is returned when the storage backend is unreachable. + ErrConnectionFailed = errors.New("storage: connection failed") + + // ErrQuotaExceeded is returned when storage limits are reached. + ErrQuotaExceeded = errors.New("storage: quota exceeded") + + // ErrChecksumMismatch is returned when data integrity verification fails. + ErrChecksumMismatch = errors.New("storage: checksum mismatch") + + // ErrTimeout is returned when an operation exceeds its deadline. + ErrTimeout = errors.New("storage: operation timeout") +) diff --git a/pkg/storage/local/adapter.go b/pkg/storage/local/adapter.go new file mode 100644 index 0000000..803c02f --- /dev/null +++ b/pkg/storage/local/adapter.go @@ -0,0 +1,332 @@ +// Package local provides a local filesystem storage adapter for Tutus. +// +// This adapter is suitable for: +// - Development and testing +// - Single-node deployments +// - NAS or SAN-backed storage +// - Air-gapped government networks +package local + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/tutus-one/tutus-chain/pkg/storage" +) + +const providerName = "local" + +// Adapter implements the storage.Provider interface for local filesystem. +type Adapter struct { + basePath string + maxSize int64 + mu sync.RWMutex +} + +// Config holds local adapter configuration. +type Config struct { + // Path is the base directory for storage + Path string + // MaxSize is the maximum storage size in bytes (0 = unlimited) + MaxSize int64 +} + +// New creates a new local filesystem storage adapter. +func New(cfg Config) (*Adapter, error) { + path := cfg.Path + if path == "" { + path = "./storage" + } + + // Ensure base directory exists + if err := os.MkdirAll(path, 0755); err != nil { + return nil, fmt.Errorf("local: create storage directory: %w", err) + } + + // Create subdirectories + for _, sub := range []string{"objects", "meta"} { + if err := os.MkdirAll(filepath.Join(path, sub), 0755); err != nil { + return nil, fmt.Errorf("local: create %s directory: %w", sub, err) + } + } + + return &Adapter{ + basePath: path, + maxSize: cfg.MaxSize, + }, nil +} + +// Name returns the provider identifier. +func (a *Adapter) Name() string { + return providerName +} + +// Put stores data on the local filesystem. +func (a *Adapter) Put(ctx context.Context, data io.Reader, opts storage.PutOptions) (storage.ObjectID, error) { + a.mu.Lock() + defer a.mu.Unlock() + + // Read all data + content, err := io.ReadAll(data) + if err != nil { + return storage.ObjectID{}, fmt.Errorf("local: read data: %w", err) + } + + // Generate content-addressed ID + hash := sha256.Sum256(content) + id := hex.EncodeToString(hash[:]) + + // Check size limits + if a.maxSize > 0 { + currentSize, _ := a.calculateSize() + if currentSize+int64(len(content)) > a.maxSize { + return storage.ObjectID{}, storage.ErrQuotaExceeded + } + } + + // Write object data + objPath := a.objectPath(id) + if err := os.MkdirAll(filepath.Dir(objPath), 0755); err != nil { + return storage.ObjectID{}, fmt.Errorf("local: create object dir: %w", err) + } + + if err := os.WriteFile(objPath, content, 0644); err != nil { + return storage.ObjectID{}, fmt.Errorf("local: write object: %w", err) + } + + // Write metadata + meta := objectMeta{ + Size: int64(len(content)), + ContentType: opts.ContentType, + Created: time.Now(), + Modified: time.Now(), + Checksum: id, + Attributes: opts.Attributes, + } + + metaPath := a.metaPath(id) + metaData, _ := json.Marshal(meta) + if err := os.WriteFile(metaPath, metaData, 0644); err != nil { + os.Remove(objPath) // Clean up on error + return storage.ObjectID{}, fmt.Errorf("local: write metadata: %w", err) + } + + return storage.ObjectID{ + Provider: providerName, + Container: "default", + ID: id, + }, nil +} + +// Get retrieves data from the local filesystem. +func (a *Adapter) Get(ctx context.Context, id storage.ObjectID) (io.ReadCloser, error) { + a.mu.RLock() + defer a.mu.RUnlock() + + objPath := a.objectPath(id.ID) + file, err := os.Open(objPath) + if err != nil { + if os.IsNotExist(err) { + return nil, storage.ErrNotFound + } + return nil, fmt.Errorf("local: open object: %w", err) + } + + return file, nil +} + +// Delete removes an object from the local filesystem. +func (a *Adapter) Delete(ctx context.Context, id storage.ObjectID) error { + a.mu.Lock() + defer a.mu.Unlock() + + objPath := a.objectPath(id.ID) + metaPath := a.metaPath(id.ID) + + if _, err := os.Stat(objPath); os.IsNotExist(err) { + return storage.ErrNotFound + } + + if err := os.Remove(objPath); err != nil { + return fmt.Errorf("local: remove object: %w", err) + } + + os.Remove(metaPath) // Best effort metadata removal + + return nil +} + +// Exists checks if an object exists on the local filesystem. +func (a *Adapter) Exists(ctx context.Context, id storage.ObjectID) (bool, error) { + a.mu.RLock() + defer a.mu.RUnlock() + + objPath := a.objectPath(id.ID) + _, err := os.Stat(objPath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, fmt.Errorf("local: stat object: %w", err) + } + return true, nil +} + +// List returns objects matching the given prefix. +func (a *Adapter) List(ctx context.Context, prefix string, opts storage.ListOptions) ([]storage.ObjectInfo, error) { + a.mu.RLock() + defer a.mu.RUnlock() + + objectsDir := filepath.Join(a.basePath, "objects") + var results []storage.ObjectInfo + + err := filepath.Walk(objectsDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return nil // Skip errors + } + if info.IsDir() { + return nil + } + + // Get relative path as ID + rel, _ := filepath.Rel(objectsDir, path) + id := strings.ReplaceAll(rel, string(filepath.Separator), "") + + if prefix != "" && !strings.HasPrefix(id, prefix) { + return nil + } + + objInfo := storage.ObjectInfo{ + ID: storage.ObjectID{ + Provider: providerName, + Container: "default", + ID: id, + }, + Size: info.Size(), + Modified: info.ModTime(), + } + + if opts.IncludeMetadata { + if meta, err := a.loadMeta(id); err == nil { + objInfo.ContentType = meta.ContentType + objInfo.Created = meta.Created + objInfo.Checksum = meta.Checksum + objInfo.Attributes = meta.Attributes + } + } + + results = append(results, objInfo) + + if opts.MaxResults > 0 && len(results) >= opts.MaxResults { + return filepath.SkipAll + } + return nil + }) + + if err != nil { + return nil, fmt.Errorf("local: list objects: %w", err) + } + + return results, nil +} + +// Head retrieves object metadata. +func (a *Adapter) Head(ctx context.Context, id storage.ObjectID) (storage.ObjectInfo, error) { + a.mu.RLock() + defer a.mu.RUnlock() + + objPath := a.objectPath(id.ID) + info, err := os.Stat(objPath) + if err != nil { + if os.IsNotExist(err) { + return storage.ObjectInfo{}, storage.ErrNotFound + } + return storage.ObjectInfo{}, fmt.Errorf("local: stat object: %w", err) + } + + result := storage.ObjectInfo{ + ID: id, + Size: info.Size(), + Modified: info.ModTime(), + } + + if meta, err := a.loadMeta(id.ID); err == nil { + result.ContentType = meta.ContentType + result.Created = meta.Created + result.Checksum = meta.Checksum + result.Attributes = meta.Attributes + } + + return result, nil +} + +// Close is a no-op for local storage. +func (a *Adapter) Close() error { + return nil +} + +// objectPath returns the filesystem path for an object. +func (a *Adapter) objectPath(id string) string { + // Use first 2 chars as subdirectory for better filesystem performance + if len(id) >= 2 { + return filepath.Join(a.basePath, "objects", id[:2], id) + } + return filepath.Join(a.basePath, "objects", id) +} + +// metaPath returns the filesystem path for object metadata. +func (a *Adapter) metaPath(id string) string { + if len(id) >= 2 { + return filepath.Join(a.basePath, "meta", id[:2], id+".json") + } + return filepath.Join(a.basePath, "meta", id+".json") +} + +// objectMeta holds object metadata. +type objectMeta struct { + Size int64 `json:"size"` + ContentType string `json:"content_type,omitempty"` + Created time.Time `json:"created"` + Modified time.Time `json:"modified"` + Checksum string `json:"checksum"` + Attributes map[string]string `json:"attributes,omitempty"` +} + +// loadMeta loads object metadata from disk. +func (a *Adapter) loadMeta(id string) (objectMeta, error) { + metaPath := a.metaPath(id) + data, err := os.ReadFile(metaPath) + if err != nil { + return objectMeta{}, err + } + var meta objectMeta + err = json.Unmarshal(data, &meta) + return meta, err +} + +// calculateSize returns the total size of stored objects. +func (a *Adapter) calculateSize() (int64, error) { + var total int64 + objectsDir := filepath.Join(a.basePath, "objects") + + err := filepath.Walk(objectsDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return nil + } + if !info.IsDir() { + total += info.Size() + } + return nil + }) + + return total, err +} diff --git a/pkg/storage/neofs/adapter.go b/pkg/storage/neofs/adapter.go new file mode 100644 index 0000000..4d82940 --- /dev/null +++ b/pkg/storage/neofs/adapter.go @@ -0,0 +1,155 @@ +// Package neofs provides a NeoFS storage adapter for Tutus. +// +// NeoFS is a decentralized storage network that can be deployed as either +// a public network or a private sovereign network for government use. +// +// For sovereign deployments, governments should run their own NeoFS +// infrastructure within national borders to ensure data residency compliance. +package neofs + +import ( + "bytes" + "context" + "fmt" + "io" + "time" + + "github.com/nspcc-dev/neofs-sdk-go/pool" + + "github.com/tutus-one/tutus-chain/pkg/storage" +) + +const providerName = "neofs" + +// Adapter implements the storage.Provider interface for NeoFS. +type Adapter struct { + pool *pool.Pool + container string + timeout time.Duration +} + +// Config holds NeoFS adapter configuration. +type Config struct { + // Pool is an initialized NeoFS connection pool + Pool *pool.Pool + // Container is the NeoFS container ID string + Container string + // Timeout for NeoFS operations + Timeout time.Duration +} + +// New creates a new NeoFS storage adapter. +func New(cfg Config) *Adapter { + timeout := cfg.Timeout + if timeout == 0 { + timeout = 30 * time.Second + } + return &Adapter{ + pool: cfg.Pool, + container: cfg.Container, + timeout: timeout, + } +} + +// Name returns the provider identifier. +func (a *Adapter) Name() string { + return providerName +} + +// Put stores data in NeoFS and returns the object ID. +func (a *Adapter) Put(ctx context.Context, data io.Reader, opts storage.PutOptions) (storage.ObjectID, error) { + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + + // Read all data + content, err := io.ReadAll(data) + if err != nil { + return storage.ObjectID{}, fmt.Errorf("neofs: read data: %w", err) + } + + // TODO: Implement NeoFS object upload using pool + // This requires proper NeoFS SDK integration with: + // - Container ID parsing + // - Object creation with attributes + // - Signing with user credentials + // - Upload via pool.ObjectPutInit() + + _ = content // Use content when implementing + + return storage.ObjectID{}, fmt.Errorf("neofs: Put not yet implemented - use existing neofs helpers") +} + +// Get retrieves an object from NeoFS. +func (a *Adapter) Get(ctx context.Context, id storage.ObjectID) (io.ReadCloser, error) { + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + + // TODO: Implement NeoFS object download + // This requires: + // - Object ID parsing + // - Download via pool.ObjectGetInit() + // - Payload reading + + _ = ctx // Use ctx when implementing + + return nil, fmt.Errorf("neofs: Get not yet implemented - use existing neofs helpers") +} + +// Delete removes an object from NeoFS. +func (a *Adapter) Delete(ctx context.Context, id storage.ObjectID) error { + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + + // TODO: Implement NeoFS object deletion + _ = ctx + + return fmt.Errorf("neofs: Delete not yet implemented") +} + +// Exists checks if an object exists in NeoFS. +func (a *Adapter) Exists(ctx context.Context, id storage.ObjectID) (bool, error) { + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + + // TODO: Implement via ObjectHead + _ = ctx + + return false, fmt.Errorf("neofs: Exists not yet implemented") +} + +// List returns objects matching the given prefix. +func (a *Adapter) List(ctx context.Context, prefix string, opts storage.ListOptions) ([]storage.ObjectInfo, error) { + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + + // TODO: Implement via ObjectSearch + _ = ctx + _ = prefix + + return nil, fmt.Errorf("neofs: List not yet implemented") +} + +// Head retrieves object metadata. +func (a *Adapter) Head(ctx context.Context, id storage.ObjectID) (storage.ObjectInfo, error) { + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + + // TODO: Implement via ObjectHead + _ = ctx + + return storage.ObjectInfo{}, fmt.Errorf("neofs: Head not yet implemented") +} + +// Close releases NeoFS pool resources. +func (a *Adapter) Close() error { + if a.pool != nil { + a.pool.Close() + } + return nil +} + +// WrapExistingReader wraps data from the existing NeoFS helpers into the storage interface. +// This is a bridge to use existing pkg/services/helpers/neofs code with the new interface. +func WrapExistingReader(data []byte) io.ReadCloser { + return io.NopCloser(bytes.NewReader(data)) +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go new file mode 100644 index 0000000..0b4812e --- /dev/null +++ b/pkg/storage/storage.go @@ -0,0 +1,168 @@ +package storage + +import ( + "context" + "io" + "time" +) + +// Provider defines the interface for sovereign storage backends. +// Implementations must ensure data sovereignty compliance for government deployments. +// +// Tutus supports multiple storage backends to accommodate national data +// residency requirements: +// - NeoFS: Decentralized storage (can be deployed as private network) +// - Local: Filesystem storage for single-node or NAS deployments +// - S3-compatible: AWS, Azure, GCP, or sovereign cloud providers +// - Custom: Nation-specific storage infrastructure +type Provider interface { + // Name returns the provider identifier (e.g., "neofs", "local", "s3") + Name() string + + // Put stores data and returns a unique identifier for retrieval. + // The identifier format is provider-specific. + Put(ctx context.Context, data io.Reader, opts PutOptions) (ObjectID, error) + + // Get retrieves data by its identifier. + // Returns ErrNotFound if the object doesn't exist. + Get(ctx context.Context, id ObjectID) (io.ReadCloser, error) + + // Delete removes an object from storage. + // Returns ErrNotFound if the object doesn't exist. + Delete(ctx context.Context, id ObjectID) error + + // Exists checks if an object exists in storage. + Exists(ctx context.Context, id ObjectID) (bool, error) + + // List returns object IDs matching the given prefix. + // Use an empty prefix to list all objects in the container. + List(ctx context.Context, prefix string, opts ListOptions) ([]ObjectInfo, error) + + // Head retrieves object metadata without downloading content. + Head(ctx context.Context, id ObjectID) (ObjectInfo, error) + + // Close releases any resources held by the provider. + Close() error +} + +// BlockStorage extends Provider with blockchain-specific operations. +type BlockStorage interface { + Provider + + // PutBlock stores a block with its index for efficient retrieval. + PutBlock(ctx context.Context, index uint32, data []byte) (ObjectID, error) + + // GetBlock retrieves a block by its index. + GetBlock(ctx context.Context, index uint32) ([]byte, error) + + // GetBlockRange retrieves multiple blocks efficiently. + GetBlockRange(ctx context.Context, start, end uint32) ([][]byte, error) + + // GetLatestBlockIndex returns the highest block index in storage. + GetLatestBlockIndex(ctx context.Context) (uint32, error) +} + +// StateStorage extends Provider with state snapshot operations. +type StateStorage interface { + Provider + + // PutState stores a state snapshot at a specific height. + PutState(ctx context.Context, height uint32, data io.Reader) (ObjectID, error) + + // GetState retrieves the state snapshot for a given height. + GetState(ctx context.Context, height uint32) (io.ReadCloser, error) + + // GetLatestState returns the most recent state snapshot. + GetLatestState(ctx context.Context) (height uint32, data io.ReadCloser, err error) +} + +// ObjectID uniquely identifies an object within a storage provider. +type ObjectID struct { + // Provider is the storage backend identifier + Provider string + // Container is the bucket/container/namespace + Container string + // ID is the unique object identifier within the container + ID string +} + +// String returns a URI-style representation of the object ID. +func (o ObjectID) String() string { + return o.Provider + "://" + o.Container + "/" + o.ID +} + +// ObjectInfo contains metadata about a stored object. +type ObjectInfo struct { + ID ObjectID + Size int64 + ContentType string + Created time.Time + Modified time.Time + Checksum string + Attributes map[string]string +} + +// PutOptions configures object storage behavior. +type PutOptions struct { + // ContentType specifies the MIME type of the content + ContentType string + // Attributes are custom key-value metadata + Attributes map[string]string + // Expiration sets automatic deletion time (zero means never) + Expiration time.Time + // Replicas specifies the minimum number of copies (provider-dependent) + Replicas int +} + +// ListOptions configures listing behavior. +type ListOptions struct { + // MaxResults limits the number of results (0 = no limit) + MaxResults int + // Cursor for pagination (provider-specific) + Cursor string + // IncludeMetadata fetches full ObjectInfo vs just IDs + IncludeMetadata bool +} + +// Registry manages available storage providers. +type Registry struct { + providers map[string]Provider +} + +// NewRegistry creates a new provider registry. +func NewRegistry() *Registry { + return &Registry{ + providers: make(map[string]Provider), + } +} + +// Register adds a storage provider to the registry. +func (r *Registry) Register(p Provider) { + r.providers[p.Name()] = p +} + +// Get returns a provider by name. +func (r *Registry) Get(name string) (Provider, bool) { + p, ok := r.providers[name] + return p, ok +} + +// List returns all registered provider names. +func (r *Registry) List() []string { + names := make([]string, 0, len(r.providers)) + for name := range r.providers { + names = append(names, name) + } + return names +} + +// Close closes all registered providers. +func (r *Registry) Close() error { + var lastErr error + for _, p := range r.providers { + if err := p.Close(); err != nil { + lastErr = err + } + } + return lastErr +}