diff --git a/internal/db/bucket.go b/internal/db/bucket.go new file mode 100644 index 0000000..c096a72 --- /dev/null +++ b/internal/db/bucket.go @@ -0,0 +1,306 @@ +package db + +import ( + "context" + "crypto/md5" + "fmt" + "io" + "os" + "path/filepath" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type User struct { + ID int64 `json:"id"` + Username string `json:"username"` + Email string `json:"email"` + APIKey string `json:"apiKey,omitempty"` + StorageLimitBytes int64 `json:"storageLimitBytes"` + CreatedAt time.Time `json:"createdAt"` +} + +type Bucket struct { + ID int64 `json:"id"` + Name string `json:"name"` + OwnerID int64 `json:"ownerId"` + StorageUsedBytes int64 `json:"storageUsedBytes"` + CreatedAt time.Time `json:"createdAt"` +} + +type Object struct { + ID int64 `json:"id"` + BucketID int64 `json:"bucketId"` + Key string `json:"key"` + SizeBytes int64 `json:"sizeBytes"` + ContentType string `json:"contentType"` + LastModified time.Time `json:"lastModified"` + VersionID string `json:"versionId"` + MD5Checksum string `json:"md5Checksum"` + CustomMetadata map[string]string `json:"customMetadata"` +} + +type BucketService struct { + pool *pgxpool.Pool +} + +func NewBucketService(pool *pgxpool.Pool) *BucketService { + return &BucketService{pool: pool} +} + +func (s *BucketService) CreateUser(ctx context.Context, username, email, apiKey string, storageLimit int64) (*User, error) { + var user User + err := s.pool.QueryRow(ctx, ` + INSERT INTO users (username, email, api_key, storage_limit_bytes) + VALUES ($1, $2, $3, $4) + RETURNING id, username, email, storage_limit_bytes, created_at`, + username, email, apiKey, storageLimit).Scan( + &user.ID, &user.Username, &user.Email, &user.StorageLimitBytes, &user.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to create user: %w", err) + } + return &user, nil +} + +func (s *BucketService) GetUserByAPIKey(ctx context.Context, apiKey string) (*User, error) { + var user User + err := s.pool.QueryRow(ctx, ` + SELECT id, username, email, api_key, storage_limit_bytes, created_at + FROM users WHERE api_key = $1`, + apiKey).Scan(&user.ID, &user.Username, &user.Email, &user.APIKey, &user.StorageLimitBytes, &user.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get user by API key: %w", err) + } + return &user, nil +} + +func (s *BucketService) CreateBucket(ctx context.Context, name string, ownerID int64) (*Bucket, error) { + var bucket Bucket + err := s.pool.QueryRow(ctx, ` + INSERT INTO buckets (name, owner_id) + VALUES ($1, $2) + RETURNING id, name, owner_id, storage_used_bytes, created_at`, + name, ownerID).Scan( + &bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to create bucket: %w", err) + } + return &bucket, nil +} + +func (s *BucketService) GetUserBuckets(ctx context.Context, ownerID int64) ([]Bucket, error) { + rows, err := s.pool.Query(ctx, ` + SELECT id, name, owner_id, storage_used_bytes, created_at + FROM buckets WHERE owner_id = $1 ORDER BY created_at DESC`, + ownerID) + if err != nil { + return nil, fmt.Errorf("failed to get user buckets: %w", err) + } + defer rows.Close() + + var buckets []Bucket + for rows.Next() { + var bucket Bucket + err := rows.Scan(&bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to scan bucket: %w", err) + } + buckets = append(buckets, bucket) + } + return buckets, nil +} + +func (s *BucketService) GetBucket(ctx context.Context, bucketName string, ownerID int64) (*Bucket, error) { + var bucket Bucket + err := s.pool.QueryRow(ctx, ` + SELECT id, name, owner_id, storage_used_bytes, created_at + FROM buckets WHERE name = $1 AND owner_id = $2`, + bucketName, ownerID).Scan( + &bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get bucket: %w", err) + } + return &bucket, nil +} + +func (s *BucketService) DeleteBucket(ctx context.Context, bucketName string, ownerID int64) error { + bucket, err := s.GetBucket(ctx, bucketName, ownerID) + if err != nil { + return err + } + + objects, err := s.ListObjects(ctx, bucket.ID) + if err != nil { + return fmt.Errorf("failed to list objects for deletion: %w", err) + } + + for _, obj := range objects { + if err := os.Remove(filepath.Join("storage", bucketName, obj.Key)); err != nil { + continue + } + } + + os.RemoveAll(filepath.Join("storage", bucketName)) + + _, err = s.pool.Exec(ctx, "DELETE FROM buckets WHERE name = $1 AND owner_id = $2", bucketName, ownerID) + if err != nil { + return fmt.Errorf("failed to delete bucket: %w", err) + } + return nil +} + +func (s *BucketService) UploadObject(ctx context.Context, bucketID int64, key string, size int64, contentType string, content io.Reader) (*Object, error) { + bucket, err := s.getBucketByID(ctx, bucketID) + if err != nil { + return nil, err + } + + user, err := s.getUserByID(ctx, bucket.OwnerID) + if err != nil { + return nil, err + } + + if bucket.StorageUsedBytes+size > user.StorageLimitBytes { + return nil, fmt.Errorf("storage limit exceeded") + } + + bucketDir := filepath.Join("storage", bucket.Name) + if err := os.MkdirAll(bucketDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create bucket directory: %w", err) + } + + filePath := filepath.Join(bucketDir, key) + file, err := os.Create(filePath) + if err != nil { + return nil, fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + hasher := md5.New() + writer := io.MultiWriter(file, hasher) + + if _, err := io.Copy(writer, content); err != nil { + os.Remove(filePath) + return nil, fmt.Errorf("failed to write file: %w", err) + } + + md5Sum := fmt.Sprintf("%x", hasher.Sum(nil)) + + var object Object + err = s.pool.QueryRow(ctx, ` + INSERT INTO objects (bucket_id, key, size_bytes, content_type, md5_checksum) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (bucket_id, key) + DO UPDATE SET size_bytes = $3, content_type = $4, md5_checksum = $5, last_modified = NOW() + RETURNING id, bucket_id, key, size_bytes, content_type, last_modified, md5_checksum`, + bucketID, key, size, contentType, md5Sum).Scan( + &object.ID, &object.BucketID, &object.Key, &object.SizeBytes, &object.ContentType, &object.LastModified, &object.MD5Checksum) + if err != nil { + os.Remove(filePath) + return nil, fmt.Errorf("failed to create object record: %w", err) + } + + return &object, nil +} + +func (s *BucketService) ListObjects(ctx context.Context, bucketID int64) ([]Object, error) { + rows, err := s.pool.Query(ctx, ` + SELECT id, bucket_id, key, size_bytes, content_type, last_modified, + COALESCE(version_id, ''), COALESCE(md5_checksum, '') + FROM objects WHERE bucket_id = $1 ORDER BY last_modified DESC`, + bucketID) + if err != nil { + return nil, fmt.Errorf("failed to list objects: %w", err) + } + defer rows.Close() + + var objects []Object + for rows.Next() { + var object Object + err := rows.Scan(&object.ID, &object.BucketID, &object.Key, &object.SizeBytes, + &object.ContentType, &object.LastModified, &object.VersionID, &object.MD5Checksum) + if err != nil { + return nil, fmt.Errorf("failed to scan object: %w", err) + } + objects = append(objects, object) + } + return objects, nil +} + +func (s *BucketService) GetObject(ctx context.Context, bucketID int64, key string) (*Object, error) { + var object Object + err := s.pool.QueryRow(ctx, ` + SELECT id, bucket_id, key, size_bytes, content_type, last_modified, + COALESCE(version_id, ''), COALESCE(md5_checksum, '') + FROM objects WHERE bucket_id = $1 AND key = $2`, + bucketID, key).Scan( + &object.ID, &object.BucketID, &object.Key, &object.SizeBytes, + &object.ContentType, &object.LastModified, &object.VersionID, &object.MD5Checksum) + if err != nil { + return nil, fmt.Errorf("failed to get object: %w", err) + } + return &object, nil +} + +func (s *BucketService) DeleteObject(ctx context.Context, bucketID int64, key string) error { + bucket, err := s.getBucketByID(ctx, bucketID) + if err != nil { + return err + } + + filePath := filepath.Join("storage", bucket.Name, key) + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove file: %w", err) + } + + _, err = s.pool.Exec(ctx, "DELETE FROM objects WHERE bucket_id = $1 AND key = $2", bucketID, key) + if err != nil { + return fmt.Errorf("failed to delete object record: %w", err) + } + return nil +} + +func (s *BucketService) GetObjectFile(ctx context.Context, bucketName, key string, ownerID int64) (*os.File, error) { + bucket, err := s.GetBucket(ctx, bucketName, ownerID) + if err != nil { + return nil, err + } + + _, err = s.GetObject(ctx, bucket.ID, key) + if err != nil { + return nil, err + } + + filePath := filepath.Join("storage", bucketName, key) + file, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + return file, nil +} + +func (s *BucketService) getBucketByID(ctx context.Context, bucketID int64) (*Bucket, error) { + var bucket Bucket + err := s.pool.QueryRow(ctx, ` + SELECT id, name, owner_id, storage_used_bytes, created_at + FROM buckets WHERE id = $1`, + bucketID).Scan( + &bucket.ID, &bucket.Name, &bucket.OwnerID, &bucket.StorageUsedBytes, &bucket.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get bucket by ID: %w", err) + } + return &bucket, nil +} + +func (s *BucketService) getUserByID(ctx context.Context, userID int64) (*User, error) { + var user User + err := s.pool.QueryRow(ctx, ` + SELECT id, username, email, storage_limit_bytes, created_at + FROM users WHERE id = $1`, + userID).Scan(&user.ID, &user.Username, &user.Email, &user.StorageLimitBytes, &user.CreatedAt) + if err != nil { + return nil, fmt.Errorf("failed to get user by ID: %w", err) + } + return &user, nil +}