Diff from https://github.com/prometheus/prometheus/issues/8799 and https://github.com/prometheus/prometheus/pull/9085 to make tsdb only use mmap and work around missing UBC support. diff --git go.mod go.mod index 39c3fcb5b..760b39a8b 100644 --- go.mod +++ go.mod @@ -13,7 +13,6 @@ require ( github.com/dgryski/go-sip13 v0.0.0-20200911182023-62edffca9245 github.com/digitalocean/godo v1.81.0 github.com/docker/docker v20.10.24+incompatible - github.com/edsrzf/mmap-go v1.1.0 github.com/envoyproxy/go-control-plane v0.10.3 github.com/envoyproxy/protoc-gen-validate v0.6.7 github.com/fsnotify/fsnotify v1.5.4 diff --git go.sum go.sum index e7aee4a9b..6b323945d 100644 --- go.sum +++ go.sum @@ -202,8 +202,6 @@ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5m github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= -github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git promql/query_logger.go promql/query_logger.go index 716e7749b..8eb1afce0 100644 --- promql/query_logger.go +++ promql/query_logger.go @@ -22,13 +22,13 @@ import ( "time" "unicode/utf8" - "github.com/edsrzf/mmap-go" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/prometheus/tsdb/fileutil" ) type ActiveQueryTracker struct { - mmapedFile []byte + mw *fileutil.MmapWriter getNextIndex chan int logger log.Logger maxConcurrent int @@ -81,7 +81,7 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) { } } -func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, error) { +func getMMapedFile(filename string, filesize int, logger log.Logger) (*fileutil.MmapWriter, error) { file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666) if err != nil { absPath, pathErr := filepath.Abs(filename) @@ -92,19 +92,13 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, er return nil, err } - err = file.Truncate(int64(filesize)) - if err != nil { - level.Error(logger).Log("msg", "Error setting filesize.", "filesize", filesize, "err", err) - return nil, err - } - - fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0) + mw, err := fileutil.NewMmapWriterWithSize(file, filesize) if err != nil { level.Error(logger).Log("msg", "Failed to mmap", "file", filename, "Attempted size", filesize, "err", err) return nil, err } - return fileAsBytes, err + return mw, err } func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker { @@ -116,14 +110,17 @@ func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger lo filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize logUnfinishedQueries(filename, filesize, logger) - fileAsBytes, err := getMMapedFile(filename, filesize, logger) + mw, err := getMMapedFile(filename, filesize, logger) if err != nil { panic("Unable to create mmap-ed active query log") } - copy(fileAsBytes, "[") + _, err = mw.Write([]byte("[")) + if err != nil { + panic("Unable to write mmap-ed active query log") + } activeQueryTracker := ActiveQueryTracker{ - mmapedFile: fileAsBytes, + mw: mw, getNextIndex: make(chan int, maxConcurrent), logger: logger, maxConcurrent: maxConcurrent, @@ -180,19 +177,27 @@ func (tracker ActiveQueryTracker) GetMaxConcurrent() int { } func (tracker ActiveQueryTracker) Delete(insertIndex int) { - copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize)) + _, err := tracker.mw.WriteAt([]byte(strings.Repeat("\x00", entrySize)), int64(insertIndex)) + if err != nil { + panic("Unable to write mmap-ed active query log") + } tracker.getNextIndex <- insertIndex } func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) { select { case i := <-tracker.getNextIndex: - fileBytes := tracker.mmapedFile entry := newJSONEntry(query, tracker.logger) start, end := i, i+entrySize - copy(fileBytes[start:], entry) - copy(fileBytes[end-1:], ",") + _, err := tracker.mw.WriteAt(entry, int64(start)) + if err != nil { + return 0, err + } + _, err = tracker.mw.WriteAt([]byte(","), int64(end-1)) + if err != nil { + return 0, err + } return i, nil case <-ctx.Done(): return 0, ctx.Err() diff --git promql/query_logger_test.go promql/query_logger_test.go index ad76fb992..bd92b81af 100644 --- promql/query_logger_test.go +++ promql/query_logger_test.go @@ -19,13 +19,22 @@ import ( "testing" "github.com/grafana/regexp" + "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/stretchr/testify/require" ) func TestQueryLogging(t *testing.T) { - fileAsBytes := make([]byte, 4096) + file, err := ioutil.TempFile("", "mmapedFile") + require.NoError(t, err) + + filename := file.Name() + defer os.Remove(filename) + + mw, err := fileutil.NewMmapWriterWithSize(file, 4096) + require.NoError(t, err) + queryLogger := ActiveQueryTracker{ - mmapedFile: fileAsBytes, + mw: mw, logger: nil, getNextIndex: make(chan int, 4), } @@ -45,6 +54,7 @@ func TestQueryLogging(t *testing.T) { `^{"query":"","timestamp_sec":\d+}\x00*,$`, `^{"query":"SpecialCharQuery{host=\\"2132132\\", id=123123}","timestamp_sec":\d+}\x00*,$`, } + fileAsBytes := mw.Bytes() // Check for inserts of queries. for i := 0; i < 4; i++ { @@ -67,9 +77,17 @@ func TestQueryLogging(t *testing.T) { } func TestIndexReuse(t *testing.T) { - queryBytes := make([]byte, 1+3*entrySize) + file, err := ioutil.TempFile("", "mmapedFile") + require.NoError(t, err) + + filename := file.Name() + defer os.Remove(filename) + + mw, err := fileutil.NewMmapWriterWithSize(file, 1+3*entrySize) + require.NoError(t, err) + queryLogger := ActiveQueryTracker{ - mmapedFile: queryBytes, + mw: mw, logger: nil, getNextIndex: make(chan int, 3), } @@ -91,6 +109,7 @@ func TestIndexReuse(t *testing.T) { `^{"query":"ThisShouldBeInsertedAtIndex2","timestamp_sec":\d+}\x00*,$`, `^{"query":"TestQuery3","timestamp_sec":\d+}\x00*,$`, } + queryBytes := mw.Bytes() // Check all bytes and verify new query was inserted at index 2 for i := 0; i < 3; i++ { @@ -110,10 +129,12 @@ func TestMMapFile(t *testing.T) { filename := file.Name() defer os.Remove(filename) - fileAsBytes, err := getMMapedFile(filename, 2, nil) + mw, err := getMMapedFile(filename, 2, nil) + require.NoError(t, err) + fileAsBytes := mw.Bytes() + _, err = mw.Write([]byte("ab")) require.NoError(t, err) - copy(fileAsBytes, "ab") f, err := os.Open(filename) require.NoError(t, err) diff --git tsdb/fileutil/mmap.go tsdb/fileutil/mmap.go index 4dbca4f97..e1c522472 100644 --- tsdb/fileutil/mmap.go +++ tsdb/fileutil/mmap.go @@ -20,8 +20,31 @@ import ( ) type MmapFile struct { - f *os.File - b []byte + f *os.File + b []byte + rw bool + closeFile bool +} + +func OpenRwMmapFromFile(f *os.File, size int) (mf *MmapFile, retErr error) { + defer func() { + if retErr != nil { + f.Close() + } + }() + if size <= 0 { + info, err := f.Stat() + if err != nil { + return nil, errors.Wrap(err, "stat") + } + size = int(info.Size()) + } + + b, err := mmapRw(f, size) + if err != nil { + return nil, errors.Wrapf(err, "mmap, size %d", size) + } + return &MmapFile{f: f, b: b, rw: true}, nil } func OpenMmapFile(path string) (*MmapFile, error) { @@ -46,22 +69,53 @@ func OpenMmapFileWithSize(path string, size int) (mf *MmapFile, retErr error) { size = int(info.Size()) } - b, err := mmap(f, size) + b, err := mmapRo(f, size) if err != nil { return nil, errors.Wrapf(err, "mmap, size %d", size) } + return &MmapFile{f: f, b: b, closeFile: true}, nil +} - return &MmapFile{f: f, b: b}, nil +func (f *MmapFile) resize(size int) error { + err := f.Sync() + if err != nil { + return errors.Wrap(err, "resize sync") + } + err = munmap(f.b) + if err != nil { + return errors.Wrap(err, "resize munmap") + } + var b []byte + if f.rw { + b, err = mmapRw(f.f, size) + } else { + b, err = mmapRo(f.f, size) + } + if err != nil { + return errors.Wrap(err, "resize mmap") + } + f.b = b + return nil } func (f *MmapFile) Close() error { - err0 := munmap(f.b) - err1 := f.f.Close() + err0 := f.Sync() + err1 := munmap(f.b) + var err2 error + if f.closeFile { + err2 = f.f.Close() + } if err0 != nil { - return err0 + return errors.Wrap(err0, "close sync") + } + if err1 != nil { + return errors.Wrap(err1, "close munmap") + } + if err2 != nil { + return errors.Wrap(err2, "close file") } - return err1 + return nil } func (f *MmapFile) File() *os.File { diff --git tsdb/fileutil/mmap_openbsd.go tsdb/fileutil/mmap_openbsd.go new file mode 100644 index 000000000..39b590ee5 --- /dev/null +++ tsdb/fileutil/mmap_openbsd.go @@ -0,0 +1,25 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build openbsd +// +build openbsd + +package fileutil + +import ( + "golang.org/x/sys/unix" +) + +func (f *MmapFile) Sync() error { + return unix.Msync(f.b, unix.MS_SYNC) +} diff --git tsdb/fileutil/mmap_sync.go tsdb/fileutil/mmap_sync.go new file mode 100644 index 000000000..31fd98e6d --- /dev/null +++ tsdb/fileutil/mmap_sync.go @@ -0,0 +1,21 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !openbsd +// +build !openbsd + +package fileutil + +func (f *MmapFile) Sync() error { + return nil +} diff --git tsdb/fileutil/mmap_unix.go tsdb/fileutil/mmap_unix.go index 1fd7f48ff..c83a32011 100644 --- tsdb/fileutil/mmap_unix.go +++ tsdb/fileutil/mmap_unix.go @@ -22,10 +22,14 @@ import ( "golang.org/x/sys/unix" ) -func mmap(f *os.File, length int) ([]byte, error) { +func mmapRo(f *os.File, length int) ([]byte, error) { return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ, unix.MAP_SHARED) } +func mmapRw(f *os.File, length int) ([]byte, error) { + return unix.Mmap(int(f.Fd()), 0, length, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) +} + func munmap(b []byte) (err error) { return unix.Munmap(b) } diff --git tsdb/fileutil/mmap_windows.go tsdb/fileutil/mmap_windows.go index b94226412..9caf36622 100644 --- tsdb/fileutil/mmap_windows.go +++ tsdb/fileutil/mmap_windows.go @@ -19,7 +19,26 @@ import ( "unsafe" ) -func mmap(f *os.File, size int) ([]byte, error) { +func mmapRw(f *os.File, size int) ([]byte, error) { + low, high := uint32(size), uint32(size>>32) + h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READWRITE, high, low, nil) + if h == 0 { + return nil, os.NewSyscallError("CreateFileMapping", errno) + } + + addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_READ|syscall.FILE_MAP_WRITE, 0, 0, uintptr(size)) + if addr == 0 { + return nil, os.NewSyscallError("MapViewOfFile", errno) + } + + if err := syscall.CloseHandle(syscall.Handle(h)); err != nil { + return nil, os.NewSyscallError("CloseHandle", err) + } + + return (*[maxMapSize]byte)(unsafe.Pointer(addr))[:size], nil +} + +func mmapRo(f *os.File, size int) ([]byte, error) { low, high := uint32(size), uint32(size>>32) h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil) if h == 0 { diff --git tsdb/fileutil/writer.go tsdb/fileutil/writer.go new file mode 100644 index 000000000..86c1504e4 --- /dev/null +++ tsdb/fileutil/writer.go @@ -0,0 +1,156 @@ +// Copyright 2021 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileutil + +import ( + "errors" + "io" + "os" + "sync" +) + +type MmapWriter struct { + sync.Mutex + f *os.File + mf *MmapFile + buf []byte + wpos int + rpos int +} + +func NewMmapWriter(f *os.File) *MmapWriter { + return &MmapWriter{f: f} +} + +func NewMmapWriterWithSize(f *os.File, size int) (*MmapWriter, error) { + mw := NewMmapWriter(f) + err := mw.mmap(size) + return mw, err +} + +func (mw *MmapWriter) Bytes() []byte { + return mw.buf +} + +func (mw *MmapWriter) Sync() error { + if mw.mf != nil { + return mw.mf.Sync() + } + return nil +} + +func (mw *MmapWriter) Close() error { + mw.buf = nil + if mw.mf != nil { + return mw.mf.Close() + } + return nil +} + +func (mw *MmapWriter) mmap(size int) error { + err := mw.f.Truncate(int64(size)) + if err != nil { + return err + } + mf, err := OpenRwMmapFromFile(mw.f, size) + if err != nil { + return err + } + mw.mf = mf + mw.buf = mf.Bytes() + return nil +} + +func (mw *MmapWriter) resize(size int) error { + err := mw.f.Truncate(int64(size)) + if err != nil { + return err + } + err = mw.mf.resize(size) + if err != nil { + return err + } + mw.buf = mw.mf.Bytes() + return nil +} + +func (mw *MmapWriter) Seek(offset int64, whence int) (ret int64, err error) { + var abs int + switch whence { + case io.SeekStart: + abs = int(offset) + default: + return 0, errors.New("invalid whence") + } + if abs < 0 { + return 0, errors.New("negative position") + } + mw.Lock() + defer mw.Unlock() + mw.rpos = abs + return offset, nil +} + +func (mw *MmapWriter) Read(p []byte) (n int, err error) { + mw.Lock() + defer mw.Unlock() + if mw.rpos >= len(mw.buf) { + return 0, io.EOF + } + n = copy(p, mw.buf[mw.rpos:]) + mw.rpos += n + return +} + +func (mw *MmapWriter) Write(p []byte) (n int, err error) { + mw.Lock() + defer mw.Unlock() + if mw.mf == nil { + err = mw.mmap(len(p)) + if err != nil { + return + } + } + if len(p) > len(mw.buf)-mw.wpos { + err = mw.resize(mw.wpos + len(p)) + if err != nil { + return + } + } + + n = copy(mw.buf[mw.wpos:], p) + mw.wpos += n + err = mw.Sync() + return +} + +func (mw *MmapWriter) WriteAt(p []byte, pos int64) (n int, err error) { + mw.Lock() + defer mw.Unlock() + if mw.mf == nil { + err = mw.mmap(len(p) + int(pos)) + if err != nil { + return + } + } + if len(p)+int(pos) > len(mw.buf) { + err = mw.resize(len(p) + int(pos)) + if err != nil { + return + } + } + n = copy(mw.buf[pos:], p) + err = mw.Sync() + return +} diff --git tsdb/index/index.go tsdb/index/index.go index 29295c45f..451c80582 100644 --- tsdb/index/index.go +++ tsdb/index/index.go @@ -257,6 +257,7 @@ func (w *Writer) addPadding(size int) error { type FileWriter struct { f *os.File fbuf *bufio.Writer + mw *fileutil.MmapWriter pos uint64 name string } @@ -266,14 +267,20 @@ func NewFileWriter(name string) (*FileWriter, error) { if err != nil { return nil, err } + mw := fileutil.NewMmapWriter(f) return &FileWriter{ f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), + fbuf: bufio.NewWriterSize(mw, 1<<22), + mw: mw, pos: 0, name: name, }, nil } +func (fw *FileWriter) Bytes() []byte { + return fw.mw.Bytes() +} + func (fw *FileWriter) Pos() uint64 { return fw.pos } @@ -304,7 +311,7 @@ func (fw *FileWriter) WriteAt(buf []byte, pos uint64) error { if err := fw.Flush(); err != nil { return err } - _, err := fw.f.WriteAt(buf, int64(pos)) + _, err := fw.mw.WriteAt(buf, int64(pos)) return err } @@ -326,7 +333,7 @@ func (fw *FileWriter) Close() error { if err := fw.Flush(); err != nil { return err } - if err := fw.f.Sync(); err != nil { + if err := fw.mw.Close(); err != nil { return err } return fw.f.Close() @@ -987,11 +994,11 @@ func (w *Writer) writePostings() error { if err := w.fP.Flush(); err != nil { return err } - if _, err := w.fP.f.Seek(0, 0); err != nil { + if _, err := w.fP.mw.Seek(0, 0); err != nil { return err } // Don't need to calculate a checksum, so can copy directly. - n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20)) + n, err := io.CopyBuffer(w.f.fbuf, w.fP.mw, make([]byte, 1<<20)) if err != nil { return err }