fileutil, wal: refactor file locking
File lock interface was more verbose than it needed to be while simultaneously making it difficult to support systems (e.g., Windows) that only permit locked writes on a single fd holding the lock.
This commit is contained in:
@ -14,16 +14,13 @@
|
|||||||
|
|
||||||
package fileutil
|
package fileutil
|
||||||
|
|
||||||
type Lock interface {
|
import (
|
||||||
// Name returns the name of the file.
|
"errors"
|
||||||
Name() string
|
"os"
|
||||||
// TryLock acquires exclusivity on the lock without blocking.
|
)
|
||||||
TryLock() error
|
|
||||||
// Lock acquires exclusivity on the lock.
|
var (
|
||||||
Lock() error
|
ErrLocked = errors.New("fileutil: file already locked")
|
||||||
// Unlock unlocks the lock.
|
)
|
||||||
Unlock() error
|
|
||||||
// Destroy should be called after Unlock to clean up
|
type LockedFile struct{ *os.File }
|
||||||
// the resources.
|
|
||||||
Destroy() error
|
|
||||||
}
|
|
||||||
|
@ -15,65 +15,31 @@
|
|||||||
package fileutil
|
package fileutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
func TryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
ErrLocked = errors.New("file already locked")
|
if err := os.Chmod(path, syscall.DMEXCL|0600); err != nil {
|
||||||
)
|
return nil, err
|
||||||
|
}
|
||||||
type lock struct {
|
f, err := os.Open(path, flag, perm)
|
||||||
fname string
|
if err != nil {
|
||||||
file *os.File
|
return nil, ErrLocked
|
||||||
|
}
|
||||||
|
return &LockedFile{f}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lock) Name() string {
|
func LockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
return l.fname
|
if err := os.Chmod(path, syscall.DMEXCL|0600); err != nil {
|
||||||
}
|
return nil, err
|
||||||
|
|
||||||
func (l *lock) TryLock() error {
|
|
||||||
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.Open(l.fname)
|
|
||||||
if err != nil {
|
|
||||||
return ErrLocked
|
|
||||||
}
|
|
||||||
|
|
||||||
l.file = f
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Lock() error {
|
|
||||||
err := os.Chmod(l.fname, syscall.DMEXCL|0600)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
f, err := os.Open(l.fname)
|
f, err := os.OpenFile(path, flag, perm)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
l.file = f
|
return &LockedFile{f}, nil
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lock) Unlock() error {
|
|
||||||
return l.file.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Destroy() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewLock(file string) (Lock, error) {
|
|
||||||
l := &lock{fname: file}
|
|
||||||
return l, nil
|
|
||||||
}
|
|
||||||
|
@ -17,25 +17,11 @@
|
|||||||
package fileutil
|
package fileutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
func TryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
ErrLocked = errors.New("file already locked")
|
|
||||||
)
|
|
||||||
|
|
||||||
type lock struct {
|
|
||||||
fd int
|
|
||||||
file *os.File
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Name() string {
|
|
||||||
return l.file.Name()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) TryLock() error {
|
|
||||||
var lock syscall.Flock_t
|
var lock syscall.Flock_t
|
||||||
lock.Start = 0
|
lock.Start = 0
|
||||||
lock.Len = 0
|
lock.Len = 0
|
||||||
@ -43,45 +29,34 @@ func (l *lock) TryLock() error {
|
|||||||
lock.Type = syscall.F_WRLCK
|
lock.Type = syscall.F_WRLCK
|
||||||
lock.Whence = 0
|
lock.Whence = 0
|
||||||
lock.Pid = 0
|
lock.Pid = 0
|
||||||
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
f, err := os.OpenFile(path, flag, perm)
|
||||||
if err != nil && err == syscall.EAGAIN {
|
|
||||||
return ErrLocked
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Lock() error {
|
|
||||||
var lock syscall.Flock_t
|
|
||||||
lock.Start = 0
|
|
||||||
lock.Len = 0
|
|
||||||
lock.Type = syscall.F_WRLCK
|
|
||||||
lock.Whence = 0
|
|
||||||
lock.Pid = 0
|
|
||||||
return syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Unlock() error {
|
|
||||||
var lock syscall.Flock_t
|
|
||||||
lock.Start = 0
|
|
||||||
lock.Len = 0
|
|
||||||
lock.Type = syscall.F_UNLCK
|
|
||||||
lock.Whence = 0
|
|
||||||
err := syscall.FcntlFlock(uintptr(l.fd), syscall.F_SETLK, &lock)
|
|
||||||
if err != nil && err == syscall.EAGAIN {
|
|
||||||
return ErrLocked
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Destroy() error {
|
|
||||||
return l.file.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewLock(file string) (Lock, error) {
|
|
||||||
f, err := os.OpenFile(file, os.O_WRONLY, 0600)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
l := &lock{int(f.Fd()), f}
|
if err := syscall.FcntlFlock(f.Fd(), syscall.F_SETLK, &lock); err != nil {
|
||||||
return l, nil
|
f.Close()
|
||||||
|
if err == syscall.EAGAIN {
|
||||||
|
err = ErrLocked
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &LockedFile{f}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func LockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
|
var lock syscall.Flock_t
|
||||||
|
lock.Start = 0
|
||||||
|
lock.Len = 0
|
||||||
|
lock.Pid = 0
|
||||||
|
lock.Type = syscall.F_WRLCK
|
||||||
|
lock.Whence = 0
|
||||||
|
f, err := os.OpenFile(path, flag, perm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = syscall.FcntlFlock(f.Fd(), syscall.F_SETLKW, &lock); err != nil {
|
||||||
|
f.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &LockedFile{f}, nil
|
||||||
}
|
}
|
||||||
|
@ -35,44 +35,38 @@ func TestLockAndUnlock(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// lock the file
|
// lock the file
|
||||||
l, err := NewLock(f.Name())
|
l, err := LockFile(f.Name(), os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer l.Destroy()
|
|
||||||
err = l.Lock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// try lock a locked file
|
// try lock a locked file
|
||||||
dupl, err := NewLock(f.Name())
|
if _, err = TryLockFile(f.Name(), os.O_WRONLY, 0600); err != ErrLocked {
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
err = dupl.TryLock()
|
|
||||||
if err != ErrLocked {
|
|
||||||
t.Errorf("err = %v, want %v", err, ErrLocked)
|
|
||||||
}
|
|
||||||
|
|
||||||
// unlock the file
|
// unlock the file
|
||||||
err = l.Unlock()
|
if err = l.Close(); err != nil {
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// try lock the unlocked file
|
// try lock the unlocked file
|
||||||
err = dupl.TryLock()
|
dupl, err := TryLockFile(f.Name(), os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("err = %v, want %v", err, nil)
|
t.Errorf("err = %v, want %v", err, nil)
|
||||||
}
|
}
|
||||||
defer dupl.Destroy()
|
|
||||||
|
|
||||||
// blocking on locked file
|
// blocking on locked file
|
||||||
locked := make(chan struct{}, 1)
|
locked := make(chan struct{}, 1)
|
||||||
go func() {
|
go func() {
|
||||||
l.Lock()
|
bl, blerr := LockFile(f.Name(), os.O_WRONLY, 0600)
|
||||||
|
if blerr != nil {
|
||||||
|
t.Fatal(blerr)
|
||||||
|
}
|
||||||
locked <- struct{}{}
|
locked <- struct{}{}
|
||||||
|
if blerr = bl.Close(); blerr != nil {
|
||||||
|
t.Fatal(blerr)
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -82,8 +76,7 @@ func TestLockAndUnlock(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// unlock
|
// unlock
|
||||||
err = dupl.Unlock()
|
if err = dupl.Close(); err != nil {
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,49 +17,33 @@
|
|||||||
package fileutil
|
package fileutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
func TryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
ErrLocked = errors.New("file already locked")
|
f, err := os.OpenFile(path, flag, perm)
|
||||||
)
|
|
||||||
|
|
||||||
type lock struct {
|
|
||||||
fd int
|
|
||||||
file *os.File
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Name() string {
|
|
||||||
return l.file.Name()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) TryLock() error {
|
|
||||||
err := syscall.Flock(l.fd, syscall.LOCK_EX|syscall.LOCK_NB)
|
|
||||||
if err != nil && err == syscall.EWOULDBLOCK {
|
|
||||||
return ErrLocked
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Lock() error {
|
|
||||||
return syscall.Flock(l.fd, syscall.LOCK_EX)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Unlock() error {
|
|
||||||
return syscall.Flock(l.fd, syscall.LOCK_UN)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Destroy() error {
|
|
||||||
return l.file.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewLock(file string) (Lock, error) {
|
|
||||||
f, err := os.Open(file)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
l := &lock{int(f.Fd()), f}
|
if err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
|
||||||
return l, nil
|
f.Close()
|
||||||
|
if err == syscall.EWOULDBLOCK {
|
||||||
|
err = ErrLocked
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &LockedFile{f}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func LockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
|
f, err := os.OpenFile(path, flag, perm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != nil {
|
||||||
|
f.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &LockedFile{f}, err
|
||||||
}
|
}
|
||||||
|
@ -16,45 +16,17 @@
|
|||||||
|
|
||||||
package fileutil
|
package fileutil
|
||||||
|
|
||||||
import (
|
import "os"
|
||||||
"errors"
|
|
||||||
"os"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
func TryLockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
ErrLocked = errors.New("file already locked")
|
return LockFile(path, flag, perm)
|
||||||
)
|
|
||||||
|
|
||||||
type lock struct {
|
|
||||||
fd int
|
|
||||||
file *os.File
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *lock) Name() string {
|
func LockFile(path string, flag int, perm os.FileMode) (*LockedFile, error) {
|
||||||
return l.file.Name()
|
// TODO make this actually work
|
||||||
}
|
f, err := os.OpenFile(path, flag, perm)
|
||||||
|
|
||||||
func (l *lock) TryLock() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Lock() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Unlock() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *lock) Destroy() error {
|
|
||||||
return l.file.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewLock(file string) (Lock, error) {
|
|
||||||
f, err := os.Open(file)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
l := &lock{int(f.Fd()), f}
|
return &LockedFile{f}, nil
|
||||||
return l, nil
|
|
||||||
}
|
}
|
||||||
|
@ -40,32 +40,19 @@ func PurgeFile(dirname string, suffix string, max uint, interval time.Duration,
|
|||||||
sort.Strings(newfnames)
|
sort.Strings(newfnames)
|
||||||
for len(newfnames) > int(max) {
|
for len(newfnames) > int(max) {
|
||||||
f := path.Join(dirname, newfnames[0])
|
f := path.Join(dirname, newfnames[0])
|
||||||
l, err := NewLock(f)
|
l, err := TryLockFile(f, os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
|
||||||
errC <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
err = l.TryLock()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
err = os.Remove(f)
|
if err = os.Remove(f); err != nil {
|
||||||
if err != nil {
|
|
||||||
errC <- err
|
errC <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = l.Unlock()
|
if err = l.Close(); err != nil {
|
||||||
if err != nil {
|
|
||||||
plog.Errorf("error unlocking %s when purging file (%v)", l.Name(), err)
|
plog.Errorf("error unlocking %s when purging file (%v)", l.Name(), err)
|
||||||
errC <- err
|
errC <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
err = l.Destroy()
|
|
||||||
if err != nil {
|
|
||||||
plog.Errorf("error destroying lock %s when purging file (%v)", l.Name(), err)
|
|
||||||
errC <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
plog.Infof("purged file %s successfully", f)
|
plog.Infof("purged file %s successfully", f)
|
||||||
newfnames = newfnames[1:]
|
newfnames = newfnames[1:]
|
||||||
}
|
}
|
||||||
|
@ -80,7 +80,7 @@ func TestPurgeFile(t *testing.T) {
|
|||||||
close(stop)
|
close(stop)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPurgeFileHoldingLock(t *testing.T) {
|
func TestPurgeFileHoldingLockFile(t *testing.T) {
|
||||||
dir, err := ioutil.TempDir("", "purgefile")
|
dir, err := ioutil.TempDir("", "purgefile")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -95,8 +95,8 @@ func TestPurgeFileHoldingLock(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create a purge barrier at 5
|
// create a purge barrier at 5
|
||||||
l, err := NewLock(path.Join(dir, fmt.Sprintf("%d.test", 5)))
|
p := path.Join(dir, fmt.Sprintf("%d.test", 5))
|
||||||
err = l.Lock()
|
l, err := LockFile(p, os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -127,12 +127,7 @@ func TestPurgeFileHoldingLock(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// remove the purge barrier
|
// remove the purge barrier
|
||||||
err = l.Unlock()
|
if err = l.Close(); err != nil {
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
err = l.Destroy()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,14 +31,12 @@ type decoder struct {
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
br *bufio.Reader
|
br *bufio.Reader
|
||||||
|
|
||||||
c io.Closer
|
|
||||||
crc hash.Hash32
|
crc hash.Hash32
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDecoder(rc io.ReadCloser) *decoder {
|
func newDecoder(r io.Reader) *decoder {
|
||||||
return &decoder{
|
return &decoder{
|
||||||
br: bufio.NewReader(rc),
|
br: bufio.NewReader(r),
|
||||||
c: rc,
|
|
||||||
crc: crc.New(0, crcTable),
|
crc: crc.New(0, crcTable),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -80,10 +78,6 @@ func (d *decoder) lastCRC() uint32 {
|
|||||||
return d.crc.Sum32()
|
return d.crc.Sum32()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *decoder) close() error {
|
|
||||||
return d.c.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func mustUnmarshalEntry(d []byte) raftpb.Entry {
|
func mustUnmarshalEntry(d []byte) raftpb.Entry {
|
||||||
var e raftpb.Entry
|
var e raftpb.Entry
|
||||||
pbutil.MustUnmarshal(&e, d)
|
pbutil.MustUnmarshal(&e, d)
|
||||||
|
@ -36,7 +36,6 @@ func Repair(dirpath string) bool {
|
|||||||
rec := &walpb.Record{}
|
rec := &walpb.Record{}
|
||||||
|
|
||||||
decoder := newDecoder(f)
|
decoder := newDecoder(f)
|
||||||
defer decoder.close()
|
|
||||||
for {
|
for {
|
||||||
err := decoder.decode(rec)
|
err := decoder.decode(rec)
|
||||||
switch err {
|
switch err {
|
||||||
|
192
wal/wal.go
192
wal/wal.go
@ -70,16 +70,15 @@ type WAL struct {
|
|||||||
metadata []byte // metadata recorded at the head of each WAL
|
metadata []byte // metadata recorded at the head of each WAL
|
||||||
state raftpb.HardState // hardstate recorded at the head of WAL
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
||||||
|
|
||||||
start walpb.Snapshot // snapshot to start reading
|
start walpb.Snapshot // snapshot to start reading
|
||||||
decoder *decoder // decoder to decode records
|
decoder *decoder // decoder to decode records
|
||||||
|
readClose io.Closer // closer for decode reader
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
f *os.File // underlay file opened for appending, sync
|
|
||||||
seq uint64 // sequence of the wal file currently used for writes
|
|
||||||
enti uint64 // index of the last entry saved to the wal
|
enti uint64 // index of the last entry saved to the wal
|
||||||
encoder *encoder // encoder to encode records
|
encoder *encoder // encoder to encode records
|
||||||
|
|
||||||
locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
|
locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create creates a WAL ready for appending records. The given metadata is
|
// Create creates a WAL ready for appending records. The given metadata is
|
||||||
@ -94,26 +93,17 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
p := path.Join(dirpath, walName(0, 0))
|
p := path.Join(dirpath, walName(0, 0))
|
||||||
f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
l, err := fileutil.NewLock(f.Name())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if err = l.Lock(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
dir: dirpath,
|
dir: dirpath,
|
||||||
metadata: metadata,
|
metadata: metadata,
|
||||||
seq: 0,
|
|
||||||
f: f,
|
|
||||||
encoder: newEncoder(f, 0),
|
encoder: newEncoder(f, 0),
|
||||||
}
|
}
|
||||||
w.locks = append(w.locks, l)
|
w.locks = append(w.locks, f)
|
||||||
if err := w.saveCrc(0); err != nil {
|
if err := w.saveCrc(0); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -157,60 +147,56 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
|
|||||||
return nil, ErrFileNotFound
|
return nil, ErrFileNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// open the wal files for reading
|
// open the wal files
|
||||||
rcs := make([]io.ReadCloser, 0)
|
rcs := make([]io.ReadCloser, 0)
|
||||||
ls := make([]fileutil.Lock, 0)
|
ls := make([]*fileutil.LockedFile, 0)
|
||||||
for _, name := range names[nameIndex:] {
|
for _, name := range names[nameIndex:] {
|
||||||
f, err := os.Open(path.Join(dirpath, name))
|
p := path.Join(dirpath, name)
|
||||||
if err != nil {
|
if write {
|
||||||
return nil, err
|
l, err := fileutil.TryLockFile(p, os.O_RDWR, 0600)
|
||||||
}
|
if err != nil {
|
||||||
l, err := fileutil.NewLock(f.Name())
|
MultiReadCloser(rcs...).Close()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = l.TryLock()
|
|
||||||
if err != nil {
|
|
||||||
if write {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
ls = append(ls, l)
|
||||||
|
rcs = append(rcs, l)
|
||||||
|
} else {
|
||||||
|
rf, err := os.OpenFile(p, os.O_RDONLY, 0600)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ls = append(ls, nil)
|
||||||
|
rcs = append(rcs, rf)
|
||||||
}
|
}
|
||||||
rcs = append(rcs, f)
|
|
||||||
ls = append(ls, l)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rc := MultiReadCloser(rcs...)
|
rc := MultiReadCloser(rcs...)
|
||||||
|
c := rc
|
||||||
|
if write {
|
||||||
|
// write reuses the file descriptors from read; don't close so
|
||||||
|
// WAL can append without dropping the file lock
|
||||||
|
c = nil
|
||||||
|
}
|
||||||
|
|
||||||
// create a WAL ready for reading
|
// create a WAL ready for reading
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
dir: dirpath,
|
dir: dirpath,
|
||||||
start: snap,
|
start: snap,
|
||||||
decoder: newDecoder(rc),
|
decoder: newDecoder(rc),
|
||||||
locks: ls,
|
readClose: c,
|
||||||
|
locks: ls,
|
||||||
}
|
}
|
||||||
|
|
||||||
if write {
|
if write {
|
||||||
// open the last wal file for appending
|
if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
|
||||||
seq, _, err := parseWalName(names[len(names)-1])
|
|
||||||
if err != nil {
|
|
||||||
rc.Close()
|
rc.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
last := path.Join(dirpath, names[len(names)-1])
|
if err := fileutil.Preallocate(w.tail().File, segmentSizeBytes); err != nil {
|
||||||
|
|
||||||
f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
|
|
||||||
if err != nil {
|
|
||||||
rc.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = fileutil.Preallocate(f, segmentSizeBytes)
|
|
||||||
if err != nil {
|
|
||||||
rc.Close()
|
rc.Close()
|
||||||
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.f = f
|
|
||||||
w.seq = seq
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return w, nil
|
return w, nil
|
||||||
@ -275,7 +261,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
switch w.f {
|
switch w.tail() {
|
||||||
case nil:
|
case nil:
|
||||||
// We do not have to read out all entries in read mode.
|
// We do not have to read out all entries in read mode.
|
||||||
// The last record maybe a partial written one, so
|
// The last record maybe a partial written one, so
|
||||||
@ -298,17 +284,20 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// close decoder, disable reading
|
// close decoder, disable reading
|
||||||
w.decoder.close()
|
if w.readClose != nil {
|
||||||
|
w.readClose.Close()
|
||||||
|
w.readClose = nil
|
||||||
|
}
|
||||||
w.start = walpb.Snapshot{}
|
w.start = walpb.Snapshot{}
|
||||||
|
|
||||||
w.metadata = metadata
|
w.metadata = metadata
|
||||||
|
|
||||||
if w.f != nil {
|
if w.tail() != nil {
|
||||||
// create encoder (chain crc with the decoder), enable appending
|
// create encoder (chain crc with the decoder), enable appending
|
||||||
w.encoder = newEncoder(w.f, w.decoder.lastCRC())
|
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
||||||
w.decoder = nil
|
|
||||||
lastIndexSaved.Set(float64(w.enti))
|
lastIndexSaved.Set(float64(w.enti))
|
||||||
}
|
}
|
||||||
|
w.decoder = nil
|
||||||
|
|
||||||
return metadata, state, ents, err
|
return metadata, state, ents, err
|
||||||
}
|
}
|
||||||
@ -321,23 +310,20 @@ func (w *WAL) cut() error {
|
|||||||
if err := w.sync(); err != nil {
|
if err := w.sync(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.f.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
|
||||||
ftpath := fpath + ".tmp"
|
ftpath := fpath + ".tmp"
|
||||||
|
|
||||||
// create a temp wal file with name sequence + 1, or truncate the existing one
|
// create a temp wal file with name sequence + 1, or truncate the existing one
|
||||||
ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
|
newTail, err := fileutil.LockFile(ftpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// update writer and save the previous crc
|
// update writer and save the previous crc
|
||||||
w.f = ft
|
w.locks = append(w.locks, newTail)
|
||||||
prevCrc := w.encoder.crc.Sum32()
|
prevCrc := w.encoder.crc.Sum32()
|
||||||
w.encoder = newEncoder(w.f, prevCrc)
|
w.encoder = newEncoder(w.tail(), prevCrc)
|
||||||
if err = w.saveCrc(prevCrc); err != nil {
|
if err = w.saveCrc(prevCrc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -347,47 +333,28 @@ func (w *WAL) cut() error {
|
|||||||
if err = w.saveState(&w.state); err != nil {
|
if err = w.saveState(&w.state); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// close temp wal file
|
// atomically move temp wal file to wal file
|
||||||
if err = w.sync(); err != nil {
|
if err = w.sync(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = w.f.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// atomically move temp wal file to wal file
|
|
||||||
if err = os.Rename(ftpath, fpath); err != nil {
|
if err = os.Rename(ftpath, fpath); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
newTail.Close()
|
||||||
|
|
||||||
// open the wal file and update writer again
|
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY|os.O_APPEND, 0600); err != nil {
|
||||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err = fileutil.Preallocate(f, segmentSizeBytes); err != nil {
|
w.locks[len(w.locks)-1] = newTail
|
||||||
|
|
||||||
|
prevCrc = w.encoder.crc.Sum32()
|
||||||
|
w.encoder = newEncoder(w.tail(), prevCrc)
|
||||||
|
|
||||||
|
if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes); err != nil {
|
||||||
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.f = f
|
|
||||||
prevCrc = w.encoder.crc.Sum32()
|
|
||||||
w.encoder = newEncoder(w.f, prevCrc)
|
|
||||||
|
|
||||||
// lock the new wal file
|
|
||||||
l, err := fileutil.NewLock(f.Name())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := l.Lock(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
w.locks = append(w.locks, l)
|
|
||||||
|
|
||||||
// increase the wal seq
|
|
||||||
w.seq++
|
|
||||||
|
|
||||||
plog.Infof("segmented wal file %v is created", fpath)
|
plog.Infof("segmented wal file %v is created", fpath)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -399,7 +366,7 @@ func (w *WAL) sync() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := fileutil.Fdatasync(w.f)
|
err := fileutil.Fdatasync(w.tail().File)
|
||||||
syncDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
syncDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -438,8 +405,10 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < smaller; i++ {
|
for i := 0; i < smaller; i++ {
|
||||||
w.locks[i].Unlock()
|
if w.locks[i] == nil {
|
||||||
w.locks[i].Destroy()
|
continue
|
||||||
|
}
|
||||||
|
w.locks[i].Close()
|
||||||
}
|
}
|
||||||
w.locks = w.locks[smaller:]
|
w.locks = w.locks[smaller:]
|
||||||
|
|
||||||
@ -450,22 +419,17 @@ func (w *WAL) Close() error {
|
|||||||
w.mu.Lock()
|
w.mu.Lock()
|
||||||
defer w.mu.Unlock()
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
if w.f != nil {
|
if w.tail() != nil {
|
||||||
if err := w.sync(); err != nil {
|
if err := w.sync(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.f.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
for _, l := range w.locks {
|
for _, l := range w.locks {
|
||||||
err := l.Unlock()
|
if l == nil {
|
||||||
if err != nil {
|
continue
|
||||||
plog.Errorf("failed to unlock during closing wal: %s", err)
|
|
||||||
}
|
}
|
||||||
err = l.Destroy()
|
if err := l.Close(); err != nil {
|
||||||
if err != nil {
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
||||||
plog.Errorf("failed to destroy lock during closing wal: %s", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -514,7 +478,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
fstat, err := w.f.Stat()
|
fstat, err := w.tail().Stat()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -524,6 +488,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add a test for this code path when refactoring the tests
|
// TODO: add a test for this code path when refactoring the tests
|
||||||
return w.cut()
|
return w.cut()
|
||||||
}
|
}
|
||||||
@ -549,6 +514,25 @@ func (w *WAL) saveCrc(prevCrc uint32) error {
|
|||||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *WAL) tail() *fileutil.LockedFile {
|
||||||
|
if len(w.locks) > 0 {
|
||||||
|
return w.locks[len(w.locks)-1]
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WAL) seq() uint64 {
|
||||||
|
t := w.tail()
|
||||||
|
if t == nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
seq, _, err := parseWalName(path.Base(t.Name()))
|
||||||
|
if err != nil {
|
||||||
|
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
|
||||||
|
}
|
||||||
|
return seq
|
||||||
|
}
|
||||||
|
|
||||||
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
|
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
|
||||||
// Persistent state on all servers:
|
// Persistent state on all servers:
|
||||||
// (Updated on stable storage before responding to RPCs)
|
// (Updated on stable storage before responding to RPCs)
|
||||||
|
@ -38,11 +38,11 @@ func TestNew(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
if g := path.Base(w.f.Name()); g != walName(0, 0) {
|
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
|
||||||
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
|
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
gd, err := ioutil.ReadFile(w.f.Name())
|
gd, err := ioutil.ReadFile(w.tail().Name())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -100,11 +100,11 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
if g := path.Base(w.f.Name()); g != walName(0, 0) {
|
if g := path.Base(w.tail().Name()); g != walName(0, 0) {
|
||||||
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
|
t.Errorf("name = %+v, want %+v", g, walName(0, 0))
|
||||||
}
|
}
|
||||||
if w.seq != 0 {
|
if w.seq() != 0 {
|
||||||
t.Errorf("seq = %d, want %d", w.seq, 0)
|
t.Errorf("seq = %d, want %d", w.seq(), 0)
|
||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
@ -119,11 +119,11 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
if g := path.Base(w.f.Name()); g != wname {
|
if g := path.Base(w.tail().Name()); g != wname {
|
||||||
t.Errorf("name = %+v, want %+v", g, wname)
|
t.Errorf("name = %+v, want %+v", g, wname)
|
||||||
}
|
}
|
||||||
if w.seq != 2 {
|
if w.seq() != 2 {
|
||||||
t.Errorf("seq = %d, want %d", w.seq, 2)
|
t.Errorf("seq = %d, want %d", w.seq(), 2)
|
||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
@ -160,7 +160,7 @@ func TestCut(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
wname := walName(1, 1)
|
wname := walName(1, 1)
|
||||||
if g := path.Base(w.f.Name()); g != wname {
|
if g := path.Base(w.tail().Name()); g != wname {
|
||||||
t.Errorf("name = %s, want %s", g, wname)
|
t.Errorf("name = %s, want %s", g, wname)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,7 +176,7 @@ func TestCut(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
wname = walName(2, 2)
|
wname = walName(2, 2)
|
||||||
if g := path.Base(w.f.Name()); g != wname {
|
if g := path.Base(w.tail().Name()); g != wname {
|
||||||
t.Errorf("name = %s, want %s", g, wname)
|
t.Errorf("name = %s, want %s", g, wname)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -416,10 +416,10 @@ func TestOpenForRead(t *testing.T) {
|
|||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
// create WAL
|
// create WAL
|
||||||
w, err := Create(p, nil)
|
w, err := Create(p, nil)
|
||||||
defer w.Close()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
defer w.Close()
|
||||||
// make 10 separate files
|
// make 10 separate files
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
es := []raftpb.Entry{{Index: uint64(i)}}
|
es := []raftpb.Entry{{Index: uint64(i)}}
|
||||||
@ -436,10 +436,10 @@ func TestOpenForRead(t *testing.T) {
|
|||||||
|
|
||||||
// All are available for read
|
// All are available for read
|
||||||
w2, err := OpenForRead(p, walpb.Snapshot{})
|
w2, err := OpenForRead(p, walpb.Snapshot{})
|
||||||
defer w2.Close()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
defer w2.Close()
|
||||||
_, _, ents, err := w2.ReadAll()
|
_, _, ents, err := w2.ReadAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
|
Reference in New Issue
Block a user