314 lines
9.8 KiB
Go
314 lines
9.8 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"crypto/md5"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"time"
|
|
|
|
"git.keircn.com/keiran/termcloud/internal/config"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// User is an account row from the users table.
//
// APIKey is dropped from JSON output when empty (omitempty); among the
// queries visible in this file only GetUserByAPIKey fills it in.
type User struct {
	ID                int64     `json:"id"`
	Username          string    `json:"username"`
	Email             string    `json:"email"`
	APIKey            string    `json:"apiKey,omitempty"`
	StorageLimitBytes int64     `json:"storageLimitBytes"`
	CreatedAt         time.Time `json:"createdAt"`
}
|
|
|
|
// Bucket is a row from the buckets table: a named container of objects that
// belongs to the user identified by OwnerID.
//
// StorageUsedBytes mirrors the storage_used_bytes column; the code visible in
// this file only reads it.
type Bucket struct {
	ID               int64     `json:"id"`
	Name             string    `json:"name"`
	OwnerID          int64     `json:"ownerId"`
	StorageUsedBytes int64     `json:"storageUsedBytes"`
	CreatedAt        time.Time `json:"createdAt"`
}
|
|
|
|
// Object is the metadata row for a stored object (objects table). The object's
// bytes live on disk, not in this struct.
//
// VersionID and MD5Checksum are read with COALESCE(..., '') by the queries in
// this file, so they are empty strings when the column is NULL.
// CustomMetadata is not filled in by any query visible in this file.
type Object struct {
	ID             int64             `json:"id"`
	BucketID       int64             `json:"bucketId"`
	Key            string            `json:"key"`
	SizeBytes      int64             `json:"sizeBytes"`
	ContentType    string            `json:"contentType"`
	LastModified   time.Time         `json:"lastModified"`
	VersionID      string            `json:"versionId"`
	MD5Checksum    string            `json:"md5Checksum"`
	CustomMetadata map[string]string `json:"customMetadata"`
}
|
|
|
|
type BucketService struct {
|
|
pool *pgxpool.Pool
|
|
storageDir string
|
|
accountService *AccountService
|
|
}
|
|
|
|
func NewBucketService(pool *pgxpool.Pool, cfg *config.Config) *BucketService {
|
|
return &BucketService{
|
|
pool: pool,
|
|
storageDir: cfg.StorageDir,
|
|
accountService: NewAccountService(pool, cfg),
|
|
}
|
|
}
|
|
|
|
func (s *BucketService) CreateUser(ctx context.Context, username, email, apiKey string, storageLimit int64) (*User, error) {
|
|
var user User
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO users (username, email, api_key, storage_limit_bytes)
|
|
VALUES ($1, $2, $3, $4)
|
|
RETURNING id, username, email, storage_limit_bytes, created_at`,
|
|
username, email, apiKey, storageLimit).Scan(
|
|
&user.ID, &user.Username, &user.Email, &user.StorageLimitBytes, &user.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create user: %w", err)
|
|
}
|
|
return &user, nil
|
|
}
|
|
|
|
func (s *BucketService) GetUserByAPIKey(ctx context.Context, apiKey string) (*User, error) {
|
|
var user User
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT id, username, email, api_key, storage_limit_bytes, created_at
|
|
FROM users WHERE api_key = $1`,
|
|
apiKey).Scan(&user.ID, &user.Username, &user.Email, &user.APIKey, &user.StorageLimitBytes, &user.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get user by API key: %w", err)
|
|
}
|
|
return &user, nil
|
|
}
|
|
|
|
func (s *BucketService) CreateBucket(ctx context.Context, name string, ownerID int64) (*Bucket, error) {
|
|
var bucket Bucket
|
|
err := s.pool.QueryRow(ctx, `
|
|
INSERT INTO buckets (name, owner_id)
|
|
VALUES ($1, $2)
|
|
RETURNING id, name, owner_id, storage_used_bytes, created_at`,
|
|
name, ownerID).Scan(
|
|
&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create bucket: %w", err)
|
|
}
|
|
return &bucket, nil
|
|
}
|
|
|
|
func (s *BucketService) GetUserBuckets(ctx context.Context, ownerID int64) ([]Bucket, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, name, owner_id, storage_used_bytes, created_at
|
|
FROM buckets WHERE owner_id = $1 ORDER BY created_at DESC`,
|
|
ownerID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get user buckets: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var buckets []Bucket
|
|
for rows.Next() {
|
|
var bucket Bucket
|
|
err := rows.Scan(&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan bucket: %w", err)
|
|
}
|
|
buckets = append(buckets, bucket)
|
|
}
|
|
return buckets, nil
|
|
}
|
|
|
|
func (s *BucketService) GetBucket(ctx context.Context, bucketName string, ownerID int64) (*Bucket, error) {
|
|
var bucket Bucket
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT id, name, owner_id, storage_used_bytes, created_at
|
|
FROM buckets WHERE name = $1 AND owner_id = $2`,
|
|
bucketName, ownerID).Scan(
|
|
&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get bucket: %w", err)
|
|
}
|
|
return &bucket, nil
|
|
}
|
|
|
|
func (s *BucketService) DeleteBucket(ctx context.Context, bucketName string, ownerID int64) error {
|
|
bucket, err := s.GetBucket(ctx, bucketName, ownerID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
objects, err := s.ListObjects(ctx, bucket.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list objects for deletion: %w", err)
|
|
}
|
|
|
|
for _, obj := range objects {
|
|
if err := os.Remove(filepath.Join(s.storageDir, bucketName, obj.Key)); err != nil {
|
|
continue
|
|
}
|
|
}
|
|
|
|
os.RemoveAll(filepath.Join(s.storageDir, bucketName))
|
|
|
|
_, err = s.pool.Exec(ctx, "DELETE FROM buckets WHERE name = $1 AND owner_id = $2", bucketName, ownerID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete bucket: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *BucketService) UploadObject(ctx context.Context, bucketID int64, key string, size int64, contentType string, content io.Reader) (*Object, error) {
|
|
bucket, err := s.getBucketByID(ctx, bucketID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
user, err := s.getUserByID(ctx, bucket.OwnerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if bucket.StorageUsedBytes+size > user.StorageLimitBytes {
|
|
return nil, fmt.Errorf("storage limit exceeded")
|
|
}
|
|
|
|
bucketDir := filepath.Join(s.storageDir, bucket.Name)
|
|
if err := os.MkdirAll(bucketDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create bucket directory: %w", err)
|
|
}
|
|
|
|
filePath := filepath.Join(bucketDir, key)
|
|
file, err := os.Create(filePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create file: %w", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
hasher := md5.New()
|
|
writer := io.MultiWriter(file, hasher)
|
|
|
|
if _, err := io.Copy(writer, content); err != nil {
|
|
os.Remove(filePath)
|
|
return nil, fmt.Errorf("failed to write file: %w", err)
|
|
}
|
|
|
|
md5Sum := fmt.Sprintf("%x", hasher.Sum(nil))
|
|
|
|
var object Object
|
|
err = s.pool.QueryRow(ctx, `
|
|
INSERT INTO objects (bucket_id, key, size_bytes, content_type, md5_checksum)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
ON CONFLICT (bucket_id, key)
|
|
DO UPDATE SET size_bytes = $3, content_type = $4, md5_checksum = $5, last_modified = NOW()
|
|
RETURNING id, bucket_id, key, size_bytes, content_type, last_modified, md5_checksum`,
|
|
bucketID, key, size, contentType, md5Sum).Scan(
|
|
&object.ID, &object.BucketID, &object.Key, &object.SizeBytes, &object.ContentType, &object.LastModified, &object.MD5Checksum)
|
|
if err != nil {
|
|
os.Remove(filePath)
|
|
return nil, fmt.Errorf("failed to create object record: %w", err)
|
|
}
|
|
|
|
return &object, nil
|
|
}
|
|
|
|
func (s *BucketService) ListObjects(ctx context.Context, bucketID int64) ([]Object, error) {
|
|
rows, err := s.pool.Query(ctx, `
|
|
SELECT id, bucket_id, key, size_bytes, content_type, last_modified,
|
|
COALESCE(version_id, ''), COALESCE(md5_checksum, '')
|
|
FROM objects WHERE bucket_id = $1 ORDER BY last_modified DESC`,
|
|
bucketID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list objects: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var objects []Object
|
|
for rows.Next() {
|
|
var object Object
|
|
err := rows.Scan(&object.ID, &object.BucketID, &object.Key, &object.SizeBytes,
|
|
&object.ContentType, &object.LastModified, &object.VersionID, &object.MD5Checksum)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to scan object: %w", err)
|
|
}
|
|
objects = append(objects, object)
|
|
}
|
|
return objects, nil
|
|
}
|
|
|
|
func (s *BucketService) GetObject(ctx context.Context, bucketID int64, key string) (*Object, error) {
|
|
var object Object
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT id, bucket_id, key, size_bytes, content_type, last_modified,
|
|
COALESCE(version_id, ''), COALESCE(md5_checksum, '')
|
|
FROM objects WHERE bucket_id = $1 AND key = $2`,
|
|
bucketID, key).Scan(
|
|
&object.ID, &object.BucketID, &object.Key, &object.SizeBytes,
|
|
&object.ContentType, &object.LastModified, &object.VersionID, &object.MD5Checksum)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get object: %w", err)
|
|
}
|
|
return &object, nil
|
|
}
|
|
|
|
func (s *BucketService) DeleteObject(ctx context.Context, bucketID int64, key string) error {
|
|
bucket, err := s.getBucketByID(ctx, bucketID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
filePath := filepath.Join(s.storageDir, bucket.Name, key)
|
|
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
|
|
return fmt.Errorf("failed to remove file: %w", err)
|
|
}
|
|
|
|
_, err = s.pool.Exec(ctx, "DELETE FROM objects WHERE bucket_id = $1 AND key = $2", bucketID, key)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete object record: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *BucketService) GetObjectFile(ctx context.Context, bucketName, key string, ownerID int64) (*os.File, error) {
|
|
bucket, err := s.GetBucket(ctx, bucketName, ownerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, err = s.GetObject(ctx, bucket.ID, key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filePath := filepath.Join(s.storageDir, bucketName, key)
|
|
file, err := os.Open(filePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open file: %w", err)
|
|
}
|
|
return file, nil
|
|
}
|
|
|
|
func (s *BucketService) getBucketByID(ctx context.Context, bucketID int64) (*Bucket, error) {
|
|
var bucket Bucket
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT id, name, owner_id, storage_used_bytes, created_at
|
|
FROM buckets WHERE id = $1`,
|
|
bucketID).Scan(
|
|
&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get bucket by ID: %w", err)
|
|
}
|
|
return &bucket, nil
|
|
}
|
|
|
|
func (s *BucketService) getUserByID(ctx context.Context, userID int64) (*User, error) {
|
|
var user User
|
|
err := s.pool.QueryRow(ctx, `
|
|
SELECT id, username, email, storage_limit_bytes, created_at
|
|
FROM users WHERE id = $1`,
|
|
userID).Scan(&user.ID, &user.Username, &user.Email, &user.StorageLimitBytes, &user.CreatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get user by ID: %w", err)
|
|
}
|
|
return &user, nil
|
|
}
|