diff --git a/go.mod b/go.mod index c4bc98254..3edab3bb9 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/couchbase/moss v0.2.0 github.com/spf13/cobra v1.8.1 go.etcd.io/bbolt v1.4.0 + golang.org/x/sys v0.29.0 golang.org/x/text v0.8.0 google.golang.org/protobuf v1.36.6 ) @@ -42,5 +43,10 @@ require ( github.com/json-iterator/go v0.0.0-20171115153421-f7279a603ede // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/spf13/pflag v1.0.6 // indirect - golang.org/x/sys v0.29.0 // indirect ) + +// Use bleve_index_api branch with VFS support +replace github.com/blevesearch/bleve_index_api => github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01 + +// Use VFS-enabled zapx from ajroetker fork +replace github.com/blevesearch/zapx/v16 => github.com/ajroetker/zapx/v16 v16.0.0-20251111234330-70822381ed85 diff --git a/go.sum b/go.sum index b46bebcef..017fd8000 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,12 @@ github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2JW2gggRdg= github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0= +github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01 h1:SbGoS4vY5GDtDwxKy4iT+s0LsEBQMKd/FNRwTCnWssI= +github.com/ajroetker/bleve_index_api v0.0.0-20251111010750-7b3692d79f01/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0= +github.com/ajroetker/zapx/v16 v16.0.0-20251111234330-70822381ed85 h1:pY9/pLIPBX6JMYI9DzultU8yybMBVaqzN8ceU/dQurQ= +github.com/ajroetker/zapx/v16 v16.0.0-20251111234330-70822381ed85/go.mod h1:7y0yPdM9JLW29eRvgtmgaxujU4t0CUfzo1sFvP1lLss= github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/blevesearch/bleve_index_api v1.2.11 
h1:bXQ54kVuwP8hdrXUSOnvTQfgK0KI1+f9A0ITJT8tX1s= -github.com/blevesearch/bleve_index_api v1.2.11/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0= github.com/blevesearch/geo v0.2.4 h1:ECIGQhw+QALCZaDcogRTNSJYQXRtC8/m8IKiA706cqk= github.com/blevesearch/geo v0.2.4/go.mod h1:K56Q33AzXt2YExVHGObtmRSFYZKYGv0JEN5mdacJJR8= github.com/blevesearch/go-faiss v1.0.26 h1:4dRLolFgjPyjkaXwff4NfbZFdE/dfywbzDqporeQvXI= @@ -44,8 +46,6 @@ github.com/blevesearch/zapx/v14 v14.4.2 h1:2SGHakVKd+TrtEqpfeq8X+So5PShQ5nW6GNxT github.com/blevesearch/zapx/v14 v14.4.2/go.mod h1:rz0XNb/OZSMjNorufDGSpFpjoFKhXmppH9Hi7a877D8= github.com/blevesearch/zapx/v15 v15.4.2 h1:sWxpDE0QQOTjyxYbAVjt3+0ieu8NCE0fDRaFxEsp31k= github.com/blevesearch/zapx/v15 v15.4.2/go.mod h1:1pssev/59FsuWcgSnTa0OeEpOzmhtmr/0/11H0Z8+Nw= -github.com/blevesearch/zapx/v16 v16.2.7 h1:xcgFRa7f/tQXOwApVq7JWgPYSlzyUMmkuYa54tMDuR0= -github.com/blevesearch/zapx/v16 v16.2.7/go.mod h1:murSoCJPCk25MqURrcJaBQ1RekuqSCSfMjXH4rHyA14= github.com/couchbase/ghistogram v0.1.0 h1:b95QcQTCzjTUocDXp/uMgSNQi8oj1tGwnJ4bODWZnps= github.com/couchbase/ghistogram v0.1.0/go.mod h1:s1Jhy76zqfEecpNWJfWUiKZookAFaiGOEoyzgHt9i7k= github.com/couchbase/moss v0.2.0 h1:VCYrMzFwEryyhRSeI+/b3tRBSeTpi/8gn5Kf6dxqn+o= diff --git a/index/scorch/merge.go b/index/scorch/merge.go index 9abcf2db6..c64315740 100644 --- a/index/scorch/merge.go +++ b/index/scorch/merge.go @@ -310,6 +310,13 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, go cw.listen() for _, task := range resultMergePlan.Tasks { + // Check if context was cancelled before starting next task + select { + case <-cw.cancelCh: + return segment.ErrClosed + default: + } + if len(task.Segments) == 0 { atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1) continue @@ -354,14 +361,26 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, if len(segmentsToMerge) > 0 { filename = zapFileName(newSegmentID) s.markIneligibleForRemoval(filename) - path := s.path + string(os.PathSeparator) + 
filename fileMergeZapStartTime := time.Now() atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1) prevBytesReadTotal := cumulateBytesRead(segmentsToMerge) - newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path, - cw.cancelCh, s) + + // Try VFS-aware plugin first, fall back to legacy path-based + var newDocNums [][]uint64 + var err error + if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil { + // Use VFS-aware merge with relative filename + newDocNums, _, err = vfsPlugin.MergeVFS(s.vfsDir, filename, + segmentsToMerge, docsToDrop, cw.cancelCh, s) + } else { + // Legacy path-based merge + path := s.path + string(os.PathSeparator) + filename + newDocNums, _, err = s.segPlugin.Merge(segmentsToMerge, docsToDrop, path, + cw.cancelCh, s) + } + atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1) fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime)) @@ -379,7 +398,15 @@ func (s *Scorch) planMergeAtSnapshot(ctx context.Context, return fmt.Errorf("merging failed: %v", err) } - seg, err = s.segPlugin.Open(path) + // Open the newly merged segment + if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil { + // Use VFS-aware open with relative filename + seg, err = vfsPlugin.OpenVFS(s.vfsDir, filename) + } else { + // Legacy path-based open + path := s.path + string(os.PathSeparator) + filename + seg, err = s.segPlugin.Open(path) + } if err != nil { s.unmarkIneligibleForRemoval(filename) atomic.AddUint64(&s.stats.TotFileMergePlanTasksErr, 1) @@ -523,12 +550,23 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, defer wg.Done() newSegmentID := atomic.AddUint64(&s.nextSegmentID, 1) filename := zapFileName(newSegmentID) - path := s.path + string(os.PathSeparator) + filename // the newly merged segment is already flushed out to disk, just needs // to be opened using mmap. 
- newDocIDs, _, err := - s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s) + var newDocIDs [][]uint64 + var err error + + // Try VFS-aware plugin first, fall back to legacy path-based + if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil { + // Use VFS-aware merge with relative filename + newDocIDs, _, err = vfsPlugin.MergeVFS(s.vfsDir, filename, + segsBatch, dropsBatch, s.closeCh, s) + } else { + // Legacy path-based merge + path := s.path + string(os.PathSeparator) + filename + newDocIDs, _, err = s.segPlugin.Merge(segsBatch, dropsBatch, path, s.closeCh, s) + } + if err != nil { em.Lock() errs = append(errs, err) @@ -536,6 +574,7 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, atomic.AddUint64(&s.stats.TotMemMergeErr, 1) return } + // to prevent accidental cleanup of this newly created file, mark it // as ineligible for removal. this will be flipped back when the bolt // is updated - which is valid, since the snapshot updated in bolt is @@ -543,7 +582,16 @@ func (s *Scorch) mergeAndPersistInMemorySegments(snapshot *IndexSnapshot, s.markIneligibleForRemoval(filename) newMergedSegmentIDs[id] = newSegmentID newDocIDsSet[id] = newDocIDs - newMergedSegments[id], err = s.segPlugin.Open(path) + + // Open the newly merged segment + if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil { + // Use VFS-aware open with relative filename + newMergedSegments[id], err = vfsPlugin.OpenVFS(s.vfsDir, filename) + } else { + // Legacy path-based open + path := s.path + string(os.PathSeparator) + filename + newMergedSegments[id], err = s.segPlugin.Open(path) + } if err != nil { em.Lock() errs = append(errs, err) diff --git a/index/scorch/persister.go b/index/scorch/persister.go index d92c3a85b..ab69177bc 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -34,6 +34,7 @@ import ( "github.com/RoaringBitmap/roaring/v2" "github.com/blevesearch/bleve/v2/util" index 
"github.com/blevesearch/bleve_index_api" + "github.com/blevesearch/bleve_index_api/vfs" segment "github.com/blevesearch/scorch_segment_api/v2" bolt "go.etcd.io/bbolt" ) @@ -554,59 +555,103 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot, po *persiste return true, nil } -func copyToDirectory(srcPath string, d index.Directory) (int64, error) { - if d == nil { - return 0, nil - } - - dest, err := d.GetWriter(filepath.Join("store", filepath.Base(srcPath))) +func copyBetweenDirectories(srcName string, srcDir vfs.Directory, dstDir vfs.Directory) error { + // Open source file from VFS + source, err := srcDir.Open(srcName) if err != nil { - return 0, fmt.Errorf("GetWriter err: %v", err) + return fmt.Errorf("VFS source open %s: %w", srcName, err) } + defer source.Close() - sourceFileStat, err := os.Stat(srcPath) + // Create destination file in VFS + dest, err := dstDir.Create(srcName) if err != nil { - return 0, err + return fmt.Errorf("VFS dest create %s: %w", srcName, err) } + defer dest.Close() - if !sourceFileStat.Mode().IsRegular() { - return 0, fmt.Errorf("%s is not a regular file", srcPath) + // Use 1MB buffer for better performance with remote storage + buf := make([]byte, 1024*1024) + if _, err := io.CopyBuffer(dest, source, buf); err != nil { + return fmt.Errorf("VFS copy %s: %w", srcName, err) } - source, err := os.Open(srcPath) - if err != nil { - return 0, err + // Critical: Sync destination for durability + if err := dest.Sync(); err != nil { + return fmt.Errorf("VFS dest sync %s: %w", srcName, err) } - defer source.Close() - defer dest.Close() - return io.Copy(dest, source) + + return nil } -func persistToDirectory(seg segment.UnpersistedSegment, d index.Directory, - path string, +func persistToDirectory(seg segment.UnpersistedSegment, d vfs.Directory, + name string, ) error { - if d == nil { - return seg.Persist(path) - } - + // Segments must implement io.WriterTo for VFS persistence sg, ok := seg.(io.WriterTo) if !ok { - return 
fmt.Errorf("no io.WriterTo segment implementation found") + return fmt.Errorf("segment doesn't implement io.WriterTo for VFS persistence") } - w, err := d.GetWriter(filepath.Join("store", filepath.Base(path))) + w, err := d.Create(name) if err != nil { - return err + return fmt.Errorf("VFS create %s: %w", name, err) } + defer w.Close() - _, err = sg.WriteTo(w) - w.Close() + n, err := sg.WriteTo(w) + if err != nil { + return fmt.Errorf("segment write to %s: %w", name, err) + } + if n == 0 { + return fmt.Errorf("segment write to %s produced 0 bytes", name) + } - return err + // Critical: Sync before close for durability + if err := w.Sync(); err != nil { + return fmt.Errorf("VFS sync %s: %w", name, err) + } + + return nil +} + +// copySegmentFile copies a segment file from source VFS to destination VFS. +func copySegmentFile(src, dst vfs.Directory, filename string) error { + return copySegmentFileWithPath(src, dst, filename, filename) +} + +// copySegmentFileWithPath copies a segment file from source VFS to destination VFS, +// allowing different paths for source and destination. 
+func copySegmentFileWithPath(src, dst vfs.Directory, srcFilename, dstFilename string) error { + // Open source file for reading + r, err := src.Open(srcFilename) + if err != nil { + return fmt.Errorf("failed to open source file %s: %w", srcFilename, err) + } + defer r.Close() + + // Create destination file for writing + w, err := dst.Create(dstFilename) + if err != nil { + return fmt.Errorf("failed to create dest file %s: %w", dstFilename, err) + } + defer w.Close() + + // Copy the file contents + if _, err := io.Copy(w, r); err != nil { + return fmt.Errorf("failed to copy data from %s to %s: %w", srcFilename, dstFilename, err) + } + + // Sync to ensure durability + if err := w.Sync(); err != nil { + return fmt.Errorf("failed to sync dest file %s: %w", dstFilename, err) + } + + return nil } func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, - segPlugin SegmentPlugin, exclude map[uint64]struct{}, d index.Directory) ( + segPlugin SegmentPlugin, exclude map[uint64]struct{}, d vfs.Directory) ( []string, map[uint64]string, error) { snapshotsBucket, err := tx.CreateBucketIfNotExists(util.BoltSnapshotsBucket) if err != nil { @@ -683,12 +728,24 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, } switch seg := segmentSnapshot.segment.(type) { case segment.PersistedSegment: + // Persisted segments are already in the source VFS directory segPath := seg.Path() - _, err = copyToDirectory(segPath, d) - if err != nil { - return nil, nil, fmt.Errorf("segment: %s copy err: %v", segPath, err) - } filename := filepath.Base(segPath) + + // If destination VFS is provided and different from source, copy the segment file + if d != nil && snapshot.parent != nil && snapshot.parent.vfsDir != nil && d != snapshot.parent.vfsDir { + // Source and destination are different - copy the segment file + // If path is specified (e.g. 
"store"), prepend it to the filename for destination + destFilename := filename + if path != "" { + destFilename = filepath.Join(path, filename) + } + err := copySegmentFileWithPath(snapshot.parent.vfsDir, d, filename, destFilename) + if err != nil { + return nil, nil, fmt.Errorf("failed to copy segment %s: %w", filename, err) + } + } + err = snapshotSegmentBucket.Put(util.BoltPathKey, []byte(filename)) if err != nil { return nil, nil, err @@ -699,12 +756,21 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, // restricts which in-memory segment to be persisted to disk) if _, ok := exclude[segmentSnapshot.id]; !ok { filename := zapFileName(segmentSnapshot.id) - path := filepath.Join(path, filename) - err := persistToDirectory(seg, d, path) + + // VFS is now mandatory for segment persistence + if d == nil { + return nil, nil, fmt.Errorf("VFS directory required for segment persistence") + } + + err := persistToDirectory(seg, d, filename) if err != nil { - return nil, nil, fmt.Errorf("segment: %s persist err: %v", path, err) + return nil, nil, fmt.Errorf("segment %s persist err: %v", filename, err) } - newSegmentPaths[segmentSnapshot.id] = path + + // Store the full path for backwards compatibility (used by segment opening code) + fullPath := filepath.Join(path, filename) + newSegmentPaths[segmentSnapshot.id] = fullPath + err = snapshotSegmentBucket.Put(util.BoltPathKey, []byte(filename)) if err != nil { return nil, nil, err @@ -768,7 +834,7 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot, exclude map[uint } }() - filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, exclude, nil) + filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin, exclude, s.vfsDir) if err != nil { return err } @@ -793,9 +859,20 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot, exclude map[uint } }() for segmentID, path := range newSegmentPaths { - 
newSegments[segmentID], err = s.segPlugin.Open(path) - if err != nil { - return fmt.Errorf("error opening new segment at %s, %v", path, err) + // Try VFS-aware plugin first, fall back to legacy path-based + if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil { + // Use VFS-aware method with relative filename + filename := filepath.Base(path) + newSegments[segmentID], err = vfsPlugin.OpenVFS(s.vfsDir, filename) + if err != nil { + return fmt.Errorf("error opening segment %s via VFS, %v", filename, err) + } + } else { + // Legacy path-based opening + newSegments[segmentID], err = s.segPlugin.Open(path) + if err != nil { + return fmt.Errorf("error opening new segment at %s, %v", path, err) + } } } @@ -1004,10 +1081,24 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro if pathBytes == nil { return nil, fmt.Errorf("segment path missing") } - segmentPath := s.path + string(os.PathSeparator) + string(pathBytes) - seg, err := s.segPlugin.Open(segmentPath) - if err != nil { - return nil, fmt.Errorf("error opening bolt segment: %v", err) + filename := string(pathBytes) + + // Try VFS-aware plugin first, fall back to legacy path-based + var seg segment.Segment + var err error + if vfsPlugin, ok := s.segPlugin.(SegmentPluginVFS); ok && s.vfsDir != nil { + // Use VFS-aware method with relative filename + seg, err = vfsPlugin.OpenVFS(s.vfsDir, filename) + if err != nil { + return nil, fmt.Errorf("error opening segment %s via VFS: %v", filename, err) + } + } else { + // Legacy path-based opening + segmentPath := s.path + string(os.PathSeparator) + filename + seg, err = s.segPlugin.Open(segmentPath) + if err != nil { + return nil, fmt.Errorf("error opening bolt segment: %v", err) + } } rv := &SegmentSnapshot{ @@ -1251,7 +1342,11 @@ func (s *Scorch) removeOldBoltSnapshots() (numRemoved int, err error) { } func (s *Scorch) maxSegmentIDOnDisk() (uint64, error) { - files, err := os.ReadDir(s.path) + if s.vfsDir == nil { + return 
0, nil + } + + files, err := s.vfsDir.ReadDir(".") if err != nil { return 0, err } @@ -1280,18 +1375,23 @@ func (s *Scorch) removeOldZapFiles() error { return err } - files, err := os.ReadDir(s.path) + if s.vfsDir == nil { + return nil + } + + files, err := s.vfsDir.ReadDir(".") if err != nil { return err } s.rootLock.RLock() + defer s.rootLock.RUnlock() for _, f := range files { fname := f.Name() if filepath.Ext(fname) == ".zap" { if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] && (s.copyScheduled[fname] <= 0) { - err := os.Remove(s.path + string(os.PathSeparator) + fname) + err := s.vfsDir.Remove(fname) if err != nil { log.Printf("got err removing file: %s, err: %v", fname, err) } @@ -1299,8 +1399,6 @@ func (s *Scorch) removeOldZapFiles() error { } } - s.rootLock.RUnlock() - return nil } diff --git a/index/scorch/rollback_test.go b/index/scorch/rollback_test.go index c0facfedf..154bd941e 100644 --- a/index/scorch/rollback_test.go +++ b/index/scorch/rollback_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/blevesearch/bleve/v2/document" + "github.com/blevesearch/bleve/v2/index/scorch/vfs" index "github.com/blevesearch/bleve_index_api" ) @@ -545,7 +546,12 @@ func TestBackupRacingWithPurge(t *testing.T) { }() // if the latest snapshot was purged, the following will return error - err = copyReader.CopyTo(testFSDirector(backupidxConfig["path"].(string))) + backupPath := backupidxConfig["path"].(string) + backupVFSDir, err := vfs.NewFSDirectory(filepath.Join(backupPath, "store")) + if err != nil { + t.Fatalf("error creating backup VFS directory: %v", err) + } + err = copyReader.CopyTo(backupVFSDir) if err != nil { t.Fatalf("error copying the index: %v", err) } diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 83924978e..035f2f582 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -24,9 +24,11 @@ import ( "time" "github.com/RoaringBitmap/roaring/v2" + 
"github.com/blevesearch/bleve/v2/index/scorch/vfs" "github.com/blevesearch/bleve/v2/registry" "github.com/blevesearch/bleve/v2/util" index "github.com/blevesearch/bleve_index_api" + apivfs "github.com/blevesearch/bleve_index_api/vfs" segment "github.com/blevesearch/scorch_segment_api/v2" bolt "go.etcd.io/bbolt" ) @@ -48,6 +50,10 @@ type Scorch struct { analysisQueue *index.AnalysisQueue path string + // vfsDir is the pluggable directory for segment storage + // If nil, falls back to filesystem operations at 'path' + vfsDir apivfs.Directory + unsafeBatch bool rootLock sync.RWMutex @@ -145,6 +151,12 @@ func NewScorch(storeName string, } rv.root = &IndexSnapshot{parent: rv, refs: 1, creator: "NewScorch"} + + // Check if a custom VFS directory is provided + if dir, ok := config["vfsDirectory"].(apivfs.Directory); ok { + rv.vfsDir = dir + } + ro, ok := config["read_only"].(bool) if ok { rv.readOnly = ro @@ -248,6 +260,15 @@ func (s *Scorch) openBolt() error { s.unsafeBatch = true } + // Initialize VFS directory if not already set + if s.vfsDir == nil && s.path != "" { + var err error + s.vfsDir, err = vfs.NewFSDirectory(s.path) + if err != nil { + return fmt.Errorf("failed to create VFS directory: %w", err) + } + } + rootBoltOpt := *bolt.DefaultOptions if s.readOnly { rootBoltOpt.ReadOnly = true @@ -261,10 +282,21 @@ func (s *Scorch) openBolt() error { return os.OpenFile(path, os.O_RDONLY, mode) } } else { + // Create base directory for BoltDB (local filesystem) if s.path != "" { err := os.MkdirAll(s.path, 0o700) if err != nil { - return err + return fmt.Errorf("create local directory: %w", err) + } + } + + // Ensure VFS directory exists for segments + // For FSDirectory, this is the same as the local directory + // For remote VFS (S3, etc.), this creates the necessary storage structure + if s.vfsDir != nil { + err := s.vfsDir.MkdirAll(".", 0o700) + if err != nil { + return fmt.Errorf("create VFS directory: %w", err) } } } @@ -600,19 +632,17 @@ func (s *Scorch) 
diskFileStats(rootSegmentPaths map[string]struct{}) (uint64, uint64, uint64, ) { var numFilesOnDisk, numBytesUsedDisk, numBytesOnDiskByRoot uint64 - if s.path != "" { - files, err := os.ReadDir(s.path) + if s.vfsDir != nil { + files, err := s.vfsDir.ReadDir(".") if err == nil { for _, f := range files { if !f.IsDir() { - if finfo, err := f.Info(); err == nil { - numBytesUsedDisk += uint64(finfo.Size()) - numFilesOnDisk++ - if rootSegmentPaths != nil { - fname := s.path + string(os.PathSeparator) + finfo.Name() - if _, fileAtRoot := rootSegmentPaths[fname]; fileAtRoot { - numBytesOnDiskByRoot += uint64(finfo.Size()) - } + numBytesUsedDisk += uint64(f.Size()) + numFilesOnDisk++ + if rootSegmentPaths != nil { + fname := s.path + string(os.PathSeparator) + f.Name() + if _, fileAtRoot := rootSegmentPaths[fname]; fileAtRoot { + numBytesOnDiskByRoot += uint64(f.Size()) } } } diff --git a/index/scorch/segment_plugin.go b/index/scorch/segment_plugin.go index 790a8008a..fed185d43 100644 --- a/index/scorch/segment_plugin.go +++ b/index/scorch/segment_plugin.go @@ -20,6 +20,7 @@ import ( "github.com/RoaringBitmap/roaring/v2" "github.com/blevesearch/bleve/v2/geo" index "github.com/blevesearch/bleve_index_api" + "github.com/blevesearch/bleve_index_api/vfs" segment "github.com/blevesearch/scorch_segment_api/v2" zapv11 "github.com/blevesearch/zapx/v11" @@ -68,6 +69,45 @@ type SegmentPlugin interface { [][]uint64, uint64, error) } +// SegmentPluginVFS extends SegmentPlugin with VFS-aware methods. +// Plugins that implement this interface can work with the vfs.Directory +// abstraction, enabling support for remote storage backends (S3, GCS, etc.) +// in addition to local filesystem. +// +// The VFS-aware methods use relative file names instead of absolute paths, +// and all I/O operations go through the vfs.Directory interface. 
+// +// Implementations should maintain backwards compatibility by continuing to +// implement the base SegmentPlugin interface for legacy path-based operations. +type SegmentPluginVFS interface { + SegmentPlugin + + // OpenVFS attempts to open a segment file through the VFS directory + // and return the corresponding Segment. + // The name parameter is a relative filename (e.g., "000000000001.zap"), + // not an absolute path. + OpenVFS(dir vfs.Directory, name string) (segment.Segment, error) + + // MergeVFS takes a set of Segments and creates a new segment through + // the VFS directory at the specified relative name. + // This is the VFS-aware version of Merge() that works with the + // vfs.Directory abstraction instead of filesystem paths. + // + // Parameters: + // - dir: The VFS directory to write the merged segment to + // - name: Relative filename for the new segment (e.g., "000000000001.zap") + // - segments: Input segments to merge + // - drops: Bitmaps indicating which documents to drop during merge + // - closeCh: Channel to signal merge cancellation + // - s: Optional stats reporter for merge progress + // + // Returns: Same as Merge() - document mappings, bytes written, and error + MergeVFS(dir vfs.Directory, name string, + segments []segment.Segment, drops []*roaring.Bitmap, + closeCh chan struct{}, s segment.StatsReporter) ( + [][]uint64, uint64, error) +} + var supportedSegmentPlugins map[string]map[uint32]SegmentPlugin var defaultSegmentPlugin SegmentPlugin diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index 981640710..807d29c63 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -19,6 +19,7 @@ import ( "context" "encoding/binary" "fmt" + "io" "os" "path/filepath" "reflect" @@ -30,6 +31,7 @@ import ( "github.com/RoaringBitmap/roaring/v2" "github.com/blevesearch/bleve/v2/document" index "github.com/blevesearch/bleve_index_api" + "github.com/blevesearch/bleve_index_api/vfs" segment 
"github.com/blevesearch/scorch_segment_api/v2" "github.com/blevesearch/vellum" lev "github.com/blevesearch/vellum/levenshtein" @@ -992,46 +994,112 @@ OUTER: } func (is *IndexSnapshot) CopyTo(d index.Directory) error { - // get the root bolt file. - w, err := d.GetWriter(filepath.Join("store", "root.bolt")) - if err != nil || w == nil { - return fmt.Errorf("failed to create the root.bolt file, err: %v", err) + // Convert index.Directory to vfs.Directory if possible + // For backwards compatibility, we need to support both old-style index.Directory + // and new VFS-aware destinations + var destVFS vfs.Directory + + // Check if destination implements vfs.Directory + if vfsDir, ok := d.(vfs.Directory); ok { + destVFS = vfsDir + } else { + // Legacy path: destination is old-style index.Directory (e.g., FileSystemDirectory) + // We need to extract the base path and create a VFS directory from it + // This is for backwards compatibility with existing code + return fmt.Errorf("CopyTo requires a vfs.Directory implementation; legacy index.Directory is no longer supported for copying segment files") } - rootFile, ok := w.(*os.File) - if !ok { - return fmt.Errorf("invalid root.bolt file found") + + // Create the store subdirectory in the destination + // The index structure has metadata at the root and scorch data in "store/" + storeDir := "store" + + // get the root bolt file writer in the store subdirectory + w, err := destVFS.Create(filepath.Join(storeDir, "root.bolt")) + if err != nil { + return fmt.Errorf("failed to create the root.bolt file: %w", err) } - copyBolt, err := bolt.Open(rootFile.Name(), 0o600, nil) + // For BoltDB, we need a real file path, so we need to use a temporary file approach + // Create a temporary file, populate it with BoltDB data, then copy to VFS + tmpFile, err := os.CreateTemp("", "backup-root-*.bolt") if err != nil { - return err + w.Close() + return fmt.Errorf("failed to create temp bolt file: %w", err) } - defer func() { + tmpPath := 
tmpFile.Name() + tmpFile.Close() + defer os.Remove(tmpPath) + + copyBolt, err := bolt.Open(tmpPath, 0o600, nil) + if err != nil { w.Close() - if cerr := copyBolt.Close(); cerr != nil && err == nil { - err = cerr + return fmt.Errorf("failed to open temp bolt: %w", err) + } + + var txErr error + func() { + defer copyBolt.Close() + + // start a write transaction + tx, err := copyBolt.Begin(true) + if err != nil { + txErr = err + return + } + + // Prepare the snapshot in BoltDB and copy segment files to destination VFS + // Pass "store" as the path so segments are copied to the store subdirectory + _, _, err = prepareBoltSnapshot(is, tx, storeDir, is.parent.segPlugin, nil, destVFS) + if err != nil { + _ = tx.Rollback() + txErr = fmt.Errorf("error backing up index snapshot: %w", err) + return + } + + // commit bolt data + err = tx.Commit() + if err != nil { + txErr = fmt.Errorf("error commit tx to backup root bolt: %w", err) + return + } + + err = copyBolt.Sync() + if err != nil { + txErr = fmt.Errorf("error syncing bolt: %w", err) + return } }() - // start a write transaction - tx, err := copyBolt.Begin(true) - if err != nil { - return err + if txErr != nil { + w.Close() + return txErr } - _, _, err = prepareBoltSnapshot(is, tx, "", is.parent.segPlugin, nil, d) + // Now copy the temp bolt file to destination VFS + tmpBolt, err := os.Open(tmpPath) if err != nil { - _ = tx.Rollback() - return fmt.Errorf("error backing up index snapshot: %v", err) + w.Close() + return fmt.Errorf("failed to open temp bolt for reading: %w", err) } + defer tmpBolt.Close() - // commit bolt data - err = tx.Commit() + _, err = io.Copy(w, tmpBolt) if err != nil { - return fmt.Errorf("error commit tx to backup root bolt: %v", err) + w.Close() + return fmt.Errorf("failed to copy bolt to destination: %w", err) } - return copyBolt.Sync() + // Sync and close the destination writer + if err := w.Sync(); err != nil { + w.Close() + return fmt.Errorf("failed to sync destination bolt: %w", err) + } + + if 
err := w.Close(); err != nil { + return fmt.Errorf("failed to close destination bolt: %w", err) + } + + return nil } func (is *IndexSnapshot) UpdateIOStats(val uint64) { diff --git a/index/scorch/vfs/directory_test.go b/index/scorch/vfs/directory_test.go new file mode 100644 index 000000000..18c98129b --- /dev/null +++ b/index/scorch/vfs/directory_test.go @@ -0,0 +1,249 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vfs + +import ( + "io" + "testing" + + apivfs "github.com/blevesearch/bleve_index_api/vfs" +) + +// directoryTestSuite runs a standard set of tests against any Directory implementation. 
+func directoryTestSuite(t *testing.T, dir apivfs.Directory) { + t.Run("CreateAndRead", func(t *testing.T) { + testData := []byte("test data") + testFile := "test.dat" + + // Create and write + w, err := dir.Create(testFile) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + + if _, err := w.Write(testData); err != nil { + t.Fatalf("Failed to write data: %v", err) + } + + if err := w.Close(); err != nil { + t.Fatalf("Failed to close writer: %v", err) + } + + // Read and verify + r, err := dir.Open(testFile) + if err != nil { + t.Fatalf("Failed to open file: %v", err) + } + defer r.Close() + + readData, err := io.ReadAll(r) + if err != nil { + t.Fatalf("Failed to read data: %v", err) + } + + if string(readData) != string(testData) { + t.Errorf("Data mismatch: got %q, want %q", readData, testData) + } + + // Clean up + if err := dir.Remove(testFile); err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + }) + + t.Run("Stat", func(t *testing.T) { + testData := []byte("stat test") + testFile := "stat.dat" + + // Create file + w, err := dir.Create(testFile) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + if _, err := w.Write(testData); err != nil { + t.Fatalf("Failed to write data: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Failed to close writer: %v", err) + } + + // Stat file + fi, err := dir.Stat(testFile) + if err != nil { + t.Fatalf("Failed to stat file: %v", err) + } + + if fi.Size() != int64(len(testData)) { + t.Errorf("Size mismatch: got %d, want %d", fi.Size(), len(testData)) + } + + if fi.IsDir() { + t.Error("Expected file, got directory") + } + + // Clean up + if err := dir.Remove(testFile); err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + }) + + t.Run("Rename", func(t *testing.T) { + oldName := "old.dat" + newName := "new.dat" + testData := []byte("rename test") + + // Create file + w, err := dir.Create(oldName) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } 
+ if _, err := w.Write(testData); err != nil { + t.Fatalf("Failed to write data: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Failed to close writer: %v", err) + } + + // Rename + if err := dir.Rename(oldName, newName); err != nil { + t.Fatalf("Failed to rename file: %v", err) + } + + // Verify new file exists and has correct content + r, err := dir.Open(newName) + if err != nil { + t.Fatalf("Failed to open renamed file: %v", err) + } + defer r.Close() + + readData, err := io.ReadAll(r) + if err != nil { + t.Fatalf("Failed to read renamed file: %v", err) + } + + if string(readData) != string(testData) { + t.Errorf("Data mismatch after rename") + } + + // Clean up + if err := dir.Remove(newName); err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + }) + + t.Run("Remove", func(t *testing.T) { + testFile := "remove.dat" + + // Create file + w, err := dir.Create(testFile) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + if _, err := w.Write([]byte("remove test")); err != nil { + t.Fatalf("Failed to write data: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Failed to close writer: %v", err) + } + + // Remove file + if err := dir.Remove(testFile); err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + + // Verify file doesn't exist + if _, err := dir.Stat(testFile); err == nil { + t.Error("File still exists after remove") + } + }) + + t.Run("OpenAt", func(t *testing.T) { + testData := []byte("random access test data with some content") + testFile := "openat.dat" + + // Create file + w, err := dir.Create(testFile) + if err != nil { + t.Fatalf("Failed to create file: %v", err) + } + if _, err := w.Write(testData); err != nil { + t.Fatalf("Failed to write data: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("Failed to close writer: %v", err) + } + + // Open for random access + rac, err := dir.OpenAt(testFile) + if err != nil { + t.Fatalf("Failed to open file for random access: %v", err) + 
} + defer rac.Close() + + // Test reading at different positions + buf := make([]byte, 6) + + // Read from beginning + n, err := rac.ReadAt(buf, 0) + if err != nil && err != io.EOF { + t.Fatalf("Failed to read at position 0: %v", err) + } + if n != 6 { + t.Errorf("Expected to read 6 bytes, got %d", n) + } + if string(buf) != "random" { + t.Errorf("Data mismatch at position 0: got %q, want %q", buf, "random") + } + + // Read from middle + n, err = rac.ReadAt(buf, 7) + if err != nil && err != io.EOF { + t.Fatalf("Failed to read at position 7: %v", err) + } + if n != 6 { + t.Errorf("Expected to read 6 bytes, got %d", n) + } + if string(buf) != "access" { + t.Errorf("Data mismatch at position 7: got %q, want %q", buf, "access") + } + + // Test AsFd returns valid file descriptor + fd := rac.AsFd() + if fd == 0 { + t.Error("Expected valid file descriptor, got 0") + } + + // Clean up + if err := dir.Remove(testFile); err != nil { + t.Fatalf("Failed to remove file: %v", err) + } + }) +} + +func TestDirectoryCompliance_FSDirectory(t *testing.T) { + tmpDir := t.TempDir() + dir, err := NewFSDirectory(tmpDir) + if err != nil { + t.Fatalf("Failed to create FSDirectory: %v", err) + } + + // Lock directory for tests + if err := dir.Lock(); err != nil { + t.Fatalf("Failed to lock directory: %v", err) + } + defer dir.Unlock() + + directoryTestSuite(t, dir) +} diff --git a/index/scorch/vfs/fs_directory.go b/index/scorch/vfs/fs_directory.go new file mode 100644 index 000000000..cf54f25af --- /dev/null +++ b/index/scorch/vfs/fs_directory.go @@ -0,0 +1,245 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vfs
+
+import (
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"sync"
+
+	apivfs "github.com/blevesearch/bleve_index_api/vfs"
+)
+
+// FSDirectory is a Directory implementation that uses the local filesystem.
+type FSDirectory struct {
+	// basePath is the absolute root that relative names are resolved against.
+	basePath string
+	// lockFile is the open handle on the "write.lock" file while the
+	// directory is locked, and nil otherwise.
+	lockFile *os.File
+	// mu guards lockFile.
+	mu sync.Mutex
+}
+
+// NewFSDirectory creates a new filesystem-based Directory at the given path.
+// The path is converted to an absolute path once, here; nothing is created
+// on disk by this call.
+func NewFSDirectory(path string) (*FSDirectory, error) {
+	absPath, err := filepath.Abs(path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get absolute path: %w", err)
+	}
+
+	return &FSDirectory{
+		basePath: absPath,
+	}, nil
+}
+
+// Open opens the named file for reading.
+// On failure the returned reader is a nil interface value.
+func (d *FSDirectory) Open(name string) (io.ReadCloser, error) {
+	f, err := os.Open(d.FullPath(name))
+	if err != nil {
+		// Return an untyped nil: handing back the nil *os.File directly
+		// would wrap it in a non-nil io.ReadCloser.
+		return nil, err
+	}
+	return f, nil
+}
+
+// OpenAt opens the named file for random access reading.
+// This is used for memory-mapped segments.
+func (d *FSDirectory) OpenAt(name string) (apivfs.ReaderAtCloser, error) {
+	fullPath := d.FullPath(name)
+	f, err := os.Open(fullPath)
+	if err != nil {
+		return nil, err
+	}
+	// The wrapper takes over f; the caller closes it through the returned value.
+	return apivfs.NewFileReaderAtCloser(f), nil
+}
+
+// Create creates or truncates the named file for writing.
+func (d *FSDirectory) Create(name string) (apivfs.WriteCloser, error) {
+	fullPath := d.FullPath(name)
+
+	// Ensure parent directory exists
+	dir := filepath.Dir(fullPath)
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return nil, fmt.Errorf("failed to create parent directory: %w", err)
+	}
+
+	// O_TRUNC discards any previous content of an existing file.
+	f, err := os.OpenFile(fullPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return nil, err
+	}
+
+	return &fsWriteCloser{File: f}, nil
+}
+
+// Remove removes the named file.
+func (d *FSDirectory) Remove(name string) error {
+	return os.Remove(d.FullPath(name))
+}
+
+// Rename renames (moves) oldpath to newpath, creating the parent directory
+// of newpath first when it does not exist yet.
+func (d *FSDirectory) Rename(oldpath, newpath string) error {
+	oldFullPath := d.FullPath(oldpath)
+	newFullPath := d.FullPath(newpath)
+
+	// Ensure parent directory of new path exists
+	dir := filepath.Dir(newFullPath)
+	if err := os.MkdirAll(dir, 0755); err != nil {
+		return fmt.Errorf("failed to create parent directory: %w", err)
+	}
+
+	return os.Rename(oldFullPath, newFullPath)
+}
+
+// Stat returns FileInfo describing the named file.
+// The underlying os.Stat error is returned unchanged, so callers can test
+// it with os.IsNotExist.
+func (d *FSDirectory) Stat(name string) (apivfs.FileInfo, error) {
+	fi, err := os.Stat(d.FullPath(name))
+	if err != nil {
+		return nil, err
+	}
+	return &fsFileInfo{FileInfo: fi}, nil
+}
+
+// ReadDir reads the named directory and returns a list of directory entries.
+// Entries that disappear between the directory listing and the per-entry
+// stat are skipped; any other stat failure is returned to the caller.
+func (d *FSDirectory) ReadDir(name string) ([]apivfs.FileInfo, error) {
+	entries, err := os.ReadDir(d.FullPath(name))
+	if err != nil {
+		return nil, err
+	}
+
+	result := make([]apivfs.FileInfo, 0, len(entries))
+	for _, entry := range entries {
+		info, err := entry.Info()
+		if err != nil {
+			if os.IsNotExist(err) {
+				// Removed or renamed since the listing was taken.
+				continue
+			}
+			// Anything else (e.g. a permission problem) would otherwise
+			// make files vanish silently from the listing.
+			return nil, err
+		}
+		result = append(result, &fsFileInfo{FileInfo: info})
+	}
+	return result, nil
+}
+
+// MkdirAll creates a directory named path, along with any necessary parents.
+func (d *FSDirectory) MkdirAll(path string, perm fs.FileMode) error {
+	fullPath := d.FullPath(path)
+	return os.MkdirAll(fullPath, perm)
+}
+
+// Sync is a no-op for FSDirectory.
+//
+// Closing a file does not by itself flush it to stable storage. Callers that
+// need durability must call Sync on the writer returned by Create before
+// closing it.
+func (d *FSDirectory) Sync() error {
+	return nil
+}
+
+// GetWriter implements index.Directory for backwards compatibility.
+// This allows FSDirectory to be used where index.Directory is expected.
+func (d *FSDirectory) GetWriter(filePath string) (io.WriteCloser, error) {
+	// Create already makes any missing parent directories, so no separate
+	// MkdirAll is needed here.
+	w, err := d.Create(filePath)
+	if err != nil {
+		return nil, err
+	}
+	return w, nil
+}
+
+// Lock acquires an exclusive, non-blocking lock on the directory by locking
+// the "write.lock" file inside the base path. The base directory and the
+// lock file are created when missing. It fails when this FSDirectory already
+// holds the lock or when the OS-level lock cannot be taken.
+func (d *FSDirectory) Lock() error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if d.lockFile != nil {
+		return fmt.Errorf("directory is already locked")
+	}
+
+	lockPath := filepath.Join(d.basePath, "write.lock")
+
+	// Ensure base directory exists
+	if err := os.MkdirAll(d.basePath, 0755); err != nil {
+		return fmt.Errorf("failed to create base directory: %w", err)
+	}
+
+	f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_RDWR, 0644)
+	if err != nil {
+		return fmt.Errorf("failed to create lock file: %w", err)
+	}
+
+	// Try to acquire an exclusive lock (non-blocking)
+	// Uses platform-specific implementation (flock on Unix, LockFileEx on Windows)
+	if err := flock(f, true); err != nil {
+		// Do not leak the handle when the lock is held elsewhere.
+		if closeErr := f.Close(); closeErr != nil {
+			return fmt.Errorf("failed to acquire lock (another process may have the index open): %w, and failed to close lock file: %v", err, closeErr)
+		}
+		return fmt.Errorf("failed to acquire lock (another process may have the index open): %w", err)
+	}
+
+	d.lockFile = f
+	return nil
+}
+
+// Unlock releases the lock acquired by Lock. It is a no-op when the
+// directory is not locked.
+//
+// The lock file handle is always closed and forgotten, even when releasing
+// the lock or closing the file reports an error, so a failed Unlock never
+// leaves the FSDirectory stuck in the "already locked" state.
+//
+// The "write.lock" file itself is left on disk on purpose. Deleting it after
+// the lock is released races with another process that has just opened and
+// locked that same file: a third process could then create a fresh file and
+// lock it as well.
+func (d *FSDirectory) Unlock() error {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if d.lockFile == nil {
+		return nil // not locked
+	}
+
+	f := d.lockFile
+	d.lockFile = nil
+
+	// Release the lock
+	// Uses platform-specific implementation (flock on Unix, UnlockFileEx on Windows)
+	unlockErr := funlock(f)
+	// Close unconditionally so the handle is not leaked when funlock fails.
+	closeErr := f.Close()
+
+	if unlockErr != nil {
+		return fmt.Errorf("failed to release lock: %w", unlockErr)
+	}
+	if closeErr != nil {
+		return fmt.Errorf("failed to close lock file: %w", closeErr)
+	}
+	return nil
+}
+
+// FullPath returns the full filesystem path for a given name.
+// Relative names are joined onto the base path. Absolute names are returned
+// unchanged, so they are not confined to the base path.
+func (d *FSDirectory) FullPath(name string) string {
+	if filepath.IsAbs(name) {
+		return name
+	}
+	return filepath.Join(d.basePath, name)
+}
+
+// fsWriteCloser wraps os.File to implement WriteCloser with Sync.
+type fsWriteCloser struct {
+	*os.File
+}
+
+// Sync flushes the file via the embedded os.File.
+func (w *fsWriteCloser) Sync() error {
+	return w.File.Sync()
+}
+
+// fsFileInfo wraps os.FileInfo to implement our FileInfo interface.
+type fsFileInfo struct {
+	os.FileInfo
+}
+
+// Ensure FSDirectory implements apivfs.Directory
+var _ apivfs.Directory = (*FSDirectory)(nil)
diff --git a/index/scorch/vfs/fs_directory_lock_unix.go b/index/scorch/vfs/fs_directory_lock_unix.go
new file mode 100644
index 000000000..7ded31f63
--- /dev/null
+++ b/index/scorch/vfs/fs_directory_lock_unix.go
@@ -0,0 +1,39 @@
+//go:build !windows && !plan9 && !solaris && !aix && !android
+
+// Copyright (c) 2025 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vfs
+
+import (
+	"os"
+	"syscall"
+)
+
+// flock takes an advisory flock(2) lock on f without blocking: an exclusive
+// lock when exclusive is true, a shared lock otherwise. The error from
+// syscall.Flock is returned as-is.
+// This is the Unix/Linux/macOS implementation.
+//
+// NOTE(review): this file's build constraint also excludes plan9, solaris,
+// aix and android; no flock/funlock for those targets is visible in this
+// change, so confirm the package still builds there.
+func flock(f *os.File, exclusive bool) error {
+	mode := syscall.LOCK_SH // shared unless the caller asks for exclusive
+	if exclusive {
+		mode = syscall.LOCK_EX
+	}
+	// LOCK_NB makes the call return immediately instead of waiting.
+	return syscall.Flock(int(f.Fd()), mode|syscall.LOCK_NB)
+}
+
+// funlock drops the advisory lock held on f.
+func funlock(f *os.File) error {
+	fd := int(f.Fd())
+	return syscall.Flock(fd, syscall.LOCK_UN)
+}
diff --git a/index/scorch/vfs/fs_directory_lock_windows.go b/index/scorch/vfs/fs_directory_lock_windows.go
new file mode 100644
index 000000000..8a4122af1
--- /dev/null
+++ b/index/scorch/vfs/fs_directory_lock_windows.go
@@ -0,0 +1,68 @@
+//go:build windows
+
+// Copyright (c) 2025 Couchbase, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// This is the Windows implementation. +// Following the bbolt pattern: uses byte-range lock at offset -1..0 +func flock(f *os.File, exclusive bool) error { + // Flags for immediate failure if lock cannot be acquired + flags := uint32(windows.LOCKFILE_FAIL_IMMEDIATELY) + if exclusive { + flags |= windows.LOCKFILE_EXCLUSIVE_LOCK + } + + // Use byte-range -1..0 as the lock range (bbolt pattern) + // This avoids conflicts with actual file content + var m1 uint32 = (1 << 32) - 1 // -1 in a uint32 + + err := windows.LockFileEx( + windows.Handle(f.Fd()), + flags, + 0, // reserved, must be 0 + 1, // number of bytes to lock (low DWORD) + 0, // number of bytes to lock (high DWORD) + &windows.Overlapped{ + Offset: m1, + OffsetHigh: m1, + }, + ) + + return err +} + +// funlock releases the advisory lock on a file. +func funlock(f *os.File) error { + var m1 uint32 = (1 << 32) - 1 + + return windows.UnlockFileEx( + windows.Handle(f.Fd()), + 0, // reserved, must be 0 + 1, // number of bytes to unlock (low DWORD) + 0, // number of bytes to unlock (high DWORD) + &windows.Overlapped{ + Offset: m1, + OffsetHigh: m1, + }, + ) +} diff --git a/index/scorch/vfs/fs_directory_test.go b/index/scorch/vfs/fs_directory_test.go new file mode 100644 index 000000000..79d55f77c --- /dev/null +++ b/index/scorch/vfs/fs_directory_test.go @@ -0,0 +1,261 @@ +// Copyright (c) 2025 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package vfs
+
+import (
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+)
+
+// TestFSDirectory_BasicOperations walks one file through the whole
+// lifecycle: create, write, read back, stat, rename and remove, all while
+// holding the directory lock.
+func TestFSDirectory_BasicOperations(t *testing.T) {
+	// Create temporary directory
+	tmpDir := t.TempDir()
+
+	// Create FSDirectory
+	dir, err := NewFSDirectory(tmpDir)
+	if err != nil {
+		t.Fatalf("Failed to create FSDirectory: %v", err)
+	}
+
+	// Test Lock/Unlock
+	if err := dir.Lock(); err != nil {
+		t.Fatalf("Failed to lock directory: %v", err)
+	}
+	defer func() {
+		if err := dir.Unlock(); err != nil {
+			t.Errorf("Failed to unlock directory: %v", err)
+		}
+	}()
+
+	// Test Create and Write
+	testData := []byte("Hello, Firebug!")
+	testFile := "test.txt"
+
+	w, err := dir.Create(testFile)
+	if err != nil {
+		t.Fatalf("Failed to create file: %v", err)
+	}
+
+	if _, err := w.Write(testData); err != nil {
+		t.Fatalf("Failed to write data: %v", err)
+	}
+
+	if err := w.Close(); err != nil {
+		t.Fatalf("Failed to close writer: %v", err)
+	}
+
+	// Test Open and Read
+	r, err := dir.Open(testFile)
+	if err != nil {
+		t.Fatalf("Failed to open file: %v", err)
+	}
+
+	readData, err := io.ReadAll(r)
+	// Close the reader now instead of deferring it: a handle that is still
+	// open makes the rename and remove steps below fail on Windows.
+	if closeErr := r.Close(); closeErr != nil {
+		t.Fatalf("Failed to close reader: %v", closeErr)
+	}
+	if err != nil {
+		t.Fatalf("Failed to read data: %v", err)
+	}
+
+	if string(readData) != string(testData) {
+		t.Errorf("Read data mismatch: got %q, want %q", readData, testData)
+	}
+
+	// Test Stat
+	fi, err := dir.Stat(testFile)
+	if err != nil {
+		t.Fatalf("Failed to stat file: %v", err)
+	}
+
+	if fi.Name() != testFile {
+		t.Errorf("File name mismatch: got %q, want %q", fi.Name(), testFile)
+	}
+
+	if fi.Size() != int64(len(testData)) {
+		t.Errorf("File size mismatch: got %d, want %d", fi.Size(), len(testData))
+	}
+
+	// Test Rename
+	newName := "renamed.txt"
+	if err := dir.Rename(testFile, newName); err != nil {
+		t.Fatalf("Failed to rename file: %v", err)
+	}
+
+	// Verify renamed file exists
+	if _, err := dir.Stat(newName); err != nil {
+		t.Fatalf("Failed to stat renamed file: %v", err)
+	}
+
+	// Verify old file doesn't exist
+	if _, err := dir.Stat(testFile); !os.IsNotExist(err) {
+		t.Errorf("Old file still exists after rename")
+	}
+
+	// Test Remove
+	if err := dir.Remove(newName); err != nil {
+		t.Fatalf("Failed to remove file: %v", err)
+	}
+
+	// Verify file is removed
+	if _, err := dir.Stat(newName); !os.IsNotExist(err) {
+		t.Errorf("File still exists after remove")
+	}
+}
+
+// TestFSDirectory_DirectoryOperations checks MkdirAll and ReadDir: it
+// creates a nested subdirectory plus three files and expects all of them in
+// the top-level listing.
+func TestFSDirectory_DirectoryOperations(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	dir, err := NewFSDirectory(tmpDir)
+	if err != nil {
+		t.Fatalf("Failed to create FSDirectory: %v", err)
+	}
+
+	// Test MkdirAll
+	subdir := filepath.Join("a", "b", "c")
+	if err := dir.MkdirAll(subdir, 0755); err != nil {
+		t.Fatalf("Failed to create subdirectories: %v", err)
+	}
+
+	// Create some test files
+	testFiles := []string{"file1.txt", "file2.txt", "file3.dat"}
+	for _, name := range testFiles {
+		w, err := dir.Create(name)
+		if err != nil {
+			t.Fatalf("Failed to create file %s: %v", name, err)
+		}
+		if _, err := w.Write([]byte("test")); err != nil {
+			t.Fatalf("Failed to write to file %s: %v", name, err)
+		}
+		if err := w.Close(); err != nil {
+			t.Fatalf("Failed to close file %s: %v", name, err)
+		}
+	}
+
+	// Test ReadDir
+	entries, err := dir.ReadDir(".")
+	if err != nil {
+		t.Fatalf("Failed to read directory: %v", err)
+	}
+
+	// Check that we have the expected number of entries
+	// (3 files + the top-level subdirectory "a"; this test takes no lock,
+	// so no lock file is expected)
+	if len(entries) < 4 {
+		t.Errorf("Expected at least 4 entries, got %d", len(entries))
+	}
+
+	// Verify our test files are in the list
+	found := make(map[string]bool, len(entries))
+	for _, entry := range entries {
+		found[entry.Name()] = true
+	}
+
+	for _, name := range testFiles {
+		if !found[name] {
+			t.Errorf("File %s not found in directory listing", name)
+		}
+	}
+
+	// The directory created by MkdirAll must be listed as well.
+	if !found["a"] {
+		t.Errorf("Subdirectory %s not found in directory listing", "a")
+	}
+}
+
+// TestFSDirectory_Locking verifies that two FSDirectory values on the same
+// path exclude each other: the second Lock fails while the first is held
+// and succeeds once it has been released.
+func TestFSDirectory_Locking(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	dir1, err := NewFSDirectory(tmpDir)
+	if err != nil {
+		t.Fatalf("Failed to create first FSDirectory: %v", err)
+	}
+
+	// Acquire lock with first directory
+	if err := dir1.Lock(); err != nil {
+		t.Fatalf("Failed to lock first directory: %v", err)
+	}
+
+	// Try to acquire lock with second directory (should fail)
+	dir2, err := NewFSDirectory(tmpDir)
+	if err != nil {
+		t.Fatalf("Failed to create second FSDirectory: %v", err)
+	}
+
+	if err := dir2.Lock(); err == nil {
+		t.Error("Expected lock to fail, but it succeeded")
+		// Best-effort cleanup; the test has already failed at this point.
+		_ = dir2.Unlock()
+	}
+
+	// Release first lock
+	if err := dir1.Unlock(); err != nil {
+		t.Fatalf("Failed to unlock first directory: %v", err)
+	}
+
+	// Now second directory should be able to acquire lock
+	if err := dir2.Lock(); err != nil {
+		t.Fatalf("Failed to lock second directory after first was released: %v", err)
+	}
+
+	if err := dir2.Unlock(); err != nil {
+		t.Fatalf("Failed to unlock second directory: %v", err)
+	}
+}
+
+// TestFSDirectory_ConcurrentReads opens and reads the same file from several
+// goroutines at once and checks that every reader sees the full content.
+func TestFSDirectory_ConcurrentReads(t *testing.T) {
+	tmpDir := t.TempDir()
+
+	dir, err := NewFSDirectory(tmpDir)
+	if err != nil {
+		t.Fatalf("Failed to create FSDirectory: %v", err)
+	}
+
+	// Create test file
+	testFile := "concurrent.txt"
+	testData := []byte("Concurrent read test data")
+
+	w, err := dir.Create(testFile)
+	if err != nil {
+		t.Fatalf("Failed to create file: %v", err)
+	}
+	if _, err := w.Write(testData); err != nil {
+		t.Fatalf("Failed to write to file: %v", err)
+	}
+	if err := w.Close(); err != nil {
+		t.Fatalf("Failed to close file: %v", err)
+	}
+
+	// Perform concurrent reads
+	const numReaders = 10
+	done := make(chan bool, numReaders)
+
+	for i := 0; i < numReaders; i++ {
+		go func() {
+			defer func() { done <- true }()
+
+			// Only t.Errorf is used in here: Fatal-style calls must not be
+			// made from goroutines other than the test's own.
+			r, err := dir.Open(testFile)
+			if err != nil {
+				t.Errorf("Failed to open file: %v", err)
+				return
+			}
+			defer r.Close()
+
+			data, err := io.ReadAll(r)
+			if err != nil {
+				t.Errorf("Failed to read file: %v", err)
+				return
+			}
+
+			if string(data) != string(testData) {
+				t.Errorf("Data mismatch in concurrent read")
+			}
+		}()
+	}
+
+	// Wait for all readers to complete
+	for i := 0; i < numReaders; i++ {
+		<-done
+	}
+}
diff --git a/index_test.go b/index_test.go
index 7ed27ff86..93a1eab16 100644
--- a/index_test.go
+++ b/index_test.go
@@ -41,6 +41,7
@@ import ( index "github.com/blevesearch/bleve_index_api" "github.com/blevesearch/bleve/v2/index/scorch" + "github.com/blevesearch/bleve/v2/index/scorch/vfs" "github.com/blevesearch/bleve/v2/index/upsidedown" ) @@ -3125,7 +3126,13 @@ func TestCopyIndex(t *testing.T) { backupIndexPath := createTmpIndexPath(t) defer cleanupTmpIndexPath(t, backupIndexPath) - err = copyableIndex.CopyTo(FileSystemDirectory(backupIndexPath)) + // Create a VFS-compatible directory for the backup + backupVFSDir, err := vfs.NewFSDirectory(backupIndexPath) + if err != nil { + t.Fatalf("error creating backup VFS directory: %v", err) + } + + err = copyableIndex.CopyTo(backupVFSDir) if err != nil { t.Fatalf("error copying the index: %v", err) }