Refactored
This commit is contained in:
195
wo/core/logwatch.py
Normal file
195
wo/core/logwatch.py
Normal file
@@ -0,0 +1,195 @@
|
||||
|
||||
"""
|
||||
Real time log files watcher supporting log rotation.
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import errno
|
||||
import stat
|
||||
from wo.core.logging import Log
|
||||
|
||||
|
||||
class LogWatcher(object):
|
||||
"""Looks for changes in all files of a directory.
|
||||
This is useful for watching log file changes in real-time.
|
||||
It also supports files rotation.
|
||||
|
||||
Example:
|
||||
|
||||
>>> def callback(filename, lines):
|
||||
... print filename, lines
|
||||
...
|
||||
>>> l = LogWatcher("/var/www/example.com/logs", callback)
|
||||
>>> l.loop()
|
||||
"""
|
||||
|
||||
def __init__(self, filelist, callback, extensions=["log"], tail_lines=0):
|
||||
"""Arguments:
|
||||
|
||||
(str) @folder:
|
||||
the folder to watch
|
||||
|
||||
(callable) @callback:
|
||||
a function which is called every time a new line in a
|
||||
file being watched is found;
|
||||
this is called with "filename" and "lines" arguments.
|
||||
|
||||
(list) @extensions:
|
||||
only watch files with these extensions
|
||||
|
||||
(int) @tail_lines:
|
||||
read last N lines from files being watched before starting
|
||||
"""
|
||||
self.files_map = {}
|
||||
self.filelist = filelist
|
||||
self.callback = callback
|
||||
# self.folder = os.path.realpath(folder)
|
||||
self.extensions = extensions
|
||||
# assert (os.path.isdir(self.folder), "%s does not exists"
|
||||
# % self.folder)
|
||||
for file in self.filelist:
|
||||
assert (os.path.isfile(file))
|
||||
assert callable(callback)
|
||||
self.update_files()
|
||||
# The first time we run the script we move all file markers at EOF.
|
||||
# In case of files created afterwards we don't do this.
|
||||
for id, file in list(iter(self.files_map.items())):
|
||||
file.seek(os.path.getsize(file.name)) # EOF
|
||||
if tail_lines:
|
||||
lines = self.tail(file.name, tail_lines)
|
||||
if lines:
|
||||
self.callback(file.name, lines)
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
def loop(self, interval=0.1, async=False):
|
||||
"""Start the loop.
|
||||
If async is True make one loop then return.
|
||||
"""
|
||||
while 1:
|
||||
self.update_files()
|
||||
for fid, file in list(iter(self.files_map.items())):
|
||||
self.readfile(file)
|
||||
if async:
|
||||
return
|
||||
time.sleep(interval)
|
||||
|
||||
def log(self, line):
|
||||
"""Log when a file is un/watched"""
|
||||
print(line)
|
||||
|
||||
# def listdir(self):
|
||||
# """List directory and filter files by extension.
|
||||
# You may want to override this to add extra logic or
|
||||
# globbling support.
|
||||
# """
|
||||
# ls = os.listdir(self.folder)
|
||||
# if self.extensions:
|
||||
# return ([x for x in ls if os.path.splitext(x)[1][1:]
|
||||
# in self.extensions])
|
||||
# else:
|
||||
# return ls
|
||||
|
||||
@staticmethod
|
||||
def tail(fname, window):
|
||||
"""Read last N lines from file fname."""
|
||||
try:
|
||||
f = open(fname, encoding='utf-8', mode='r')
|
||||
except IOError as err:
|
||||
if err.errno == errno.ENOENT:
|
||||
return []
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
BUFSIZ = 1024
|
||||
f.seek(0, os.SEEK_END)
|
||||
fsize = f.tell()
|
||||
block = -1
|
||||
data = ""
|
||||
exit = False
|
||||
while not exit:
|
||||
step = (block * BUFSIZ)
|
||||
if abs(step) >= fsize:
|
||||
f.seek(0)
|
||||
exit = True
|
||||
else:
|
||||
f.seek(step, os.SEEK_END)
|
||||
data = f.read().strip()
|
||||
if data.count('\n') >= window:
|
||||
break
|
||||
else:
|
||||
block -= 1
|
||||
return data.splitlines()[-window:]
|
||||
|
||||
def update_files(self):
|
||||
ls = []
|
||||
for name in self.filelist:
|
||||
absname = os.path.realpath(os.path.join(name))
|
||||
try:
|
||||
st = os.stat(absname)
|
||||
except EnvironmentError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
else:
|
||||
if not stat.S_ISREG(st.st_mode):
|
||||
continue
|
||||
fid = self.get_file_id(st)
|
||||
ls.append((fid, absname))
|
||||
|
||||
# check existent files
|
||||
for fid, file in list(iter(self.files_map.items())):
|
||||
# next(iter(graph.items()))
|
||||
try:
|
||||
st = os.stat(file.name)
|
||||
except EnvironmentError as err:
|
||||
if err.errno == errno.ENOENT:
|
||||
self.unwatch(file, fid)
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
if fid != self.get_file_id(st):
|
||||
# same name but different file (rotation); reload it.
|
||||
self.unwatch(file, fid)
|
||||
self.watch(file.name)
|
||||
|
||||
# add new ones
|
||||
for fid, fname in ls:
|
||||
if fid not in self.files_map:
|
||||
self.watch(fname)
|
||||
|
||||
def readfile(self, file):
|
||||
lines = file.readlines()
|
||||
if lines:
|
||||
self.callback(file.name, lines)
|
||||
|
||||
def watch(self, fname):
|
||||
try:
|
||||
file = open(fname, encoding='utf-8', mode='r')
|
||||
fid = self.get_file_id(os.stat(fname))
|
||||
except EnvironmentError as err:
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
else:
|
||||
self.log("watching logfile %s" % fname)
|
||||
self.files_map[fid] = file
|
||||
|
||||
def unwatch(self, file, fid):
|
||||
# file no longer exists; if it has been renamed
|
||||
# try to read it for the last time in case the
|
||||
# log rotator has written something in it.
|
||||
lines = self.readfile(file)
|
||||
self.log("un-watching logfile %s" % file.name)
|
||||
del self.files_map[fid]
|
||||
if lines:
|
||||
self.callback(file.name, lines)
|
||||
|
||||
@staticmethod
|
||||
def get_file_id(st):
|
||||
return "%xg%x" % (st.st_dev, st.st_ino)
|
||||
|
||||
def close(self):
|
||||
for id, file in list(iter(self.files_map.items())):
|
||||
file.close()
|
||||
self.files_map.clear()
|
||||
Reference in New Issue
Block a user