Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,18 +295,24 @@ func (s *Server) handleList(w http.ResponseWriter, r *http.Request) {
defer cancel()
objects, err := client.ListObjects(ctx, prefix)
if err != nil {
// S3 unreachable — return empty list rather than blocking PVE
// S3 unreachable — still include locally cached files rather than blocking PVE
log.Printf("list failed for %s/%s (S3 unreachable?): %v", storageID, content, err)
writeJSON(w, []VolumeInfo{})
var volumes []VolumeInfo
localDir := filepath.Join(s.cfg.CacheDir, storageID, prefix)
s.mergeLocalFiles(&volumes, localDir, prefix, content, storageID, nil)
writeJSON(w, volumes)
return
}

// Build a set of S3 keys for quick lookup
s3Keys := make(map[string]bool, len(objects))
var volumes []VolumeInfo
for _, obj := range objects {
// Skip directory markers
if strings.HasSuffix(obj.Key, "/") {
continue
}
s3Keys[obj.Key] = true
// images volids use "vmid/diskname" format; others use "content/filename"
var volname string
if content == "images" {
Expand All @@ -324,6 +330,13 @@ func (s *Server) handleList(w http.ResponseWriter, r *http.Request) {
volumes = append(volumes, vol)
}

// Include locally cached files not yet uploaded to S3.
// PVE (and Terraform providers) may write files directly to the cache
// directory; the watcher uploads them asynchronously. Without this,
// list_volumes returns empty until the upload completes.
localDir := filepath.Join(s.cfg.CacheDir, storageID, prefix)
s.mergeLocalFiles(&volumes, localDir, prefix, content, storageID, s3Keys)

writeJSON(w, volumes)
}

Expand Down Expand Up @@ -589,6 +602,75 @@ func (s *Server) handleConfig(w http.ResponseWriter, r *http.Request) {
writeJSON(w, map[string]string{"cache_dir": s.cfg.CacheDir})
}

// mergeLocalFiles scans a local cache directory and adds any files that
// exist locally but are not yet in S3 (pending watcher upload) to the
// volume list. This ensures files written directly by PVE (e.g. via
// Terraform or the upload API) appear immediately in list_volumes.
func (s *Server) mergeLocalFiles(volumes *[]VolumeInfo, localDir, prefix, content, storageID string, s3Keys map[string]bool) {
entries, err := os.ReadDir(localDir)
if err != nil {
return // directory may not exist yet
}
for _, entry := range entries {
if entry.IsDir() {
if content == "images" {
if entry.Name() == ".meta" {
continue
}
// Recurse into VM ID subdirectories for images
subDir := filepath.Join(localDir, entry.Name())
subEntries, err := os.ReadDir(subDir)
if err != nil {
continue
}
for _, subEntry := range subEntries {
if subEntry.IsDir() {
continue
}
key := prefix + entry.Name() + "/" + subEntry.Name()
if s3Keys[key] {
continue
}
info, err := subEntry.Info()
if err != nil {
continue
}
volname := entry.Name() + "/" + subEntry.Name()
*volumes = append(*volumes, VolumeInfo{
Volume: fmt.Sprintf("%s:%s", storageID, volname),
Key: key,
Size: info.Size(),
Format: detectFormat(key),
Content: content,
})
}
}
continue
}
// Skip temp and meta files
name := entry.Name()
if strings.HasSuffix(name, ".tmp") || strings.HasSuffix(name, ".meta") {
continue
}
key := prefix + name
if s3Keys[key] {
continue
}
info, err := entry.Info()
if err != nil {
continue
}
volname := content + "/" + name
*volumes = append(*volumes, VolumeInfo{
Volume: fmt.Sprintf("%s:%s", storageID, volname),
Key: key,
Size: info.Size(),
Format: detectFormat(key),
Content: content,
})
}
}

// --- Helpers ---

func clearImmutable(path string) {
Expand Down
111 changes: 111 additions & 0 deletions internal/api/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,117 @@ func TestHandleList_Backups(t *testing.T) {
}
}

func TestHandleList_IncludesLocalPendingFiles(t *testing.T) {
mock := newMockClient("s3test")
// One file already in S3
mock.objects["snippets/existing.yaml"] = mockObject{size: 100, etag: "\"abc\""}
s := newTestServer(t, mock)

// Create a local file that is NOT in S3 (simulates pending watcher upload)
localDir := filepath.Join(s.cfg.CacheDir, "s3test", "snippets")
os.MkdirAll(localDir, 0755)
os.WriteFile(filepath.Join(localDir, "pending.yaml"), []byte("pending data"), 0644)

req := httptest.NewRequest("GET", "/v1/list?storage=s3test&content=snippets", nil)
w := httptest.NewRecorder()
s.handleList(w, req)

if w.Code != 200 {
t.Fatalf("expected 200, got %d", w.Code)
}

var volumes []VolumeInfo
json.NewDecoder(w.Body).Decode(&volumes)

if len(volumes) != 2 {
t.Fatalf("expected 2 volumes (1 S3 + 1 local pending), got %d", len(volumes))
}

// Verify both files are present
found := map[string]bool{}
for _, v := range volumes {
found[v.Volume] = true
}
if !found["s3test:snippets/existing.yaml"] {
t.Error("expected S3 file 's3test:snippets/existing.yaml' in list")
}
if !found["s3test:snippets/pending.yaml"] {
t.Error("expected local pending file 's3test:snippets/pending.yaml' in list")
}
}

func TestHandleList_LocalFileNotDuplicated(t *testing.T) {
mock := newMockClient("s3test")
// File exists in both S3 and local cache
mock.objects["snippets/both.yaml"] = mockObject{size: 100, etag: "\"abc\""}
s := newTestServer(t, mock)

localDir := filepath.Join(s.cfg.CacheDir, "s3test", "snippets")
os.MkdirAll(localDir, 0755)
os.WriteFile(filepath.Join(localDir, "both.yaml"), []byte("data"), 0644)

req := httptest.NewRequest("GET", "/v1/list?storage=s3test&content=snippets", nil)
w := httptest.NewRecorder()
s.handleList(w, req)

var volumes []VolumeInfo
json.NewDecoder(w.Body).Decode(&volumes)

if len(volumes) != 1 {
t.Fatalf("expected 1 volume (no duplicates), got %d", len(volumes))
}
}

func TestHandleList_SkipsTmpFiles(t *testing.T) {
mock := newMockClient("s3test")
s := newTestServer(t, mock)

localDir := filepath.Join(s.cfg.CacheDir, "s3test", "snippets")
os.MkdirAll(localDir, 0755)
os.WriteFile(filepath.Join(localDir, "upload.tmp"), []byte("partial"), 0644)
os.WriteFile(filepath.Join(localDir, "real.yaml"), []byte("data"), 0644)

req := httptest.NewRequest("GET", "/v1/list?storage=s3test&content=snippets", nil)
w := httptest.NewRecorder()
s.handleList(w, req)

var volumes []VolumeInfo
json.NewDecoder(w.Body).Decode(&volumes)

if len(volumes) != 1 {
t.Fatalf("expected 1 volume (.tmp skipped), got %d", len(volumes))
}
if volumes[0].Volume != "s3test:snippets/real.yaml" {
t.Errorf("expected 's3test:snippets/real.yaml', got %q", volumes[0].Volume)
}
}

func TestHandleList_S3Error_ShowsLocalFiles(t *testing.T) {
mock := newMockClient("s3test")
mock.listErr = fmt.Errorf("connection refused")
s := newTestServer(t, mock)

// Local file exists but S3 is unreachable
localDir := filepath.Join(s.cfg.CacheDir, "s3test", "snippets")
os.MkdirAll(localDir, 0755)
os.WriteFile(filepath.Join(localDir, "local.yaml"), []byte("data"), 0644)

req := httptest.NewRequest("GET", "/v1/list?storage=s3test&content=snippets", nil)
w := httptest.NewRecorder()
s.handleList(w, req)

if w.Code != 200 {
t.Fatalf("expected 200, got %d", w.Code)
}

var volumes []VolumeInfo
json.NewDecoder(w.Body).Decode(&volumes)

if len(volumes) != 1 {
t.Fatalf("expected 1 local volume when S3 unreachable, got %d", len(volumes))
}
}

// --- Download handler tests ---

func TestHandleDownload_Fresh(t *testing.T) {
Expand Down
Loading