#!/usr/bin/python3 ############################################################################## # # (C) Alexis Huxley 2019-2024 - distributed under GPL v3 # # This program searches for processes belonging to the person running this # script that have video files open and being read. If it finds any, then # it calls 'xscreensaver-command -activate', which makes xscreensaver think # that the user is active and that therefore the screensaver should not # blank/power-off the screen. # # It is primarily intended for XFCE users using mediaplayers that don't # themselves send xscreensaver such 'keepalive' messages. # ############################################################################## # Modules import sys import os import re import subprocess import time import getopt # Globals progname = os.path.split(sys.argv[0])[1] compiled_re_pid = re.compile('^\d+$') compiled_re_xset_output = re.compile('.*\n DPMS is (Enabled|Disabled)\n.*', flags=re.MULTILINE|re.DOTALL) compiled_re_display_envvar = re.compile('^:(\d+)(?:\.\d+)?$') # Instantiate options global verboselevel, re_player_filename, re_movie_filename, compiled_re_player_filename, compiled_re_movie_filename, desired_dpms_state, check_period_movie, check_period_parent, check_period_dpms def main(): global verboselevel, compiled_re_movie_filename, compiled_re_player_filename, desired_dpms_state, re_movie_filename, re_player_filename # Instantiate empty arrays and dicts. checks_due_timestamps = {} check_periods = {} # Defaults for options verboselevel = 2 re_movie_filename = '^(?:/dev/sr\d|.*\.(?:avi|bup|flv|m4v|mkv|mp4|mpg|ogv|vob|wmv|mov))$' re_player_filename = '^.*$' desired_dpms_state = None check_period_movie = 30 check_period_parent = 5 check_period_dpms = 30 # Process options try: opts, args = getopt.getopt(sys.argv[1:], "hd:vm:p:", ["help", "debug=", "verbose", "re-movie=", "re-player=", "dpms="]) except getopt.GetoptError as err: usage() for opt, optarg in opts: if opt in ("-v", "--verbose"): verboselevel = 3 elif opt in ("-h", "--help"): usage(0) elif opt in ("-d", "--debug"): try: verboselevel = int(optarg) except ValueError: usage() elif opt in ("-m", "--re-movie"): re_movie_filename = optarg elif opt in ("-p", "--re-player"): re_player_filename = optarg elif opt in ("--dpms"): if optarg == "enabled": desired_dpms_state = True elif optarg == "disabled": desired_dpms_state = False else: usage() else: assert False, "unhandled option" sys.argv = args # Process arguments debug(10, 'checking arguments ...') if len(sys.argv) != 0: usage() # Sanity checks and derivations if os.getuid == 0: error('don\'t run this as root') # Get the PID of the X server (this must be done before locking since the lock file name is based on that PID) xserver_pid = get_xserver_pid() # Set the control for the loop below very early; specifically before all sleeps and before the # setting up of the signal handler that might toggle it. do_loop = True # Check we can talk to X server debug(10,'main: checking we talk to X server ...') child_sp = subprocess.Popen('xlsfonts'.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = child_sp.communicate()[0] rc = child_sp.returncode if rc != 0: error('can\'t talk to X server') compiled_re_movie_filename = re.compile(re_movie_filename) compiled_re_player_filename = re.compile(re_player_filename) # Guts # Initialise loop checks_due_timestamps['parent'] = 0 check_periods['parent'] = check_period_parent checks_due_timestamps['movie'] = 0 check_periods['movie'] = check_period_movie if desired_dpms_state is not None: checks_due_timestamps['dpms'] = 0 check_periods['dpms'] = check_period_dpms previous_running_movie_offsets = {} have_previous_running_movie_offsets_flag = False current_timestamp = 0 # Enter loop debug(10, 'main: entering monitoring loop ...') while do_loop: # Work out which check is due next and when and wait until then. soonest_check = min(checks_due_timestamps, key=checks_due_timestamps.get) if checks_due_timestamps[soonest_check] > current_timestamp: sleep_period = checks_due_timestamps[soonest_check] - current_timestamp else: sleep_period = 0 debug(10, 'main: will check %s in %fs ...' % (soonest_check, sleep_period)) time.sleep(sleep_period) # If checking if movies are running ... if soonest_check == 'movie': # ... get position in all open movies ... running_movie_offsets = get_running_movie_offsets() debug(10, 'main: running_movie_offsets is %s' % (running_movie_offsets)) # ... and if we have already collected info on open movie positions, # and they differ from last time, then nudge screensaver, so it doesn't kick in. if have_previous_running_movie_offsets_flag and running_movie_offsets != previous_running_movie_offsets: debug(10, 'main: movie offsets have changed; nudging xscreensaver ...') child_sp = subprocess.Popen('xscreensaver-command -deactivate'.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = child_sp.communicate()[0] rc = child_sp.returncode if rc != 0: error('xscreensaver-command: failed') else: debug(10, 'main: movie offsets have *not* changed; no action necessary') previous_running_movie_offsets = running_movie_offsets have_previous_running_movie_offsets_flag = True # If checking if parent is still alive ... elif soonest_check == 'parent': # ... check if parent still alive and flag to exit loop if not. if not os.path.isdir('/proc/' + str(xserver_pid)): do_loop = False else: debug(10, 'main: parent process is still running; no action necessary') # If checking if somebody re-enabled DPMS ... elif soonest_check == 'dpms': # ... extract answer from output of 'xset q' ... child_sp = subprocess.Popen('xset q'.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = child_sp.communicate()[0] rc = child_sp.returncode restuff = compiled_re_xset_output.search(output) if restuff is None: error('xset: output unparsable') # ... and re-disable it if necessary. (X should not turn monitor off; xscreensaver should.) if restuff.group(1) == 'Enabled' and not desired_dpms_state: warning('main: DPMS is enabled! re-disabling it ...') child_sp = subprocess.Popen('xset -dpms'.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = child_sp.communicate()[0] rc = child_sp.returncode if rc != 0: error('xset -dpms: failed') elif restuff.group(1) == 'Disabled' and desired_dpms_state: warning('main: DPMS is disabled! re-enabling it ...') child_sp = subprocess.Popen('xset +dpms'.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) output = child_sp.communicate()[0] rc = child_sp.returncode if rc != 0: error('xset +dpms: failed') # Set the time when the just-done check should next be done. current_timestamp = time.time() checks_due_timestamps[soonest_check] = current_timestamp + check_periods[soonest_check] return 0 def get_xserver_pid(): # Check DISPLAY set and sensible. if not 'DISPLAY' in os.environ: error('DISPLAY: not set') if len(os.environ['DISPLAY']) == 0: error('DISPLAY: set but empty') restuff = compiled_re_display_envvar.search(os.environ['DISPLAY']) if restuff is None: error('DISPLAY: failed to parse') # Read X server pid from lock file debug(10, 'get_xserver_pid: display no = %s' % (restuff.group(1))) x_lock_file = '/tmp/.X' + restuff.group(1) + '-lock' try: fh = open(x_lock_file, 'r') except IOError: error('%s: can\'t open' % (x_lock_file)) pid = int(fh.readline()[:-1].lstrip(' ')) fh.close() debug(10, 'get_xserver_pid: pid=%d' % (pid)) # Check lock file not stale. if not os.path.isdir(os.path.join('/proc', str(pid))): error('stale X lock file?') # Return pid to caller. return pid def get_running_movie_offsets(): global compiled_re_pid, compiled_re_movie_filename running_movie_offsets = { } # For each running process ... for pid in os.listdir('/proc'): # ... filter and canonicalise ... if compiled_re_pid.search(pid) is None: continue pathname = os.path.join('/proc', pid) try: pathname_stat = os.stat(pathname) except OSError: continue if pathname_stat.st_uid != os.getuid(): continue # Skip if not directory. (stat module has convenience functions; os.stat # seemingly does not.) if not oct(pathname_stat.st_mode) and 0o040000: continue # Compare compiled_re_player_filename with target of /proc/$PID/exe symlink. try: exe = os.readlink(os.path.join(pathname, 'exe')) except OSError: continue if compiled_re_player_filename.search(exe) is None: continue # ... look at what files it has open ... try: fds = os.listdir(os.path.join(pathname, 'fd')) except OSError: fds = [] for fd in fds: try: openfilename = os.readlink(os.path.join(pathname, 'fd', fd)) except OSError: continue # ... ignore non-movies ... if compiled_re_movie_filename.search(openfilename) is None: continue # ... get how much read from movie file ... try: fh = open(os.path.join(pathname,'fdinfo',fd)) except OSError: continue # ... and record that information. running_movie_offsets[openfilename] = int(fh.readline()[5:][:-1]) fh.close() # Pass back to caller a dictionary of movie-files/offsets. return running_movie_offsets # Messaging functions. def usage(rc = 1): global re_movie_filename, re_player_filename, verbosellevel usage_text = u'''\ Usage: %s [ ] Options: -v | --verbose be verbose -d | --debug= be more verbose (default: %d) -h | --help display help -m | --re-movie= set regexp for movie files (default: %s) -p | --re-player= set regexp for player commands (default: %s) --dpms={enabled|disabled} keep DPMS in specified state (default: not monitored) ''' % (progname, verboselevel, re_movie_filename, re_player_filename) if rc == 0: sys.stdout.write(usage_text) else: sys.stderr.write(usage_text) exit(rc) def debug(level, message): global verboselevel if verboselevel >= level: sys.stderr.write(u'%s: DEBUG[%d]: %s\n' % (progname, level, message)) return 0 def info(message): global verboselevel if verboselevel >= 3: sys.stderr.write(u'%s: INFO: %s\n' % (progname, message)) return 0 def warning(message): global verboselevel if verboselevel >= 2: sys.stderr.write(u'%s: WARNING: %s\n' % (progname, message)) return 0 def error(message): global verboselevel if verboselevel >= 1: sys.stderr.write(u'%s: ERROR: %s\n' % (progname, message)) exit(2) # Entry point if __name__ == "__main__": main()