307 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			307 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package db
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"crypto/md5"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/jackc/pgx/v5/pgxpool"
 | |
| )
 | |
| 
 | |
// User mirrors a row of the users table and doubles as the JSON
// representation of an account.
type User struct {
	ID                int64     `json:"id"`
	Username          string    `json:"username"`
	Email             string    `json:"email"`
	APIKey            string    `json:"apiKey,omitempty"` // left out of the JSON output when empty
	StorageLimitBytes int64     `json:"storageLimitBytes"`
	CreatedAt         time.Time `json:"createdAt"`
}
 | |
| 
 | |
// Bucket mirrors a row of the buckets table: a named container owned by
// a single user, together with its recorded storage usage.
type Bucket struct {
	ID               int64     `json:"id"`
	Name             string    `json:"name"`
	OwnerID          int64     `json:"ownerId"` // id of the owning user
	StorageUsedBytes int64     `json:"storageUsedBytes"`
	CreatedAt        time.Time `json:"createdAt"`
}
 | |
| 
 | |
// Object holds the metadata recorded for a stored object; the object's
// bytes live on disk, not in this struct.
type Object struct {
	ID             int64             `json:"id"`
	BucketID       int64             `json:"bucketId"`
	Key            string            `json:"key"` // object name within its bucket
	SizeBytes      int64             `json:"sizeBytes"`
	ContentType    string            `json:"contentType"`
	LastModified   time.Time         `json:"lastModified"`
	VersionID      string            `json:"versionId"`
	MD5Checksum    string            `json:"md5Checksum"` // hex-encoded MD5 of the content
	CustomMetadata map[string]string `json:"customMetadata"`
}
 | |
| 
 | |
| type BucketService struct {
 | |
| 	pool *pgxpool.Pool
 | |
| }
 | |
| 
 | |
| func NewBucketService(pool *pgxpool.Pool) *BucketService {
 | |
| 	return &BucketService{pool: pool}
 | |
| }
 | |
| 
 | |
| func (s *BucketService) CreateUser(ctx context.Context, username, email, apiKey string, storageLimit int64) (*User, error) {
 | |
| 	var user User
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		INSERT INTO users (username, email, api_key, storage_limit_bytes) 
 | |
| 		VALUES ($1, $2, $3, $4) 
 | |
| 		RETURNING id, username, email, storage_limit_bytes, created_at`,
 | |
| 		username, email, apiKey, storageLimit).Scan(
 | |
| 		&user.ID, &user.Username, &user.Email, &user.StorageLimitBytes, &user.CreatedAt)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create user: %w", err)
 | |
| 	}
 | |
| 	return &user, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) GetUserByAPIKey(ctx context.Context, apiKey string) (*User, error) {
 | |
| 	var user User
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		SELECT id, username, email, api_key, storage_limit_bytes, created_at 
 | |
| 		FROM users WHERE api_key = $1`,
 | |
| 		apiKey).Scan(&user.ID, &user.Username, &user.Email, &user.APIKey, &user.StorageLimitBytes, &user.CreatedAt)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get user by API key: %w", err)
 | |
| 	}
 | |
| 	return &user, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) CreateBucket(ctx context.Context, name string, ownerID int64) (*Bucket, error) {
 | |
| 	var bucket Bucket
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		INSERT INTO buckets (name, owner_id) 
 | |
| 		VALUES ($1, $2) 
 | |
| 		RETURNING id, name, owner_id, storage_used_bytes, created_at`,
 | |
| 		name, ownerID).Scan(
 | |
| 		&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create bucket: %w", err)
 | |
| 	}
 | |
| 	return &bucket, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) GetUserBuckets(ctx context.Context, ownerID int64) ([]Bucket, error) {
 | |
| 	rows, err := s.pool.Query(ctx, `
 | |
| 		SELECT id, name, owner_id, storage_used_bytes, created_at 
 | |
| 		FROM buckets WHERE owner_id = $1 ORDER BY created_at DESC`,
 | |
| 		ownerID)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get user buckets: %w", err)
 | |
| 	}
 | |
| 	defer rows.Close()
 | |
| 
 | |
| 	var buckets []Bucket
 | |
| 	for rows.Next() {
 | |
| 		var bucket Bucket
 | |
| 		err := rows.Scan(&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("failed to scan bucket: %w", err)
 | |
| 		}
 | |
| 		buckets = append(buckets, bucket)
 | |
| 	}
 | |
| 	return buckets, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) GetBucket(ctx context.Context, bucketName string, ownerID int64) (*Bucket, error) {
 | |
| 	var bucket Bucket
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		SELECT id, name, owner_id, storage_used_bytes, created_at 
 | |
| 		FROM buckets WHERE name = $1 AND owner_id = $2`,
 | |
| 		bucketName, ownerID).Scan(
 | |
| 		&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get bucket: %w", err)
 | |
| 	}
 | |
