tutus-chain/pkg/storage/local/adapter.go

599 lines
15 KiB
Go

// Package local provides a local filesystem storage adapter for Tutus.
//
// This adapter is suitable for:
// - Development and testing
// - Single-node deployments
// - NAS or SAN-backed storage
// - Air-gapped government networks
package local
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/tutus-one/tutus-chain/pkg/storage"
)
// providerName is the identifier returned by Adapter.Name and written into the
// Provider field of every storage.ObjectID this adapter produces.
const providerName = "local"
// Adapter implements the storage.Provider interface for local filesystem.
//
// Besides generic objects it also stores blocks (by index) and state
// snapshots (by height), each in its own subdirectory of basePath.
type Adapter struct {
// basePath is the root directory that holds the "objects", "meta",
// "blocks" and "states" subdirectories.
basePath string
// maxSize is the storage quota in bytes. The quota is only enforced when
// the value is greater than zero.
maxSize int64
// mu guards filesystem access: methods that write or delete take the
// exclusive lock, read-only methods take the shared lock.
mu sync.RWMutex
}
// Config holds local adapter configuration.
type Config struct {
// Path is the base directory for storage. When empty, New falls back to
// "./storage".
Path string
// MaxSize is the maximum storage size in bytes (0 = unlimited)
MaxSize int64
}
// New builds a local filesystem storage adapter from cfg.
//
// An empty cfg.Path falls back to "./storage". The base directory and its
// "objects", "meta", "blocks" and "states" subdirectories are created (mode
// 0755) if they do not exist yet. Any directory creation failure is returned
// wrapped with a "local:" prefix.
func New(cfg Config) (*Adapter, error) {
	root := cfg.Path
	if root == "" {
		root = "./storage"
	}

	// The root is created on its own first so that a failure there is
	// reported separately from a failure on one of the subdirectories.
	if err := os.MkdirAll(root, 0755); err != nil {
		return nil, fmt.Errorf("local: create storage directory: %w", err)
	}

	layout := [...]string{"objects", "meta", "blocks", "states"}
	for i := range layout {
		name := layout[i]
		if err := os.MkdirAll(filepath.Join(root, name), 0755); err != nil {
			return nil, fmt.Errorf("local: create %s directory: %w", name, err)
		}
	}

	adapter := &Adapter{basePath: root, maxSize: cfg.MaxSize}
	return adapter, nil
}
// Name returns the provider identifier, which is the package-level
// providerName constant.
func (a *Adapter) Name() string {
return providerName
}
// Put stores data on the local filesystem under a content-addressed ID.
//
// The reader is drained completely into memory; the hex-encoded SHA-256 of
// the bytes becomes the object ID. The payload is written below "objects/"
// and a JSON metadata sidecar (size, content type, timestamps, checksum,
// attributes) below "meta/".
//
// Returns storage.ErrQuotaExceeded when a positive maxSize is configured and
// the current usage plus the payload would exceed it. All other failures are
// returned wrapped with a "local:" prefix. ctx is currently not consulted.
func (a *Adapter) Put(ctx context.Context, data io.Reader, opts storage.PutOptions) (storage.ObjectID, error) {
	// Drain the reader before taking the lock: a slow or stalled reader must
	// not block every other operation on the adapter.
	content, err := io.ReadAll(data)
	if err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: read data: %w", err)
	}

	// Generate content-addressed ID.
	hash := sha256.Sum256(content)
	id := hex.EncodeToString(hash[:])

	// Encode the metadata up front so an encoding failure cannot leave an
	// object on disk without its sidecar. A single timestamp keeps Created
	// and Modified identical for a fresh object.
	now := time.Now()
	metaData, err := json.Marshal(objectMeta{
		Size:        int64(len(content)),
		ContentType: opts.ContentType,
		Created:     now,
		Modified:    now,
		Checksum:    id,
		Attributes:  opts.Attributes,
	})
	if err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: encode metadata: %w", err)
	}

	a.mu.Lock()
	defer a.mu.Unlock()

	// Check size limits.
	if a.maxSize > 0 {
		currentSize, err := a.calculateSize()
		if err != nil {
			return storage.ObjectID{}, fmt.Errorf("local: calculate storage size: %w", err)
		}
		if currentSize+int64(len(content)) > a.maxSize {
			return storage.ObjectID{}, storage.ErrQuotaExceeded
		}
	}

	objPath := a.objectPath(id)

	// Content addressing means an identical payload may already be stored.
	// Remember that, so a later metadata failure does not delete an object
	// that was valid before this call.
	_, statErr := os.Stat(objPath)
	preexisting := statErr == nil
	rollback := func() {
		if !preexisting {
			os.Remove(objPath) // Best-effort cleanup of the object we just wrote.
		}
	}

	// Write object data.
	if err := os.MkdirAll(filepath.Dir(objPath), 0755); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: create object dir: %w", err)
	}
	if err := os.WriteFile(objPath, content, 0644); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: write object: %w", err)
	}

	// Write metadata.
	metaPath := a.metaPath(id)
	if err := os.MkdirAll(filepath.Dir(metaPath), 0755); err != nil {
		rollback()
		return storage.ObjectID{}, fmt.Errorf("local: create meta dir: %w", err)
	}
	if err := os.WriteFile(metaPath, metaData, 0644); err != nil {
		rollback()
		return storage.ObjectID{}, fmt.Errorf("local: write metadata: %w", err)
	}

	return storage.ObjectID{
		Provider:  providerName,
		Container: "default",
		ID:        id,
	}, nil
}
// Get opens the object identified by id.ID for reading.
//
// The caller owns the returned ReadCloser and must close it. The shared lock
// is only held while the file is being opened, not while it is read.
// A missing object yields storage.ErrNotFound; any other open failure is
// wrapped with a "local:" prefix.
func (a *Adapter) Get(ctx context.Context, id storage.ObjectID) (io.ReadCloser, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	f, openErr := os.Open(a.objectPath(id.ID))
	switch {
	case openErr == nil:
		return f, nil
	case os.IsNotExist(openErr):
		return nil, storage.ErrNotFound
	default:
		return nil, fmt.Errorf("local: open object: %w", openErr)
	}
}
// Delete removes the object identified by id.ID together with its metadata
// sidecar.
//
// Returns storage.ErrNotFound when the object file does not exist. A failure
// to remove the object file is wrapped with a "local:" prefix; a failure to
// remove the metadata file is deliberately ignored.
func (a *Adapter) Delete(ctx context.Context, id storage.ObjectID) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	target := a.objectPath(id.ID)
	sidecar := a.metaPath(id.ID)

	if _, statErr := os.Stat(target); os.IsNotExist(statErr) {
		return storage.ErrNotFound
	}

	if rmErr := os.Remove(target); rmErr != nil {
		return fmt.Errorf("local: remove object: %w", rmErr)
	}

	// Metadata removal is best effort: the object itself is already gone.
	_ = os.Remove(sidecar)
	return nil
}
// Exists reports whether a file is present at the object path for id.ID.
//
// A missing file is reported as (false, nil). Any other stat failure is
// returned wrapped with a "local:" prefix.
func (a *Adapter) Exists(ctx context.Context, id storage.ObjectID) (bool, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	_, statErr := os.Stat(a.objectPath(id.ID))
	switch {
	case statErr == nil:
		return true, nil
	case os.IsNotExist(statErr):
		return false, nil
	default:
		return false, fmt.Errorf("local: stat object: %w", statErr)
	}
}
// List returns the stored objects whose ID starts with prefix.
//
// The "objects" directory is walked recursively; entries that cannot be read
// are skipped. Size and Modified come from the object file itself. When
// opts.IncludeMetadata is set, content type, creation time, checksum and
// attributes are filled in from the metadata sidecar if it can be loaded.
// A positive opts.MaxResults stops the walk once that many objects have been
// collected. An empty prefix matches every object.
func (a *Adapter) List(ctx context.Context, prefix string, opts storage.ListOptions) ([]storage.ObjectInfo, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	objectsDir := filepath.Join(a.basePath, "objects")
	var results []storage.ObjectInfo

	err := filepath.Walk(objectsDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return nil // Skip unreadable entries; info may be nil here.
		}
		if info.IsDir() {
			return nil
		}

		// objectPath stores an object as objects/<id[:2]>/<id>, so the file
		// name alone is the complete ID. Joining the relative path segments
		// instead would duplicate the two-character shard prefix and yield
		// an ID that neither Get nor loadMeta can resolve.
		id := info.Name()
		if prefix != "" && !strings.HasPrefix(id, prefix) {
			return nil
		}

		objInfo := storage.ObjectInfo{
			ID: storage.ObjectID{
				Provider:  providerName,
				Container: "default",
				ID:        id,
			},
			Size:     info.Size(),
			Modified: info.ModTime(),
		}

		if opts.IncludeMetadata {
			// Missing or unreadable metadata is not an error for listing.
			if meta, err := a.loadMeta(id); err == nil {
				objInfo.ContentType = meta.ContentType
				objInfo.Created = meta.Created
				objInfo.Checksum = meta.Checksum
				objInfo.Attributes = meta.Attributes
			}
		}

		results = append(results, objInfo)
		if opts.MaxResults > 0 && len(results) >= opts.MaxResults {
			return filepath.SkipAll
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("local: list objects: %w", err)
	}
	return results, nil
}
// Head returns information about the object identified by id without
// opening its contents.
//
// Size and Modified are taken from the object file. Content type, creation
// time, checksum and attributes are added when the metadata sidecar can be
// loaded; a missing or unreadable sidecar is not an error. Returns
// storage.ErrNotFound when the object file does not exist.
func (a *Adapter) Head(ctx context.Context, id storage.ObjectID) (storage.ObjectInfo, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	fi, statErr := os.Stat(a.objectPath(id.ID))
	if os.IsNotExist(statErr) {
		return storage.ObjectInfo{}, storage.ErrNotFound
	}
	if statErr != nil {
		return storage.ObjectInfo{}, fmt.Errorf("local: stat object: %w", statErr)
	}

	out := storage.ObjectInfo{
		ID:       id,
		Size:     fi.Size(),
		Modified: fi.ModTime(),
	}

	meta, metaErr := a.loadMeta(id.ID)
	if metaErr != nil {
		// No usable sidecar: report what the filesystem alone knows.
		return out, nil
	}
	out.ContentType = meta.ContentType
	out.Created = meta.Created
	out.Checksum = meta.Checksum
	out.Attributes = meta.Attributes
	return out, nil
}
// Close is a no-op for local storage: it always returns nil and does not
// touch any adapter state.
func (a *Adapter) Close() error {
return nil
}
// =============================================================================
// BlockStorage Interface Implementation
// =============================================================================
// PutBlock writes data as the block file for index, replacing any file
// already stored for that index.
//
// The returned ObjectID uses the "blocks" container and the hex-encoded
// SHA-256 of data as its ID; the file on disk is named after index, not the
// hash. Returns storage.ErrQuotaExceeded when a positive maxSize is
// configured and current usage plus len(data) would exceed it.
func (a *Adapter) PutBlock(ctx context.Context, index uint32, data []byte) (storage.ObjectID, error) {
	a.mu.Lock()
	defer a.mu.Unlock()

	var none storage.ObjectID

	// Quota enforcement only applies when a limit is configured.
	if a.maxSize > 0 {
		used, _ := a.calculateSize()
		if used+int64(len(data)) > a.maxSize {
			return none, storage.ErrQuotaExceeded
		}
	}

	target := a.blockPath(index)
	if mkErr := os.MkdirAll(filepath.Dir(target), 0755); mkErr != nil {
		return none, fmt.Errorf("local: create block dir: %w", mkErr)
	}
	if wrErr := os.WriteFile(target, data, 0644); wrErr != nil {
		return none, fmt.Errorf("local: write block: %w", wrErr)
	}

	// The ID reported to the caller is derived from the block contents.
	digest := sha256.Sum256(data)
	return storage.ObjectID{
		Provider:  providerName,
		Container: "blocks",
		ID:        hex.EncodeToString(digest[:]),
	}, nil
}
// GetBlock reads the complete block file stored for index.
//
// Returns storage.ErrNotFound when no file exists for that index; any other
// read failure is wrapped with a "local:" prefix.
func (a *Adapter) GetBlock(ctx context.Context, index uint32) ([]byte, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	raw, readErr := os.ReadFile(a.blockPath(index))
	switch {
	case readErr == nil:
		return raw, nil
	case os.IsNotExist(readErr):
		return nil, storage.ErrNotFound
	default:
		return nil, fmt.Errorf("local: read block: %w", readErr)
	}
}
// GetBlockRange reads the blocks with indices start through end, inclusive,
// and returns them in ascending index order.
//
// Returns an error when start > end. Returns storage.ErrNotFound as soon as
// any block in the range is missing; any other read failure is wrapped with
// a "local:" prefix and the failing index. No partial result is returned.
func (a *Adapter) GetBlockRange(ctx context.Context, start, end uint32) ([][]byte, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	if start > end {
		return nil, fmt.Errorf("local: invalid block range: start %d > end %d", start, end)
	}

	// Size the slice from the range, but cap the hint: the range is caller
	// supplied and may span billions of indices that do not exist on disk.
	// The count is computed in 64 bits because end-start+1 wraps to 0 in
	// uint32 for the full range.
	const maxPrealloc = 1024
	capHint := uint64(end-start) + 1
	if capHint > maxPrealloc {
		capHint = maxPrealloc
	}
	blocks := make([][]byte, 0, int(capHint))

	// The loop exits via the explicit i == end check rather than an
	// "i <= end" condition, which can never become false when end is the
	// maximum uint32 (i++ would wrap to 0).
	for i := start; ; i++ {
		data, err := os.ReadFile(a.blockPath(i))
		if err != nil {
			if os.IsNotExist(err) {
				return nil, storage.ErrNotFound
			}
			return nil, fmt.Errorf("local: read block %d: %w", i, err)
		}
		blocks = append(blocks, data)
		if i == end {
			break
		}
	}
	return blocks, nil
}
// GetLatestBlockIndex scans the "blocks" directory tree and returns the
// largest index found among files named "<index>.block".
//
// Entries that cannot be read, directories, and files whose names do not
// parse as "<index>.block" are ignored. Returns storage.ErrNotFound when no
// block file is found.
func (a *Adapter) GetLatestBlockIndex(ctx context.Context) (uint32, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	var (
		highest uint32
		seen    bool
	)

	visit := func(_ string, fi os.FileInfo, walkErr error) error {
		// walkErr is tested first: fi may be nil when it is set.
		if walkErr != nil || fi.IsDir() {
			return nil
		}

		fileName := fi.Name()
		if !strings.HasSuffix(fileName, ".block") {
			return nil
		}

		var idx uint32
		if _, scanErr := fmt.Sscanf(fileName, "%d.block", &idx); scanErr != nil {
			return nil
		}

		// seen distinguishes "block 0 is stored" from "nothing stored".
		if !seen || idx > highest {
			highest, seen = idx, true
		}
		return nil
	}

	if err := filepath.Walk(filepath.Join(a.basePath, "blocks"), visit); err != nil {
		return 0, fmt.Errorf("local: scan blocks: %w", err)
	}
	if !seen {
		return 0, storage.ErrNotFound
	}
	return highest, nil
}
// =============================================================================
// StateStorage Interface Implementation
// =============================================================================
// PutState stores a state snapshot for the given height, replacing any
// snapshot already stored for that height.
//
// The reader is drained completely into memory before anything is written.
// The returned ObjectID uses the "states" container and the hex-encoded
// SHA-256 of the snapshot as its ID; the file on disk is named after height.
// Returns storage.ErrQuotaExceeded when a positive maxSize is configured and
// current usage plus the snapshot size would exceed it. ctx is currently not
// consulted.
func (a *Adapter) PutState(ctx context.Context, height uint32, data io.Reader) (storage.ObjectID, error) {
	// Drain the reader before taking the lock: a slow or stalled reader must
	// not block every other operation on the adapter.
	content, err := io.ReadAll(data)
	if err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: read state data: %w", err)
	}

	// Hashing needs no adapter state either, so it also happens unlocked.
	hash := sha256.Sum256(content)
	id := hex.EncodeToString(hash[:])

	a.mu.Lock()
	defer a.mu.Unlock()

	// Check size limits.
	if a.maxSize > 0 {
		currentSize, err := a.calculateSize()
		if err != nil {
			return storage.ObjectID{}, fmt.Errorf("local: calculate storage size: %w", err)
		}
		if currentSize+int64(len(content)) > a.maxSize {
			return storage.ObjectID{}, storage.ErrQuotaExceeded
		}
	}

	// Write state file.
	statePath := a.statePath(height)
	if err := os.MkdirAll(filepath.Dir(statePath), 0755); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: create state dir: %w", err)
	}
	if err := os.WriteFile(statePath, content, 0644); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: write state: %w", err)
	}

	return storage.ObjectID{
		Provider:  providerName,
		Container: "states",
		ID:        id,
	}, nil
}
// GetState opens the state snapshot stored for height.
//
// The caller owns the returned ReadCloser and must close it. Returns
// storage.ErrNotFound when no snapshot file exists for that height; any
// other open failure is wrapped with a "local:" prefix.
func (a *Adapter) GetState(ctx context.Context, height uint32) (io.ReadCloser, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	f, openErr := os.Open(a.statePath(height))
	switch {
	case openErr == nil:
		return f, nil
	case os.IsNotExist(openErr):
		return nil, storage.ErrNotFound
	default:
		return nil, fmt.Errorf("local: open state: %w", openErr)
	}
}
// GetLatestState scans the "states" directory tree for files named
// "<height>.state" and opens the one with the greatest height.
//
// It returns that height together with an open reader the caller must close.
// Entries that cannot be read, directories, and files whose names do not
// parse are ignored. Returns storage.ErrNotFound when no snapshot file is
// found.
func (a *Adapter) GetLatestState(ctx context.Context) (uint32, io.ReadCloser, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	var (
		highest uint32
		seen    bool
	)

	visit := func(_ string, fi os.FileInfo, walkErr error) error {
		// walkErr is tested first: fi may be nil when it is set.
		if walkErr != nil || fi.IsDir() {
			return nil
		}

		fileName := fi.Name()
		if !strings.HasSuffix(fileName, ".state") {
			return nil
		}

		var h uint32
		if _, scanErr := fmt.Sscanf(fileName, "%d.state", &h); scanErr != nil {
			return nil
		}

		// seen distinguishes "height 0 is stored" from "nothing stored".
		if !seen || h > highest {
			highest, seen = h, true
		}
		return nil
	}

	if err := filepath.Walk(filepath.Join(a.basePath, "states"), visit); err != nil {
		return 0, nil, fmt.Errorf("local: scan states: %w", err)
	}
	if !seen {
		return 0, nil, storage.ErrNotFound
	}

	// Open the snapshot that won the scan.
	f, openErr := os.Open(a.statePath(highest))
	if openErr != nil {
		return 0, nil, fmt.Errorf("local: open latest state: %w", openErr)
	}
	return highest, f, nil
}
// objectPath maps an object ID to its file below "<basePath>/objects".
//
// IDs of two or more characters are sharded into a subdirectory named after
// their first two characters, which keeps individual directories small.
// Shorter IDs are placed directly in the "objects" directory.
func (a *Adapter) objectPath(id string) string {
	segments := []string{a.basePath, "objects"}
	if len(id) >= 2 {
		segments = append(segments, id[:2])
	}
	segments = append(segments, id)
	return filepath.Join(segments...)
}
// metaPath maps an object ID to its JSON metadata file below
// "<basePath>/meta".
//
// The layout mirrors objectPath: IDs of two or more characters live in a
// subdirectory named after their first two characters, shorter IDs directly
// in "meta". The file name is the ID with a ".json" suffix.
func (a *Adapter) metaPath(id string) string {
	fileName := id + ".json"
	if len(id) < 2 {
		return filepath.Join(a.basePath, "meta", fileName)
	}
	return filepath.Join(a.basePath, "meta", id[:2], fileName)
}
// blockPath maps a block index to its file below "<basePath>/blocks".
//
// Blocks are grouped 10000 per subdirectory: the directory name is
// index/10000, zero-padded to at least four digits, and the file name is
// "<index>.block". For example, block 12345 -> blocks/0001/12345.block.
func (a *Adapter) blockPath(index uint32) string {
	const blocksPerDir = 10000
	dirName := fmt.Sprintf("%04d", index/blocksPerDir)
	fileName := fmt.Sprintf("%d.block", index)
	return filepath.Join(a.basePath, "blocks", dirName, fileName)
}
// statePath maps a snapshot height to its file below "<basePath>/states".
//
// Snapshots are grouped 10000 heights per subdirectory: the directory name
// is height/10000, zero-padded to at least four digits, and the file name is
// "<height>.state". For example, state 12345 -> states/0001/12345.state.
func (a *Adapter) statePath(height uint32) string {
	const heightsPerDir = 10000
	dirName := fmt.Sprintf("%04d", height/heightsPerDir)
	fileName := fmt.Sprintf("%d.state", height)
	return filepath.Join(a.basePath, "states", dirName, fileName)
}
// objectMeta holds object metadata. It is the JSON document that Put writes
// to the metadata sidecar file and that loadMeta reads back.
type objectMeta struct {
// Size is the payload length in bytes.
Size int64 `json:"size"`
// ContentType is copied from the put options; omitted from JSON when empty.
ContentType string `json:"content_type,omitempty"`
// Created and Modified are both set to the current time when Put writes
// the object.
Created time.Time `json:"created"`
Modified time.Time `json:"modified"`
// Checksum is the hex-encoded SHA-256 of the payload, identical to the
// object ID.
Checksum string `json:"checksum"`
// Attributes is copied from the put options; omitted from JSON when empty.
Attributes map[string]string `json:"attributes,omitempty"`
}
// loadMeta reads and decodes the metadata sidecar for the given object ID.
//
// A read failure is returned unwrapped together with a zero objectMeta. A
// decode failure is returned together with whatever fields were decoded
// before the error occurred. It does not take the adapter lock itself.
func (a *Adapter) loadMeta(id string) (objectMeta, error) {
	raw, readErr := os.ReadFile(a.metaPath(id))
	if readErr != nil {
		return objectMeta{}, readErr
	}

	var decoded objectMeta
	decodeErr := json.Unmarshal(raw, &decoded)
	return decoded, decodeErr
}
// calculateSize sums the sizes of all regular entries found below the
// "objects", "blocks" and "states" directories.
//
// The "meta" directory is not included. Entries that cannot be read during
// the walk are skipped, so the total may undercount. It does not take the
// adapter lock itself.
func (a *Adapter) calculateSize() (int64, error) {
	var sum int64

	// addFile accumulates into sum and never aborts the walk.
	addFile := func(_ string, fi os.FileInfo, walkErr error) error {
		if walkErr == nil && !fi.IsDir() {
			sum += fi.Size()
		}
		return nil
	}

	for _, name := range []string{"objects", "blocks", "states"} {
		if err := filepath.Walk(filepath.Join(a.basePath, name), addFile); err != nil {
			return sum, err
		}
	}
	return sum, nil
}