tutus-chain/pkg/storage/local/adapter.go

333 lines
8.3 KiB
Go

// Package local provides a local filesystem storage adapter for Tutus.
//
// This adapter is suitable for:
// - Development and testing
// - Single-node deployments
// - NAS or SAN-backed storage
// - Air-gapped government networks
package local
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/tutus-one/tutus-chain/pkg/storage"
)
// providerName is the identifier reported by Adapter.Name and stamped into
// the Provider field of every storage.ObjectID this adapter produces.
const providerName = "local"
// Adapter implements the storage.Provider interface for local filesystem.
//
// Object payloads are stored under <basePath>/objects and their JSON
// metadata under <basePath>/meta; see objectPath and metaPath for the exact
// layout.
type Adapter struct {
// basePath is the root directory that contains the "objects" and "meta"
// subdirectories.
basePath string
// maxSize is the quota in bytes enforced by Put; values <= 0 disable the
// check.
maxSize int64
// mu serializes filesystem access: Put and Delete take the write lock,
// while Get, Exists, List and Head take the read lock.
mu sync.RWMutex
}
// Config holds local adapter configuration.
//
// The zero value is usable: New substitutes "./storage" for an empty Path
// and treats a zero MaxSize as "no quota".
type Config struct {
// Path is the base directory for storage. When empty, New falls back to
// "./storage".
Path string
// MaxSize is the maximum storage size in bytes (0 = unlimited)
MaxSize int64
}
// New builds an Adapter rooted at cfg.Path, falling back to "./storage" when
// no path is configured.
//
// The root directory and its "objects" and "meta" subdirectories are created
// (mode 0755) if they do not already exist. Any failure to create a directory
// is returned wrapped, and no Adapter is returned in that case.
func New(cfg Config) (*Adapter, error) {
	root := cfg.Path
	if root == "" {
		root = "./storage"
	}

	// Create the root first so a failure is reported against the storage
	// directory itself rather than against one of its children.
	if err := os.MkdirAll(root, 0755); err != nil {
		return nil, fmt.Errorf("local: create storage directory: %w", err)
	}

	for _, name := range [...]string{"objects", "meta"} {
		dir := filepath.Join(root, name)
		if err := os.MkdirAll(dir, 0755); err != nil {
			return nil, fmt.Errorf("local: create %s directory: %w", name, err)
		}
	}

	adapter := &Adapter{
		basePath: root,
		maxSize:  cfg.MaxSize,
	}
	return adapter, nil
}
// Name reports the provider identifier for this adapter, which is the
// package-level providerName constant.
func (a *Adapter) Name() string { return providerName }
// Put stores data on the local filesystem and returns its object ID.
//
// The whole payload is read into memory and addressed by the hex-encoded
// SHA-256 of its content, so storing identical bytes twice yields the same ID
// and overwrites the same files. The returned ObjectID always uses the
// "default" container. ctx is currently not consulted.
//
// When the adapter has a positive maxSize, the combined size of all stored
// objects plus the new payload must not exceed it; otherwise
// storage.ErrQuotaExceeded is returned and nothing is written.
//
// The payload is written to objectPath(id) and a JSON-encoded objectMeta to
// metaPath(id). If the metadata cannot be written, the object file is removed
// again so the two never go out of sync because of a failed Put.
func (a *Adapter) Put(ctx context.Context, data io.Reader, opts storage.PutOptions) (storage.ObjectID, error) {
	// Drain the reader before taking the lock so that a slow producer cannot
	// stall every other operation on the adapter.
	content, err := io.ReadAll(data)
	if err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: read data: %w", err)
	}
	size := int64(len(content))

	// Generate the content-addressed ID.
	hash := sha256.Sum256(content)
	id := hex.EncodeToString(hash[:])

	// Encode metadata up front: a failure here then leaves nothing on disk to
	// clean up. A single timestamp keeps Created and Modified identical.
	now := time.Now()
	metaData, err := json.Marshal(objectMeta{
		Size:        size,
		ContentType: opts.ContentType,
		Created:     now,
		Modified:    now,
		Checksum:    id,
		Attributes:  opts.Attributes,
	})
	if err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: encode metadata: %w", err)
	}

	a.mu.Lock()
	defer a.mu.Unlock()

	// Check size limits.
	if a.maxSize > 0 {
		currentSize, err := a.calculateSize()
		if err != nil {
			return storage.ObjectID{}, fmt.Errorf("local: calculate size: %w", err)
		}
		if currentSize+size > a.maxSize {
			return storage.ObjectID{}, storage.ErrQuotaExceeded
		}
	}

	// Both paths are sharded by the first two characters of the ID, so the
	// shard directory has to exist for the object AND for the metadata.
	objPath := a.objectPath(id)
	metaPath := a.metaPath(id)
	if err := os.MkdirAll(filepath.Dir(objPath), 0755); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: create object dir: %w", err)
	}
	if err := os.MkdirAll(filepath.Dir(metaPath), 0755); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: create metadata dir: %w", err)
	}

	// Write object data.
	if err := os.WriteFile(objPath, content, 0644); err != nil {
		return storage.ObjectID{}, fmt.Errorf("local: write object: %w", err)
	}

	// Write metadata; roll back the object on failure (best effort).
	if err := os.WriteFile(metaPath, metaData, 0644); err != nil {
		_ = os.Remove(objPath)
		return storage.ObjectID{}, fmt.Errorf("local: write metadata: %w", err)
	}

	return storage.ObjectID{
		Provider:  providerName,
		Container: "default",
		ID:        id,
	}, nil
}
// Get opens the stored object identified by id.ID for reading.
//
// The caller owns the returned ReadCloser and must close it. A missing object
// yields storage.ErrNotFound; any other open failure is returned wrapped. The
// read lock is held only while the file is being opened, not while the caller
// reads from it.
func (a *Adapter) Get(ctx context.Context, id storage.ObjectID) (io.ReadCloser, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	f, err := os.Open(a.objectPath(id.ID))
	if err == nil {
		return f, nil
	}
	if os.IsNotExist(err) {
		return nil, storage.ErrNotFound
	}
	return nil, fmt.Errorf("local: open object: %w", err)
}
// Delete removes an object and its metadata from the local filesystem.
//
// It returns storage.ErrNotFound when the object file does not exist and a
// wrapped error when the file exists but cannot be removed. Removal of the
// metadata file is best effort: its failure is deliberately ignored, because
// the object itself is already gone at that point.
func (a *Adapter) Delete(ctx context.Context, id storage.ObjectID) error {
	a.mu.Lock()
	defer a.mu.Unlock()

	// Let Remove itself report a missing file instead of stat-ing first: one
	// syscall fewer, and no window between the check and the removal.
	if err := os.Remove(a.objectPath(id.ID)); err != nil {
		if os.IsNotExist(err) {
			return storage.ErrNotFound
		}
		return fmt.Errorf("local: remove object: %w", err)
	}

	_ = os.Remove(a.metaPath(id.ID)) // Best effort metadata removal
	return nil
}
// Exists reports whether a file is present at the object path for id.ID.
//
// A missing file is not an error: it yields (false, nil). Any other stat
// failure is returned wrapped alongside false.
func (a *Adapter) Exists(ctx context.Context, id storage.ObjectID) (bool, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	switch _, err := os.Stat(a.objectPath(id.ID)); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, fmt.Errorf("local: stat object: %w", err)
	}
}
// List returns the stored objects whose ID starts with prefix.
//
// An empty prefix matches every object. Results are produced in the order
// filepath.Walk visits the objects directory (lexical), and the walk stops as
// soon as opts.MaxResults entries have been collected when that limit is
// positive. Entries that cannot be visited are skipped rather than failing
// the whole listing.
//
// Size and Modified always come from the object file itself. When
// opts.IncludeMetadata is set, ContentType, Created, Checksum and Attributes
// are filled in from the metadata file if it can be loaded; objects without
// readable metadata are still listed, just without those fields.
func (a *Adapter) List(ctx context.Context, prefix string, opts storage.ListOptions) ([]storage.ObjectInfo, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	objectsDir := filepath.Join(a.basePath, "objects")
	var results []storage.ObjectInfo

	err := filepath.Walk(objectsDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return nil // Skip unreadable entries
		}
		if info.IsDir() {
			return nil
		}

		// Objects are stored as objects/<id[:2]>/<id>, so the file name alone
		// is the full ID. Gluing the relative path components together would
		// duplicate the two-character shard prefix.
		id := filepath.Base(path)
		if prefix != "" && !strings.HasPrefix(id, prefix) {
			return nil
		}

		objInfo := storage.ObjectInfo{
			ID: storage.ObjectID{
				Provider:  providerName,
				Container: "default",
				ID:        id,
			},
			Size:     info.Size(),
			Modified: info.ModTime(),
		}

		if opts.IncludeMetadata {
			if meta, err := a.loadMeta(id); err == nil {
				objInfo.ContentType = meta.ContentType
				objInfo.Created = meta.Created
				objInfo.Checksum = meta.Checksum
				objInfo.Attributes = meta.Attributes
			}
		}

		results = append(results, objInfo)
		if opts.MaxResults > 0 && len(results) >= opts.MaxResults {
			return filepath.SkipAll
		}
		return nil
	})
	if err != nil {
		return nil, fmt.Errorf("local: list objects: %w", err)
	}
	return results, nil
}
// Head describes a stored object without opening its payload.
//
// Size and Modified are taken from the object file's stat result. If the
// metadata file can be loaded, ContentType, Created, Checksum and Attributes
// are copied from it; a missing or unreadable metadata file is not an error.
// A missing object yields storage.ErrNotFound, and any other stat failure is
// returned wrapped.
func (a *Adapter) Head(ctx context.Context, id storage.ObjectID) (storage.ObjectInfo, error) {
	a.mu.RLock()
	defer a.mu.RUnlock()

	fi, statErr := os.Stat(a.objectPath(id.ID))
	switch {
	case statErr == nil:
		// Object is present; fall through to build the result.
	case os.IsNotExist(statErr):
		return storage.ObjectInfo{}, storage.ErrNotFound
	default:
		return storage.ObjectInfo{}, fmt.Errorf("local: stat object: %w", statErr)
	}

	out := storage.ObjectInfo{
		ID:       id,
		Size:     fi.Size(),
		Modified: fi.ModTime(),
	}

	meta, metaErr := a.loadMeta(id.ID)
	if metaErr != nil {
		// Metadata is optional: report what the filesystem knows.
		return out, nil
	}

	out.ContentType = meta.ContentType
	out.Created = meta.Created
	out.Checksum = meta.Checksum
	out.Attributes = meta.Attributes
	return out, nil
}
// Close does nothing and always returns nil; the adapter keeps no open
// handles between calls.
func (a *Adapter) Close() error { return nil }
// objectPath maps an object ID to the file that holds its payload.
//
// IDs of two or more characters are sharded into a subdirectory named after
// their first two characters (objects/<id[:2]>/<id>) to keep individual
// directories small; shorter IDs sit directly under objects/.
func (a *Adapter) objectPath(id string) string {
	parts := []string{a.basePath, "objects"}
	if len(id) >= 2 {
		parts = append(parts, id[:2])
	}
	parts = append(parts, id)
	return filepath.Join(parts...)
}
// metaPath maps an object ID to the JSON file that holds its metadata.
//
// The layout mirrors objectPath: meta/<id[:2]>/<id>.json for IDs of two or
// more characters, and meta/<id>.json for shorter ones.
func (a *Adapter) metaPath(id string) string {
	name := id + ".json"
	if len(id) < 2 {
		return filepath.Join(a.basePath, "meta", name)
	}
	shard := id[:2]
	return filepath.Join(a.basePath, "meta", shard, name)
}
// objectMeta holds object metadata.
//
// One value is JSON-encoded per object and written to the path returned by
// metaPath; loadMeta decodes it again.
type objectMeta struct {
// Size is the payload length in bytes as measured by Put.
Size int64 `json:"size"`
// ContentType is copied from the PutOptions supplied by the caller and is
// omitted from the JSON when empty.
ContentType string `json:"content_type,omitempty"`
// Created is the time Put recorded when the object was stored.
Created time.Time `json:"created"`
// Modified is the time Put recorded when the object was stored.
Modified time.Time `json:"modified"`
// Checksum is the hex-encoded SHA-256 of the payload, which is also the
// object's ID.
Checksum string `json:"checksum"`
// Attributes is copied from the PutOptions supplied by the caller and is
// omitted from the JSON when empty.
Attributes map[string]string `json:"attributes,omitempty"`
}
// loadMeta reads and decodes the metadata file belonging to id.
//
// A read failure is returned as-is (unwrapped) together with a zero
// objectMeta. A decode failure is returned alongside whatever fields were
// populated before the decoder gave up.
func (a *Adapter) loadMeta(id string) (objectMeta, error) {
	var meta objectMeta

	raw, err := os.ReadFile(a.metaPath(id))
	if err != nil {
		return objectMeta{}, err
	}

	err = json.Unmarshal(raw, &meta)
	return meta, err
}
// calculateSize sums the sizes of all regular entries below the objects
// directory.
//
// Entries that cannot be visited are skipped silently, so the total is a
// lower bound when parts of the tree are unreadable. The returned error is
// whatever filepath.Walk itself reports.
func (a *Adapter) calculateSize() (int64, error) {
	var sum int64
	root := filepath.Join(a.basePath, "objects")

	walkErr := filepath.Walk(root, func(_ string, fi os.FileInfo, err error) error {
		if err == nil && !fi.IsDir() {
			sum += fi.Size()
		}
		return nil
	})
	return sum, walkErr
}