| 	return &bucket, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) DeleteBucket(ctx context.Context, bucketName string, ownerID int64) error {
 | |
| 	bucket, err := s.GetBucket(ctx, bucketName, ownerID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	objects, err := s.ListObjects(ctx, bucket.ID)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to list objects for deletion: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	for _, obj := range objects {
 | |
| 		if err := os.Remove(filepath.Join("storage", bucketName, obj.Key)); err != nil {
 | |
| 			continue
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	os.RemoveAll(filepath.Join("storage", bucketName))
 | |
| 
 | |
| 	_, err = s.pool.Exec(ctx, "DELETE FROM buckets WHERE name = $1 AND owner_id = $2", bucketName, ownerID)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to delete bucket: %w", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) UploadObject(ctx context.Context, bucketID int64, key string, size int64, contentType string, content io.Reader) (*Object, error) {
 | |
| 	bucket, err := s.getBucketByID(ctx, bucketID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	user, err := s.getUserByID(ctx, bucket.OwnerID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if bucket.StorageUsedBytes+size > user.StorageLimitBytes {
 | |
| 		return nil, fmt.Errorf("storage limit exceeded")
 | |
| 	}
 | |
| 
 | |
| 	bucketDir := filepath.Join("storage", bucket.Name)
 | |
| 	if err := os.MkdirAll(bucketDir, 0755); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create bucket directory: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	filePath := filepath.Join(bucketDir, key)
 | |
| 	file, err := os.Create(filePath)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create file: %w", err)
 | |
| 	}
 | |
| 	defer file.Close()
 | |
| 
 | |
| 	hasher := md5.New()
 | |
| 	writer := io.MultiWriter(file, hasher)
 | |
| 
 | |
| 	if _, err := io.Copy(writer, content); err != nil {
 | |
| 		os.Remove(filePath)
 | |
| 		return nil, fmt.Errorf("failed to write file: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	md5Sum := fmt.Sprintf("%x", hasher.Sum(nil))
 | |
| 
 | |
| 	var object Object
 | |
| 	err = s.pool.QueryRow(ctx, `
 | |
| 		INSERT INTO objects (bucket_id, key, size_bytes, content_type, md5_checksum) 
 | |
| 		VALUES ($1, $2, $3, $4, $5) 
 | |
| 		ON CONFLICT (bucket_id, key) 
 | |
| 		DO UPDATE SET size_bytes = $3, content_type = $4, md5_checksum = $5, last_modified = NOW()
 | |
| 		RETURNING id, bucket_id, key, size_bytes, content_type, last_modified, md5_checksum`,
 | |
| 		bucketID, key, size, contentType, md5Sum).Scan(
 | |
| 		&object.ID, &object.BucketID, &object.Key, &object.SizeBytes, &object.ContentType, &object.LastModified, &object.MD5Checksum)
 | |
| 	if err != nil {
 | |
| 		os.Remove(filePath)
 | |
| 		return nil, fmt.Errorf("failed to create object record: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return &object, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) ListObjects(ctx context.Context, bucketID int64) ([]Object, error) {
 | |
| 	rows, err := s.pool.Query(ctx, `
 | |
| 		SELECT id, bucket_id, key, size_bytes, content_type, last_modified, 
 | |
| 			   COALESCE(version_id, ''), COALESCE(md5_checksum, '') 
 | |
| 		FROM objects WHERE bucket_id = $1 ORDER BY last_modified DESC`,
 | |
| 		bucketID)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to list objects: %w", err)
 | |
| 	}
 | |
| 	defer rows.Close()
 | |
| 
 | |
| 	var objects []Object
 | |
| 	for rows.Next() {
 | |
| 		var object Object
 | |
| 		err := rows.Scan(&object.ID, &object.BucketID, &object.Key, &object.SizeBytes,
 | |
| 			&object.ContentType, &object.LastModified, &object.VersionID, &object.MD5Checksum)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("failed to scan object: %w", err)
 | |
| 		}
 | |
| 		objects = append(objects, object)
 | |
| 	}
 | |
| 	return objects, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) GetObject(ctx context.Context, bucketID int64, key string) (*Object, error) {
 | |
| 	var object Object
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		SELECT id, bucket_id, key, size_bytes, content_type, last_modified, 
 | |
| 			   COALESCE(version_id, ''), COALESCE(md5_checksum, '') 
 | |
| 		FROM objects WHERE bucket_id = $1 AND key = $2`,
 | |
| 		bucketID, key).Scan(
 | |
| 		&object.ID, &object.BucketID, &object.Key, &object.SizeBytes,
 | |
| 		&object.ContentType, &object.LastModified, &object.VersionID, &object.MD5Checksum)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get object: %w", err)
 | |
| 	}
 | |
| 	return &object, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) DeleteObject(ctx context.Context, bucketID int64, key string) error {
 | |
| 	bucket, err := s.getBucketByID(ctx, bucketID)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	filePath := filepath.Join("storage", bucket.Name, key)
 | |
| 	if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
 | |
| 		return fmt.Errorf("failed to remove file: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	_, err = s.pool.Exec(ctx, "DELETE FROM objects WHERE bucket_id = $1 AND key = $2", bucketID, key)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to delete object record: %w", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) GetObjectFile(ctx context.Context, bucketName, key string, ownerID int64) (*os.File, error) {
 | |
| 	bucket, err := s.GetBucket(ctx, bucketName, ownerID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	_, err = s.GetObject(ctx, bucket.ID, key)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	filePath := filepath.Join("storage", bucketName, key)
 | |
| 	file, err := os.Open(filePath)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to open file: %w", err)
 | |
| 	}
 | |
| 	return file, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) getBucketByID(ctx context.Context, bucketID int64) (*Bucket, error) {
 | |
| 	var bucket Bucket
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		SELECT id, name, owner_id, storage_used_bytes, created_at 
 | |
| 		FROM buckets WHERE id = $1`,
 | |
| 		bucketID).Scan(
 | |
| 		&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get bucket by ID: %w", err)
 | |
| 	}
 | |
| 	return &bucket, nil
 | |
| }
 | |
| 
 | |
| func (s *BucketService) getUserByID(ctx context.Context, userID int64) (*User, error) {
 | |
| 	var user User
 | |
| 	err := s.pool.QueryRow(ctx, `
 | |
| 		SELECT id, username, email, storage_limit_bytes, created_at 
 | |
| 		FROM users WHERE id = $1`,
 | |
| 		userID).Scan(&user.ID, &user.Username, &user.Email, &user.StorageLimitBytes, &user.CreatedAt)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to get user by ID: %w", err)
 | |
| 	}
 | |
| 	return &user, nil
 | |
| }
 | 
