From 143832344a71fd60df503d58fc6582b253845452 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Mon, 18 Feb 2019 12:42:45 +0100 Subject: [PATCH] Fix Registrar not removing files when clean_removed is configured (#10747) This PR includes some changes that also did help to clean/stabilize the `text_clean_removed*` tests. Often times the tests have been stopped to early, due to races between the input file states, registrar states, and test code. After stabilizing the tests, I reliably (still needs a few runs) can trigger a data-synchronisation issue between the prospectors state and the registry. This is clearly a bug and results in state not being removed from the registry file at all. After some more testing (yay, heisenbug :/) I found a race condition between harvester shutdown, prospector state cleaning (e.g. if clean_removed is set) and registry updates. The sequence of events goes like this: ``` 1. harvester closes file 2. harvester updates prospectors local file states (marks file as finished) 3. prospector starts GC, cleaning old states 4. prospector sends 'remove state event' to the registrar (TTL=0, Finished=True). Note: state is not removed by prospector until next scan completes. 5. harvestar sends 'finished state event' to the registrar (TTL=-1, Finished=True) 6. registrar reads applies state updates: 1. change state to TTL=0, Finished=True 2. change state to TTL=-1, Finished=True 3. clean local state -> file is not removed from registry, due to TTL=-1 4. write registry ``` The change proposed changes the order the harvester cleanup propagates updates by sending the 'finished event' first before updating the prospectors state. This way we postpone the file being cleaned by the prospector and guarantee the 'finished event' and 'remove event' order (The prospector can not remove the state yet, because Finished is not set until after the 'finished event' has been published). Additions: - Add Registry class (accessed via `self.registry()`). This class is used to check/read the registry - Add InputLogs class for creating/apppending/removing log files used to drive the tests. This reduces path manipulations and reduces repetitive `with open(...) as f:` blocks. - Add LogState class. This class keeps an offset to start reading from. The offset can be advanced via `checkpoint` or is automatically advanced if `next` succeeds. Using `LogState` we can wait for a particular message to appear in the log, by ignoring old messages. This removes the need to have a count on actual messages. - Thanks to `LogState` and some minor adjustments to timings I did shave of like 10s from test_registrar.py on my machine. FB success count: - test_clean_removed: 5 - test_clean_removed_with_clean_inactive: 5 --- CHANGELOG.next.asciidoc | 2 + filebeat/harvester/source.go | 1 + filebeat/input/log/file.go | 7 +- filebeat/input/log/harvester.go | 6 +- filebeat/input/log/log.go | 5 +- filebeat/input/log/stdin.go | 1 + filebeat/registrar/registrar.go | 2 +- filebeat/tests/system/filebeat.py | 155 ++++++++++++++---- filebeat/tests/system/test_registrar.py | 202 +++++++++--------------- libbeat/common/file/file_other.go | 6 + libbeat/common/file/file_windows.go | 39 +++++ 11 files changed, 260 insertions(+), 166 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d297377ce0c..5cb24e52e0d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -186,6 +186,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed data types for roles and indices fields in `elasticsearch/audit` fileset {pull}10307[10307] - Ensure `source.address` is always populated by the nginx module (ECS). {pull}10418[10418] - Cover empty request data, url and version in Apache2 module{pull}10730[10730] +- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747] +- Improve detection of file deletion on Windows. {pull}10747[10747] *Heartbeat* diff --git a/filebeat/harvester/source.go b/filebeat/harvester/source.go index 6c739ccacce..f7e3c0c6405 100644 --- a/filebeat/harvester/source.go +++ b/filebeat/harvester/source.go @@ -25,6 +25,7 @@ import ( type Source interface { io.ReadCloser Name() string + Removed() bool // check if source has been removed Stat() (os.FileInfo, error) Continuable() bool // can we continue processing after EOF? HasState() bool // does this source have a state? diff --git a/filebeat/input/log/file.go b/filebeat/input/log/file.go index 8c513e0e9d8..59d408c87ad 100644 --- a/filebeat/input/log/file.go +++ b/filebeat/input/log/file.go @@ -17,7 +17,11 @@ package log -import "os" +import ( + "os" + + "github.com/elastic/beats/libbeat/common/file" +) type File struct { *os.File @@ -25,3 +29,4 @@ type File struct { func (File) Continuable() bool { return true } func (File) HasState() bool { return true } +func (f File) Removed() bool { return file.IsRemoved(f.File) } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 07408ffe316..9197793c718 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -391,12 +391,12 @@ func (h *Harvester) SendStateUpdate() { return } - logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) - h.states.Update(h.state) - d := util.NewData() d.SetState(h.state) h.publishState(d) + + logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + h.states.Update(h.state) } // shouldExportLine decides if the line is exported or not based on diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index ba13d91bbda..d0c14c45b81 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -152,10 +152,7 @@ func (f *Log) errorChecks(err error) error { if f.config.CloseRemoved { // Check if the file name exists. See https://github.com/elastic/filebeat/issues/93 - _, statErr := os.Stat(f.fs.Name()) - - // Error means file does not exist. - if statErr != nil { + if f.fs.Removed() { return ErrRemoved } } diff --git a/filebeat/input/log/stdin.go b/filebeat/input/log/stdin.go index aff8c2b43d7..07aeb008adb 100644 --- a/filebeat/input/log/stdin.go +++ b/filebeat/input/log/stdin.go @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() } func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() } func (p Pipe) Continuable() bool { return false } func (p Pipe) HasState() bool { return false } +func (p Pipe) Removed() bool { return false } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index f128107b5fc..22fdd1b6066 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -408,7 +408,7 @@ func (r *Registrar) writeRegistry() error { } func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) { - logp.Debug("registrar", "Write registry file: %s", baseName) + logp.Debug("registrar", "Write registry file: %s (%v)", baseName, len(states)) tempfile := baseName + ".new" f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm) diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index df2b8483855..39c14518476 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -6,7 +6,7 @@ curdir = os.path.dirname(__file__) sys.path.append(os.path.join(curdir, '../../../libbeat/tests/system')) -from beat.beat import TestCase +from beat.beat import TestCase, TimeoutError, REGEXP_TYPE default_registry_file = 'registry/filebeat/data.json' @@ -22,27 +22,33 @@ def setUpClass(self): super(BaseTest, self).setUpClass() - def has_registry(self, name=None, data_path=None): - if not name: - name = default_registry_file - if not data_path: - data_path = self.working_dir + @property + def registry(self): + return self.access_registry() - dotFilebeat = os.path.join(data_path, name) - return os.path.isfile(dotFilebeat) + @property + def input_logs(self): + return InputLogs(os.path.join(self.working_dir, "log")) - def get_registry(self, name=None, data_path=None): - if not name: - name = default_registry_file - if not data_path: - data_path = self.working_dir + @property + def logs(self): + return self.log_access() - # Returns content of the registry file - dotFilebeat = os.path.join(data_path, name) - self.wait_until(cond=lambda: os.path.isfile(dotFilebeat)) + def access_registry(self, name=None, data_path=None): + data_path = data_path if data_path else self.working_dir + return Registry(data_path, name) - with open(dotFilebeat) as file: - return json.load(file) + def log_access(self, file=None): + file = file if file else self.beat_name + ".log" + return LogState(os.path.join(self.working_dir, file)) + + def has_registry(self, name=None, data_path=None): + return self.access_registry(name, data_path).exists() + + def get_registry(self, name=None, data_path=None, filter=None): + reg = self.access_registry(name, data_path) + self.wait_until(reg.exists) + return reg.load(filter=filter) def get_registry_entry_by_path(self, path): """ @@ -51,21 +57,112 @@ def get_registry_entry_by_path(self, path): If a path exists multiple times (which is possible because of file rotation) the most recent version is returned """ - registry = self.get_registry() - tmp_entry = None + def hasPath(entry): + return entry["source"] == path - # Checks all entries and returns the most recent one - for entry in registry: - if entry["source"] == path: - if tmp_entry == None: - tmp_entry = entry - else: - if tmp_entry["timestamp"] < entry["timestamp"]: - tmp_entry = entry + entries = self.get_registry(filter=hasPath) + entries.sort(key=lambda x: x["timestamp"]) - return tmp_entry + # return entry with largest timestamp + return None if len(entries) == 0 else entries[-1] def file_permissions(self, path): full_path = os.path.join(self.working_dir, path) return oct(stat.S_IMODE(os.lstat(full_path).st_mode)) + + +class InputLogs: + """ InputLogs is used to write and append to files which are read by filebeat. """ + + def __init__(self, home): + self.home = home + if not os.path.isdir(self.home): + os.mkdir(self.home) + + def write(self, name, contents): + self._write_to(name, 'w', contents) + + def append(self, name, contents): + self._write_to(name, 'a', contents) + + def size(self, name): + return os.path.getsize(self.path_of(name)) + + def _write_to(self, name, mode, contents): + with open(self.path_of(name), mode) as f: + f.write(contents) + + def remove(self, name): + os.remove(self.path_of(name)) + + def path_of(self, name): + return os.path.join(self.home, name) + + +class Registry: + """ Registry provides access to the registry used by filebeat to store its progress """ + + def __init__(self, home, name=None): + if not name: + name = default_registry_file + self.path = os.path.join(home, name) + + def exists(self): + return os.path.isfile(self.path) + + def load(self, filter=None): + with open(self.path) as f: + entries = json.load(f) + + if filter: + entries = [x for x in entries if filter(x)] + return entries + + def count(self, filter=None): + if not self.exists(): + return 0 + return len(self.load(filter=filter)) + + +class LogState: + def __init__(self, path): + self.path = path + self.off = 0 + + def checkpoint(self): + self.off = os.path.getsize(self.path) + + def lines(self, filter=None): + if not filter: + def filter(x): return True + with open(self.path, "r") as f: + f.seek(self.off) + return [l for l in f if filter(l)] + + def contains(self, msg, ignore_case=False, count=1): + if ignore_case: + msg = msg.lower() + + if type(msg) == REGEXP_TYPE: + def match(x): return msg.search(x) is not None + else: + def match(x): return x.find(msg) >= 0 + + pred = match + if ignore_case: + def pred(x): return match(x.lower()) + + return len(self.lines(filter=pred)) >= count + + def next(self, msg, ignore_case=False, count=1): + ok = self.contains(msg, ignore_case, count) + if ok: + self.checkpoint() + return ok + + def nextCheck(self, msg, ignore_case=False, count=1): + return lambda: self.next(msg, ignore_case, count) + + def check(self, msg, ignore_case=False, count=1): + return lambda: self.contains(msg, ignore_case, count) diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index fc1579745fc..f988867c3d4 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -3,9 +3,10 @@ import os import platform -import time +import re import shutil import stat +import time import unittest from filebeat import BaseTest @@ -778,73 +779,46 @@ def test_clean_inactive(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/input*", - clean_inactive="4s", + clean_inactive="3s", ignore_older="2s", close_inactive="0.2s", scan_frequency="0.1s" ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/input1" - testfile_path2 = self.working_dir + "/log/input2" - testfile_path3 = self.working_dir + "/log/input3" - - with open(testfile_path1, 'w') as testfile1: - testfile1.write("first file\n") + file1 = "input1" + file2 = "input2" + file3 = "input3" - with open(testfile_path2, 'w') as testfile2: - testfile2.write("second file\n") + self.input_logs.write(file1, "first file\n") + self.input_logs.write(file2, "second file\n") filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), max_timeout=10) # Wait until registry file is created - self.wait_until( - lambda: self.log_contains_count("Registry file updated") > 1, - max_timeout=15) - - # Syncing file on disk is always susceptible to timing issues. - time.sleep(1) - - data = self.get_registry() - assert len(data) == 2 + self.wait_until(lambda: self.registry.exists(), max_timeout=15) + assert self.registry.count() == 2 # Wait until states are removed from inputs - self.wait_until( - lambda: self.log_contains_count( - "State removed for") == 2, - max_timeout=15) - - with open(testfile_path3, 'w') as testfile3: - testfile3.write("2\n") + self.wait_until(self.logs.nextCheck("State removed for", count=2), max_timeout=15) # Write new file to make sure registrar is flushed again - self.wait_until( - lambda: self.output_has(lines=3), - max_timeout=30) + self.input_logs.write(file3, "third file\n") + self.wait_until(lambda: self.output_has(lines=3), max_timeout=30) - # Wait until states are removed from inputs - self.wait_until( - lambda: self.log_contains_count( - "State removed for") >= 3, - max_timeout=15) + # Wait until state of new file is removed + self.wait_until(self.logs.nextCheck("State removed for"), max_timeout=15) filebeat.check_kill_and_wait() # Check that the first two files were removed from the registry - data = self.get_registry() + data = self.registry.load() assert len(data) == 1, "Expected a single file but got: %s" % data # Make sure the last file in the registry is the correct one and has the correct offset - if os.name == "nt": - assert data[0]["offset"] == 3 - else: - assert data[0]["offset"] == 2 + assert data[0]["offset"] == self.input_logs.size(file3) - @unittest.skip("Skipped as flaky: https://github.com/elastic/beats/issues/7690") def test_clean_removed(self): """ Checks that files which were removed, the state is removed @@ -857,58 +831,43 @@ def test_clean_removed(self): close_removed=True ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/input1" - testfile_path2 = self.working_dir + "/log/input2" + file1 = "input1" + file2 = "input2" - with open(testfile_path1, 'w') as testfile1: - testfile1.write("file to be removed\n") - - with open(testfile_path2, 'w') as testfile2: - testfile2.write("2\n") + self.input_logs.write(file1, "file to be removed\n") + self.input_logs.write(file2, "2\n") filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), max_timeout=10) # Wait until registry file is created - self.wait_until(lambda: self.log_contains_count("Registry file updated") > 1) + self.wait_until(self.registry.exists) # Wait until registry is updated - self.wait_until(lambda: len(self.get_registry()) == 2) - assert len(self.get_registry()) == 2 + self.wait_until(lambda: self.registry.count() == 2) - os.remove(testfile_path1) + self.input_logs.remove(file1) # Wait until states are removed from inputs - self.wait_until(lambda: self.log_contains("Remove state for file as file removed")) + self.wait_until(self.logs.check("Remove state for file as file removed")) # Add one more line to make sure registry is written - with open(testfile_path2, 'a') as testfile2: - testfile2.write("make sure registry is written\n") - - self.wait_until( - lambda: self.output_has(lines=3), - max_timeout=10) + self.input_logs.append(file2, "make sure registry is written\n") + self.wait_until(lambda: self.output_has(lines=3), max_timeout=10) # Make sure all states are cleaned up - self.wait_until(lambda: self.log_contains("Before: 1, After: 1, Pending: 0")) + self.wait_until(self.logs.nextCheck(re.compile("Registrar.*After: 1"))) filebeat.check_kill_and_wait() # Check that the first to files were removed from the registry - data = self.get_registry() + data = self.registry.load() assert len(data) == 1 # Make sure the last file in the registry is the correct one and has the correct offset - if os.name == "nt": - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + 2 - else: - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + assert data[0]["offset"] == self.input_logs.size(file2) - @unittest.skip('flaky test https://github.com/elastic/beats/issues/9215') def test_clean_removed_with_clean_inactive(self): """ Checks that files which were removed, the state is removed @@ -918,59 +877,58 @@ def test_clean_removed_with_clean_inactive(self): path=os.path.abspath(self.working_dir) + "/log/input*", scan_frequency="0.1s", clean_removed=True, - clean_inactive="20s", + clean_inactive="60s", ignore_older="15s", close_removed=True ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/input1" - testfile_path2 = self.working_dir + "/log/input2" - - with open(testfile_path1, 'w') as testfile1: - testfile1.write("file to be removed\n") - - with open(testfile_path2, 'w') as testfile2: - testfile2.write("2\n") + file1 = "input1" + file2 = "input2" + contents2 = [ + "2\n", + "make sure registry is written\n", + ] + self.input_logs.write(file1, "file to be removed\n") + self.input_logs.write(file2, contents2[0]) filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), max_timeout=10) # Wait until registry file is created self.wait_until( - lambda: self.log_contains_count("Registry file updated. 2 states written.") > 0, + self.logs.nextCheck("Registry file updated. 2 states written."), max_timeout=15) - data = self.get_registry() - assert len(data) == 2 + count = self.registry.count() + print("registry size: {}".format(count)) + assert count == 2 - os.remove(testfile_path1) + self.input_logs.remove(file1) # Wait until states are removed from inputs - self.wait_until(lambda: self.log_contains("Remove state for file as file removed")) + self.wait_until(self.logs.nextCheck("Remove state for file as file removed")) # Add one more line to make sure registry is written - with open(testfile_path2, 'a') as testfile2: - testfile2.write("make sure registry is written\n") + self.input_logs.append(file2, contents2[1]) self.wait_until(lambda: self.output_has(lines=3)) - # Check is > as the same log line might happen before but afterwards it is repeated - self.wait_until(lambda: self.log_contains_count("Before: 1, After: 1, Pending: 1") > 5) + + # wait until next gc and until registry file has been updated + self.wait_until(self.logs.check("Before: 1, After: 1, Pending: 1")) + self.wait_until(self.logs.nextCheck("Registry file updated. 1 states written.")) + count = self.registry.count() + print("registry size after remove: {}".format(count)) + assert count == 1 filebeat.check_kill_and_wait() - # Check that the first to files were removed from the registry - data = self.get_registry() + # Check that the first two files were removed from the registry + data = self.registry.load() assert len(data) == 1 # Make sure the last file in the registry is the correct one and has the correct offset - if os.name == "nt": - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + 2 - else: - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + assert data[0]["offset"] == self.input_logs.size(file2) def test_symlink_failure(self): """ @@ -1044,56 +1002,44 @@ def test_restart_state(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - close_inactive="1s", - ignore_older="3s", + close_inactive="200ms", + ignore_older="2000ms", ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/test1.log" - testfile_path2 = self.working_dir + "/log/test2.log" - testfile_path3 = self.working_dir + "/log/test3.log" - testfile_path4 = self.working_dir + "/log/test4.log" + init_files = ["test"+str(i)+".log" for i in range(3)] + restart_files = ["test"+str(i+3)+".log" for i in range(1)] - with open(testfile_path1, 'w') as testfile1: - testfile1.write("Hello World\n") - with open(testfile_path2, 'w') as testfile2: - testfile2.write("Hello World\n") - with open(testfile_path3, 'w') as testfile3: - testfile3.write("Hello World\n") + for name in init_files: + self.input_logs.write(name, "Hello World\n") filebeat = self.start_beat() # Make sure states written appears one more time self.wait_until( - lambda: self.log_contains("Ignore file because ignore_older"), + self.logs.check("Ignore file because ignore_older"), max_timeout=10) filebeat.check_kill_and_wait() self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - close_inactive="1s", - ignore_older="3s", - clean_inactive="5s", + close_inactive="200ms", + ignore_older="2000ms", + clean_inactive="3s", ) filebeat = self.start_beat(output="filebeat2.log") + logs = self.log_access("filebeat2.log") # Write additional file - with open(testfile_path4, 'w') as testfile4: - testfile4.write("Hello World\n") + for name in restart_files: + self.input_logs.write(name, "Hello World\n") # Make sure all 4 states are persisted - self.wait_until( - lambda: self.log_contains( - "input states cleaned up. Before: 4, After: 4", logfile="filebeat2.log"), - max_timeout=10) + self.wait_until(logs.nextCheck("input states cleaned up. Before: 4, After: 4")) # Wait until registry file is cleaned - self.wait_until( - lambda: self.log_contains( - "input states cleaned up. Before: 0, After: 0", logfile="filebeat2.log"), - max_timeout=10) + self.wait_until(logs.nextCheck("input states cleaned up. Before: 0, After: 0")) filebeat.check_kill_and_wait() diff --git a/libbeat/common/file/file_other.go b/libbeat/common/file/file_other.go index 1ac4ecb2cd4..894183cee32 100644 --- a/libbeat/common/file/file_other.go +++ b/libbeat/common/file/file_other.go @@ -62,3 +62,9 @@ func ReadOpen(path string) (*os.File, error) { perm := os.FileMode(0) return os.OpenFile(path, flag, perm) } + +// IsRemoved checks wheter the file held by f is removed. +func IsRemoved(f *os.File) bool { + _, err := os.Stat(f.Name()) + return err != nil +} diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index 6572fa78862..55f6ef4e1b2 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -23,6 +23,9 @@ import ( "reflect" "strconv" "syscall" + "unsafe" + + "golang.org/x/sys/windows" ) type StateOS struct { @@ -31,6 +34,12 @@ type StateOS struct { Vol uint64 `json:"vol,"` } +var ( + modkernel32 = windows.NewLazyDLL("kernel32.dll") + + procGetFileInformationByHandleEx = modkernel32.NewProc("GetFileInformationByHandleEx") +) + // GetOSState returns the platform specific StateOS func GetOSState(info os.FileInfo) StateOS { // os.SameFile must be called to populate the id fields. Otherwise in case for example @@ -107,3 +116,33 @@ func ReadOpen(path string) (*os.File, error) { return os.NewFile(uintptr(handle), path), nil } + +// IsRemoved checks wheter the file held by f is removed. +// On Windows IsRemoved reads the DeletePending flags using the GetFileInformationByHandleEx. +// A file is not removed/unlinked as long as at least one process still own a +// file handle. A delete file is only marked as deleted, and file attributes +// can still be read. Only opening a file marked with 'DeletePending' will +// fail. +func IsRemoved(f *os.File) bool { + hdl := f.Fd() + if hdl == uintptr(syscall.InvalidHandle) { + return false + } + + info := struct { + AllocationSize int64 + EndOfFile int64 + NumberOfLinks int32 + DeletePending bool + Directory bool + }{} + infoSz := unsafe.Sizeof(info) + + const class = 1 // FileStandardInfo + r1, _, _ := syscall.Syscall6( + procGetFileInformationByHandleEx.Addr(), 4, uintptr(hdl), class, uintptr(unsafe.Pointer(&info)), infoSz, 0, 0) + if r1 == 0 { + return true // assume file is removed if syscall errors + } + return info.DeletePending +}