#!/usr/bin/python3 # vim: nowrap # TODO # Modules import signal import sys import os import syslog import re import time import readline import gpxpy import shutil import copy #import pprint import apsw import geopy.distance import subprocess import calendar import http.server import socketserver import atexit # Globals progname = sys.argv[0].rpartition("/")[2] del sys.argv[0] syslog_opened_flag = False DEVNULL = open(os.devnull, 'wb') our_new_way_ifn_extra_component_separator = '=' our_new_way_ifn_intra_component_separator = '_' city_distance_tolerance = 5.0 no_sources_coords = (48.223319,11.674804) http_server_port = 8000 legend_key_fmtstr = '%-15s' legend_desc_fmtstr = '%s' ui_regexps = {} # anything except (non-alpha-numerics, digits and underscore) possibly followed by hyphen and same again possibly ... ui_regexps['international_word'] = r'[^\W\d_]+(?:|-[^\W\d_]+){0,}' ui_regexps['english_word'] = r'[a-zA-Z]+(?:|-[a-zA-Z]+){0,}' ui_regexps['year'] = r'(?:19|20)\d\d' ui_regexps['mon'] = r'(?:0[1-9]|1[0-2])' ui_regexps['dom'] = r'(?:[0-2][0-9]|3[0-1])' ui_regexps['hour'] = r'(?:[01][0-9]|2[0-3])' ui_regexps['min'] = r'[0-5][0-9]' ui_regexps['sec'] = r'[0-5][0-9]' ui_regexps['serno'] = r'[a-zA-Z0-9]{4,}' ui_regexps['coords'] = r'[0-9]+(?:|\.[0-9]+),[0-9]+(?:|\.[0-9]+)' ui_regexps['country'] = r'(?:ad|ae|af|ag|ai|al|am|ao|aq|ar|as|at|au|aw|ax|az|ba|bb|bd|be|bf|bg|bh|bi|bj|bl|bm|bn|bo|bq|br|bs|bt|bv|bw|by|bz|ca|cc|cd|cf|cg|ch|ci|ck|cl|cm|cn|co|cr|cu|cv|cw|cx|cy|cz|de|dj|dk|dm|do|dz|ec|ee|eg|eh|er|es|et|fi|fj|fk|fm|fo|fr|ga|gb|gd|ge|gf|gg|gh|gi|gl|gm|gn|gp|gq|gr|gs|gt|gu|gw|gy|hk|hm|hn|hr|ht|hu|id|ie|il|im|in|io|iq|ir|is|it|je|jm|jo|jp|ke|kg|kh|ki|km|kn|kp|kr|kw|ky|kz|la|lb|lc|li|lk|lr|ls|lt|lu|lv|ly|ma|mc|md|me|mf|mg|mh|mk|ml|mm|mn|mo|mp|mq|mr|ms|mt|mu|mv|mw|mx|my|mz|na|nc|ne|nf|ng|ni|nl|no|np|nr|nu|nz|om|pa|pe|pf|pg|ph|pk|pl|pm|pn|pr|ps|pt|pw|py|qa|re|ro|rs|ru|rw|sa|sb|sc|sd|se|sg|sh|si|sj|sk|sl|sm|sn|so|sr|ss|st|sv|sx|sy|sz|tc|td|tf|tg|th|tj|tk|tl|tm|tn|to|tr|tt|tv|tw|tz|ua|ug|uk|um|us|uy|uz|va|vc|ve|vg|vi|vn|vu|wf|ws|ye|yt|za|zm|zw)' # people are empty, or a word followed by any number of space+word ui_regexps['city'] = r'%s(?: +%s){0,}' % (ui_regexps['international_word'], ui_regexps['international_word']) ui_regexps['hfts'] = r'%s%s%s%s%s%s' % (ui_regexps['year'], ui_regexps['mon'],ui_regexps['dom'],ui_regexps['hour'],ui_regexps['min'],ui_regexps['sec']) # people are empty, or a word followed by any number of space+word ui_regexps['people'] = r'(?:|%s(?: +%s){0,})' % (ui_regexps['international_word'], ui_regexps['international_word']) # I assume for time being that descriptions do not include non-enlish words. ui_regexps['desc'] = r'(?:|%s(?: +%s){0,})' % (ui_regexps['english_word'], ui_regexps['english_word']) ui_regexps['confirm_flag'] = r'[ynr]' # When parsing filenames, most components are matched by the same regular expression that # is used for validating user input .... fn_regexps = ui_regexps # ... but there are exceptions: fn_regexps['city'] = ui_regexps['city'].replace(' ', our_new_way_ifn_intra_component_separator) fn_regexps['people'] = ui_regexps['people'].replace(' ', our_new_way_ifn_intra_component_separator) fn_regexps['desc'] = ui_regexps['desc'].replace(' ', our_new_way_ifn_intra_component_separator) # Derive tied and capturing regexps. tied_ui_regexps = { k:'^%s$' % (ui_regexps[k]) for k in ui_regexps } capturing_ui_regexps = { k:'(%s)' % (ui_regexps[k]) for k in ui_regexps } #tied_fn_regexps = { k:'^%s$' % (fn_regexps[k]) for k in fn_regexps } #capturing_fn_regexps = { k:'(%s)' % (fn_regexps[k]) for k in fn_regexps } colours = { 'bold':'\x1b[1m', 'clear':'\x1b[0m', 'darkredonblack':'\x1b[0;31;40m', 'red':'\x1b[31;1m', 'green':'\x1b[32;1m', 'yellow':'\x1b[33;1m', 'blue':'\x1b[34;1m', 'purple':'\x1b[35;1m', } def main(): global verboselevel global variable_natures, variable_values, parsers, db_file, browser_cmd, always_help_flag signal.signal(signal.SIGINT, signal_handler) # Defaults for options verboselevel = 2 mode = None db_file = '%s/.%s/%s.sqlite' % (os.environ['HOME'], progname, progname) mode = 'renamer' variable_name = 'new_ifn' browser_cmd = 'sensible-browser' always_help_flag = False # Process options, old school while True: if len(sys.argv) == 0: break # Application-specific options elif sys.argv[0].startswith('--db-file='): db_file = sys.argv[0][len('--db-file='):] elif sys.argv[0].startswith('--eval='): mode = 'evaler' variable_name = sys.argv[0][len('--eval='):] elif sys.argv[0].startswith('--graph='): debug(10, 'main: grapher mode selected') mode = 'grapher' desired_subarrays = sys.argv[0][len('--graph='):].split(',') elif sys.argv[0] == '--always-help': always_help_flag = True # Standard options elif sys.argv[0].startswith('--debug='): verboselevel = sys.argv[0][len('--debug='):] if not re.search('^\d+$', verboselevel): usage() verboselevel = int(verboselevel) elif sys.argv[0] == '--verbose': verboselevel = 3 elif sys.argv[0] == '--help-places': show_help_places() elif sys.argv[0] == '--help': usage(0) elif sys.argv[0] == '-v': verboselevel = 3 elif sys.argv[0] == '-d': if len(sys.argv) < 3: usage() verboselevel = sys.argv[1] del sys.argv[0] if not re.search('^\d+$', verboselevel): usage() verboselevel = int(verboselevel) elif sys.argv[0] == '-h': usage(0) elif sys.argv[0] == '--': del sys.argv[0] break elif sys.argv[0].startswith('-'): usage() else: break del sys.argv[0] # Sanity checks and derivations # (delegated) variable_natures = { # Globals - dependencies will be added dynamically. ('task_idl_flag','global'): { 'deps': [], 'pars': [], 'fnc': lambda rl, *p: True }, ('task_gdl_flag','global'): { 'deps': [], 'pars': [], 'fnc': lambda rl, *p: True }, ('gfns','global'): { 'deps': [], 'pars': [], # Varargs ('*p') are held in a tuple. We want a list. 'fnc': lambda rl, *p: list(p) }, ('gfns_abs','global'): { 'deps': [], 'pars': [('gfns','global')], 'fnc': get_gfns_abs_from_gfns }, ('old_ifns','global'): { 'deps': [], 'pars': [], # Varargs ('*p') are held in a tuple. We want a list. 'fnc': lambda rl, *p: list(p) }, # File-specific stuff from old filename 'old_ifn': { 'deps': [], 'pars': [], 'fnc': lambda rl, *pars: internal('trying to get old_ifn with pars=%s' % (str(pars))) }, 'task_show_image': { 'deps': [], 'pars': [ 'old_ifn' ], 'fnc': get_task_show_image }, 'gfn': { 'deps': [], 'pars': [], 'fnc': lambda rl, *pars: internal('trying to get gfn with pars=%s' % (str(pars))) }, 'old_parser': { 'deps': [], 'pars': ['old_ifn'], 'fnc': get_old_parser_from_old_ifn_silent }, 'old_ifn_ts': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': get_old_ifn_ts_from_old_ifn_and_old_parser_silent }, 'old_ifn_serno': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': get_old_ifn_serno_from_old_ifn_and_old_parser_silent }, 'old_ifn_country': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': get_old_ifn_country_from_old_ifn_and_old_parser_silent }, 'old_ifn_city': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': get_old_ifn_city_from_old_ifn_and_old_parser_silent }, 'old_ifn_people': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': get_old_ifn_people_from_old_ifn_and_old_parser_silent }, 'old_ifn_desc': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': get_old_ifn_desc_from_old_ifn_and_old_parser_silent }, 'old_ifn_coords': { 'deps': [], 'pars': ['old_ifn', 'old_parser' ], 'fnc': lambda rl, *p: None }, 'old_exif': { 'deps': [], 'pars': ['old_ifn'], 'fnc': get_old_exif_from_old_ifn_silent }, 'gpx': { 'deps': [], 'pars': ['gfn'], 'fnc': get_gpx_from_gfn_silent }, 'old_exif_ts': { 'deps': [], 'pars': ['old_exif'], 'fnc': get_old_exif_ts_from_old_exif_silent }, 'gpx_tracks': { 'deps': [], 'pars': ['gpx'], 'fnc': get_gpx_tracks_from_gpx_silent }, 'gpx_routes': { 'deps': [], 'pars': ['gpx'], 'fnc': get_gpx_routes_from_gpx_silent }, 'gpx_waypoints': { 'deps': [], 'pars': ['gpx'], 'fnc': get_gpx_waypoints_from_gpx_silent }, 'old_exif_serno': { 'deps': [], 'pars': ['old_exif'], 'fnc': get_old_exif_serno_from_old_exif_silent }, 'old_exif_coords': { 'deps': [], 'pars': ['old_exif'], 'fnc': get_old_exif_coords_from_old_exif_silent }, # File-specific flag to indicate all data that can be derived without interaction has been derived. 'task_idl_flag': { 'deps': ['old_ifn_country', 'old_ifn_city', 'old_ifn_people', 'old_ifn_desc', 'old_ifn_coords', 'old_exif_coords', 'old_ifn_ts', 'old_exif_ts', 'old_ifn_serno', 'old_exif_serno'], 'pars': [], 'fnc': lambda rl: True }, 'task_gdl_flag': { 'deps': ['gpx_tracks', 'gpx_routes', 'gpx_waypoints'], 'pars': [], 'fnc': lambda rl: True }, # File-specific new things 'hfts_guffs': { 'deps': [('task_idl_flag','global'), 'task_show_image' ], 'pars': ['old_ifn_ts', 'old_exif_ts'], 'fnc': get_hfts_guffs_from_old_ifn_ts_and_old_exif_ts_silent }, 'new_ts': { 'deps': [], 'pars': ['hfts_guffs'], 'fnc': get_new_ts_from_hfts_guffs_prompting }, 'new_exif_ts': { 'deps': [], 'pars': [ 'new_ts' ], 'fnc': lambda rl, new_ts: new_ts }, 'new_exif': { 'deps': [], 'pars': [ 'old_exif', 'new_exif_ts', 'new_exif_serno', 'new_exif_coords' ], 'fnc': get_new_exif_from_old_exif_and_new_exif_ts_and_new_exif_serno_new_exif_coords }, 'serno_guffs': { 'deps': [('task_idl_flag','global'), 'task_show_image' ], 'pars': ['old_ifn_serno', 'old_exif_serno'], 'fnc': get_serno_guffs_from_old_ifn_serno_and_old_exif_serno_silent }, 'new_serno': { 'deps': [], 'pars': [ 'serno_guffs' ], 'fnc': get_new_serno_from_serno_guffs_prompting }, 'new_exif_serno': { 'deps': [], 'pars': [ 'new_serno' ], 'fnc': lambda rl, new_serno: new_serno }, 'coords_guffs': { 'deps': ['task_show_image' ], 'pars': ['old_ifn_coords', 'old_exif_coords', 'nighest_id', 'nighest_tpd', 'new_ts'], 'fnc': get_coords_guffs_from_old_ifn_coords_and_old_exif_coords_and_nighest_id_and_nighest_tpd_and_new_ts_silent }, 'new_coords': { 'deps': [ 'task_show_map' ], 'pars': [ 'coords_guffs' ], 'fnc': get_new_coords_from_coords_guffs_prompting }, 'task_show_map': { 'deps': [], 'pars': [ 'coords_guffs', ('gfns_abs','global') ], 'fnc': get_task_show_map }, 'new_exif_coords': { 'deps': [], 'pars': [ 'new_coords' ], 'fnc': lambda rl, new_coords: new_coords }, 'new_places_dbent': { 'deps': [], 'pars': [ 'new_coords' ], 'fnc': get_new_places_dbent_from_new_coords_silent }, 'country_guffs': { 'deps': ['task_show_image' ], 'pars': ['old_ifn_country', 'new_places_dbent', 'override_country' ], 'fnc': get_country_guffs_from_old_ifn_country_and_new_places_dbent_and_override_country_silent }, 'new_country': { 'deps': [], 'pars': [ 'country_guffs', 'new_coords' ], 'fnc': get_new_country_from_country_guffs_prompting }, 'override_city': { 'deps': [], 'pars': [ 'new_coords' ], 'fnc': get_override_city_from_new_coords_silent }, 'override_country': { 'deps': [], 'pars': [ 'new_coords' ], 'fnc': get_override_country_from_new_coords_silent }, 'city_guffs': { 'deps': ['task_show_image' ], 'pars': ['old_ifn_city', 'new_places_dbent', 'override_city' ], 'fnc': get_city_guffs_from_old_ifn_city_and_new_places_dbent_and_override_city_silent }, 'new_city': { 'deps': [], 'pars': [ 'city_guffs', 'new_coords' ], 'fnc': get_new_city_from_city_guffs_prompting }, 'people_guffs': { 'deps': [ ('task_idl_flag','global'), 'task_show_image' ], 'pars': ['old_ifn_people'], 'fnc': get_people_guffs_from_old_ifn_people_silent }, 'new_people': { 'deps': [], 'pars': [ 'people_guffs' ], 'fnc': get_new_people_from_people_guffs_prompting }, 'desc_guffs': { 'deps': [ ('task_idl_flag','global'), 'task_show_image' ], 'pars': ['old_ifn_desc' ], 'fnc': get_desc_guffs_from_old_ifn_desc_silent }, 'new_desc': { 'deps': [], 'pars': [ 'desc_guffs' ], 'fnc': get_new_desc_from_desc_guffs_prompting }, 'new_ifn': { 'deps': [], 'pars': ['new_ts','new_serno','new_country','new_city','new_people','new_desc'], 'fnc': get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc }, 'confirm_flag_guffs': { 'deps': [ 'task_show_image' ], 'pars': [], 'fnc': get_confirm_flag_guffs_silent }, 'apply_confirm_flag': { 'deps': [], 'pars': ['confirm_flag_guffs', 'old_exif', 'new_exif', 'old_ifn', 'new_ifn'], 'fnc': get_confirm_flag_prompting }, 'task_apply': { 'deps': [], 'pars': ['apply_confirm_flag', 'old_exif', 'new_exif', 'old_ifn', 'new_ifn'], 'fnc': get_task_apply }, 'nighest_id': { 'deps': [], 'pars': [ 'new_ts', 'old_ifn', ('old_ifns','global') ], 'fnc': get_nighest_id_from_new_ts_and_old_ifn_and_old_ifns_silent }, 'nighest_tpd': { 'deps': [ ('task_gdl_flag','global') ], 'pars': [ 'new_ts', ('gfns','global') ], 'fnc': get_nighest_tpd_from_new_ts_and_gfns_silent }, } variable_values = { } parsers = { 'our_new_way_ifn_parser': { 're': r'^%s%s%s%s%s%s%s%s%s%s%s\.jpg$' % (capturing_ui_regexps['hfts'], our_new_way_ifn_extra_component_separator, capturing_ui_regexps['serno'], our_new_way_ifn_extra_component_separator, capturing_ui_regexps['country'], our_new_way_ifn_extra_component_separator, capturing_ui_regexps['city'], our_new_way_ifn_extra_component_separator, capturing_ui_regexps['people'], our_new_way_ifn_extra_component_separator, capturing_ui_regexps['desc']), 'fnc': our_new_way_ifn_parser }, 'our_old_way_ifn_parser': { 're': r'^(\d{14})-([^-]+)-([a-z]{2})-([^-]+)-((?:(?:%s)_){0,})(.*?)\.jpg$' % ('|'.join(['alexis', 'suzie', 'louise', 'kamzik', 'mačka' ])), 'fnc': our_old_way_ifn_parser }, 'traditional_digicam_ifn_parser': { 're': r'^img_(\d{4})\.jpg$', 'fnc': traditional_digicam_ifn_parser }, 'android_ifn_parser': { 're': r'^%s%s%s_%s%s%s\.jpg$' % (capturing_ui_regexps['year'], capturing_ui_regexps['mon'], capturing_ui_regexps['dom'], capturing_ui_regexps['hour'], capturing_ui_regexps['min'], capturing_ui_regexps['sec']), 'fnc': android_ifn_parser }, 'dumb_ifn_parser': { 're': r'.*', 'fnc': dumb_ifn_parser } } if mode == 'grapher': grapher(desired_subarrays) elif mode == 'renamer': renamer(sys.argv) elif mode == 'evaler': evaler(variable_name, sys.argv) else: internal('main: %s: invalid mode' % (mode)) def renamer(fns): global variable_values, variable_natures, db_file, cursor # Sanity checks and derivations if not os.path.isfile(db_file): error('no database file found') if not db_file.endswith('.sqlite'): error('%s: doesn\'t end with .sqlite (hint: is that definitely an SQLite database?)') debug(10, 'main: connecting to database ...') #apsw.enable_callback_tracebacks(True) conn = apsw.Connection(db_file) cursor = conn.cursor() # Make missing tables # See bottom of http://download.geonames.org/export/dump/ for explanation of table structure. sql_statement = ''' CREATE TABLE IF NOT EXISTS places ( geonameid INT, name CHAR, asciiname CHAR, alternatenames CHAR, latitude FLOAT, longitude FLOAT, featureclass CHAR, featurecode CHAR, countrycode CHAR, cc2 CHAR, admin1code CHAR, admin2code CHAR, admin3code CHAR, admin4code CHAR, population INT, elevation INT, dem CHAR, timezone CHAR, modificationdate CHAR, PRIMARY KEY (geonameid) );''' sqlite_exec_wrapper(sql_statement) sql_statement = ''' CREATE TABLE IF NOT EXISTS coords_aliases ( alias TEXT, latitude FLOAT, longitude FLOAT, PRIMARY KEY (alias) );''' sqlite_exec_wrapper(sql_statement) sql_statement = ''' CREATE TABLE IF NOT EXISTS place_overrides ( latitude FLOAT, longitude FLOAT, country TEXT, city TEXT, PRIMARY KEY (latitude, longitude) );''' sqlite_exec_wrapper(sql_statement) sql_statement = ''' CREATE TABLE IF NOT EXISTS person_aliases ( alias TEXT, person TEXT, PRIMARY KEY (alias) );''' sqlite_exec_wrapper(sql_statement) sql_statement = ''' CREATE TABLE IF NOT EXISTS desc_aliases ( alias TEXT, desc TEXT, PRIMARY KEY (alias) );''' sqlite_exec_wrapper(sql_statement) info('perhaps activate focus stealing prevention (hint: XFCE Settings Manager --> Window Manager Tweaks --> Focus --> Activate focus stealing prevention + Honour standard ICCCM focus hint + When a window raises itself: Do nothing)') # Chrome disables writing to the clipboard if the HTML content is from a file:/// URL. # Therefore we need to run a real webserver. start_http_server() old_ifns = [] gfns = [] for fn in fns: debug(20, 'main: considering %s ...' % (fn)) if not os.path.isfile(fn): error('%s: not accessible' % (fn)) elif fn.lower().endswith('.jpg') or fn.lower().endswith('.jpeg'): if fn not in old_ifns: old_ifns.append(fn) else: error('%s: duplicate file on command line' % (fn)) elif fn.lower().endswith('.gpx'): if fn not in gfns: gfns.append(fn) else: error('%s: duplicate file on command line' % (fn)) else: error('%s: bad extension' % (fn)) debug(5, 'renamer: old_ifns=%s, gfns=%s' % (old_ifns, gfns)) # Add the old image filenames. for old_ifn in old_ifns: variable_values[('old_ifn',old_ifn)] = old_ifn variable_natures[('task_idl_flag','global')]['deps'].append(('task_idl_flag',old_ifn)) variable_natures[('old_ifns','global')]['deps'].append(('old_ifn',old_ifn)) variable_natures[('old_ifns','global')]['pars'].append(('old_ifn',old_ifn)) # Add the GPX filenames. for gfn in gfns: variable_values[('gfn',gfn)] = gfn variable_natures[('task_gdl_flag','global')]['deps'].append(('task_gdl_flag',gfn)) variable_natures[('gfns','global')]['deps'].append(('gfn',gfn)) variable_natures[('gfns','global')]['pars'].append(('gfn',gfn)) # Process each image file. for old_ifn in old_ifns: # Loop while user says they want to redo the questions. confirm_flag = None while True: print('%s%s%s ...' % (colours['purple'], old_ifn, colours['clear'])) confirm_flag = get_value(0, ('task_apply',old_ifn)) if confirm_flag is not None: break # Remove references to the old filename. variable_values = { var:variable_values[var] for var in variable_values if var[1] != old_ifn } # Re-add it but using its old filename. variable_values[('old_ifn',old_ifn)] = old_ifn # If the file did get renamed then there is data to reload. if confirm_flag: # Get the new name out of the cache: we will need it in a moment. new_ifn = get_value(0, ('new_ifn',old_ifn)) # Remove references to the old filename. variable_values = { var:variable_values[var] for var in variable_values if var[1] != old_ifn } variable_natures[('task_idl_flag','global')]['deps'].remove(('task_idl_flag',old_ifn)) variable_natures[('old_ifns','global')]['deps'].remove(('old_ifn',old_ifn)) variable_natures[('old_ifns','global')]['pars'].remove(('old_ifn',old_ifn)) # Re-add it but using its new filename. variable_values[('old_ifn',new_ifn)] = new_ifn variable_natures[('task_idl_flag','global')]['deps'].append(('task_idl_flag',new_ifn)) variable_natures[('old_ifns','global')]['deps'].append(('old_ifn',new_ifn)) variable_natures[('old_ifns','global')]['pars'].append(('old_ifn',new_ifn)) # Force collection of data the file with its new filename. del variable_values[('task_idl_flag','global')] del variable_values[('old_ifns','global')] else: # Remove references to the old filename. variable_values = { var:variable_values[var] for var in variable_values if var[1] != old_ifn } # Re-add it but using its old filename. variable_values[('old_ifn',old_ifn)] = old_ifn def start_http_server(): info('starting http server ...') try: http_server_pid = os.fork() except: internal('unable to fork') if http_server_pid == 0: os.chdir('/') # Create the http request handler (by default handling requests from where?) my_simple_http_request_handler = MySimpleHTTPRequestHandlerClass # Create a listener and tell it to pass what it hears to the request handler. httpd = MyTCPServerClass(('', http_server_port), my_simple_http_request_handler) # Before starting the server make it immune to CTRL-C (which may be sent # to the parent process). signal.signal(signal.SIGINT, signal.SIG_IGN) httpd.serve_forever() # Will it ever get here? sys.exit(0) # Only the parent gets here. def stop_http_server(): info('stopping http server ...') os.kill(http_server_pid, signal.SIGTERM) os.waitpid(http_server_pid, 0) atexit.register(stop_http_server) def grapher(desired_subarrays): debug(10, 'grapher: desired_subarrays=%s' % (desired_subarrays)) global variable_natures # Sanity checks # Guts print('digraph mygraph {') print(' ratio="fill";') print(' size="11.7,16.4!";') #print(' margin=0;') #print(' node [shape=box];') for variable in variable_natures: if isinstance(variable, tuple): src = '%s,%s' % variable else: src = '%s,' % (variable) actual_subarrays = { 'deps':'black', 'pars':'red' } for subarray in actual_subarrays: if subarray not in desired_subarrays: continue if len(variable_natures[variable][subarray]) > 0: for dep in variable_natures[variable][subarray]: if isinstance(dep, tuple): dst = '%s,%s' % dep else: dst = '%s,' % (dep) print(' "%s" -> "%s" [color = %s]' % (dst, src, actual_subarrays[subarray])) else: print(' "%s"' % (src)) print(' "task_idl_flag," -> "task_idl_flag,global"') print(' "old_ifn," -> "old_ifns,global"') print(' "task_gdl_flag," -> "task_gdl_flag,global"') print(' "gfn," -> "gfns,global"') print('}') def evaler(variable_name, old_ifns): global variable_values, variable_natures # Sanity checks # Guts for old_ifn in old_ifns: variable_values[('old_ifn',old_ifn)] = old_ifn variable_natures[('task_idl_flag','global')]['deps'].append(('task_idl_flag',old_ifn)) for old_ifn in old_ifns: print('%s: %s' % ((variable_name, old_ifn), get_value(0, (variable_name,old_ifn)))) def get_gfns_abs_from_gfns(rl, gfns): return [ os.path.abspath(gfn) for gfn in gfns ] def get_confirm_flag_guffs_silent(rl): confirm_flag_guffs = [] # These are in order of preference letter = 'y' legend = 'apply' confirm_flag_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':letter }) letter = 'n' legend = 'don\'t apply' confirm_flag_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':letter }) letter = 'r' legend = 'redo' confirm_flag_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':letter }) return confirm_flag_guffs def get_confirm_flag_prompting(rl, confirm_flag_guffs, old_exif, new_exif, old_ifn, new_ifn): #if old_exif == new_exif and old_ifn == new_ifn: # return False def rationaliser_confirm_flag_or_valid_letter(x): if x not in [ confirm_flag_guff['r'] for confirm_flag_guff in confirm_flag_guffs if confirm_guff['v'] is not None ]: return rationaliser_confirm_flag(x) confirm_flag_guff = [ confirm_flag_guff for confirm_flag_guff in confirm_flag_guffs if confirm_flag_guff['r'] == x ][0] if callable(confirm_flag_guff['v']): debug(20, 'rationaliser_confirm_flag_or_valid_letter: callable') return rationaliser_confirm_flag(confirm_flag_guff['v']()) else: debug(20, 'rationaliser_confirm_flag_or_valid_letter: not callable') return rationaliser_confirm_flag(confirm_flag_guff['v']) def helper_confirm_flag_or_valid_letter(): for confirm_flag_guff in confirm_flag_guffs: print(confirm_flag_guff['p']) #sql_statement = 'SELECT alias, latitude, longitude FROM confirm_flag_aliases;' #results = sqlite_exec_wrapper(sql_statement) #for result in results: # (alias,latitude,longitude) = result # print('%-5s %.6f,%.6f' % (alias,latitude,longitude)) if len(confirm_flag_guffs) == 0: #prompt = format_prompt('confirm', []) rationaliser = rationaliser_confirm_flag else: #prompt = format_prompt('confirm', [confirm_flag_guff['p'] for confirm_flag_guff in confirm_flag_guffs ]) rationaliser = rationaliser_confirm_flag_or_valid_letter prompt = format_prompt('confirm', confirm_flag_guffs) helper = helper_confirm_flag_or_valid_letter validator = validator_confirm_flag default = '' # Special bit of extra text for this question. print('%s%s%s' % (colours['red'], '\n'.join([ 'EXIF: old: %s' % (exif2exifsummary(old_exif)), ' new: %s' % (exif2exifsummary(new_exif)), 'filename: old: %s' % (old_ifn), ' new: %s' % (new_ifn) ]), colours['clear'])) return { 'y':True, 'n':False, 'r':None }[get_answer(prompt, default, validator, rationaliser, 'confirm', helper, always_help_flag)] def exif2exifsummary(exif): d = exif['DateTimeOriginal'] if 'DateTimeOriginal' in exif else None i = exif['ImageUniqueID'] if 'ImageUniqueID' in exif else None if 'GPSLatitude' in exif and 'GPSLongitude' in exif: c = '(%.6f,%.6f)' % get_coords_from_exifgpsstrs(exif['GPSLatitude'], exif['GPSLongitude']) else: c = None return '%s/%s/%s' % (d, i, c) def get_task_apply(rl, apply_confirm_flag, old_exif, new_exif, old_ifn, new_ifn): if apply_confirm_flag is not None and apply_confirm_flag == True: # Do the exif first. if new_exif != old_exif: exiftool_cmdline = 'exiftool -overwrite_original %s %s 2>&1' % (' '.join([ '-%s=\'%s\'' % (k, new_exif[k]) for k in new_exif ]), old_ifn) debug(10, 'get_task_apply: calling [%s] ...' % (exiftool_cmdline)) child = subprocess.Popen(exiftool_cmdline, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL, universal_newlines=True) exiftool_output = child.communicate()[0] exiftool_rc = child.returncode if exiftool_rc != 0: warning('failed to call exiftool (output was [%s])' % (exiftool_output)) else: info('%s: not re-exifing' % (old_ifn)) # Rename second. if new_ifn != old_ifn: info('%s: renaming ...' % (old_ifn)) shutil.move(old_ifn, new_ifn) else: info('%s: not renaming' % (old_ifn)) # We return the confirm flag to save the caller from making a second call to get_value() to # get the confirm flag (which it needs to decide if the file has to be redone. return apply_confirm_flag def get_nighest_id_from_new_ts_and_old_ifn_and_old_ifns_silent(rl, new_ts, old_ifn, old_ifns): # So far, we have no nighest image data. sofar_nighest_id = None # Scan over all files ... for ifn in old_ifns: # A file can't be the nighest file to itself. if ifn == old_ifn: continue coords = get_value(rl+1, ('old_exif_coords', ifn)) if coords is None: continue for ts_source in ['old_ifn_ts', 'old_exif_ts']: ts = get_value(rl+1, (ts_source, ifn)) if ts is None: continue if sofar_nighest_id is not None and abs(new_ts-ts) >= abs(new_ts-sofar_nighest_id['ts']): continue # This is the nighest image so far. Save its details. sofar_nighest_id = { 'ifn': ifn, 'ts': ts, 'coords': coords, 'ts_source': ts_source } return sofar_nighest_id def get_nighest_tpd_from_new_ts_and_gfns_silent(rl, new_ts, gfns): # So far, we have no nighest trackpoint data. sofar_nighest_tpd = None # Scan over all files ... for gfn in gfns: # Get data but skip file if not available. gpx_tracks = get_value(rl+1, ('gpx_tracks', gfn)) for track_id in range(0,len(gpx_tracks)): for segment_id in range(0,len(gpx_tracks[track_id].segments)): for point_id in range(0,len(gpx_tracks[track_id].segments[segment_id].points)): point_ts = get_ts_from_gpxpyts(gpx_tracks[track_id].segments[segment_id].points[point_id].time) if sofar_nighest_tpd is not None and abs(new_ts-point_ts) >= abs(new_ts-sofar_nighest_tpd['ts']): continue # This is the nighest image so far. Save its details. sofar_nighest_tpd = { 'gfn': gfn, 'track_id': track_id, 'segment_id': segment_id, 'point_id': point_id, 'ts': point_ts, 'coords': (gpx_tracks[track_id].segments[segment_id].points[point_id].latitude,gpx_tracks[track_id].segments[segment_id].points[point_id].longitude) } return sofar_nighest_tpd def get_gpx_from_gfn_silent(rl, gfn): debug(10, 'get_gpx_from_gfn_silent: %s: loading all gpx data' % (gfn)) fh = open(gfn, 'r') gpx = gpxpy.parse(fh) fh.close() return gpx def get_old_exif_from_old_ifn_silent(rl, old_ifn): exiftool_cmdline = 'exiftool -args -n %s 2>&1' % (old_ifn) child = subprocess.Popen(exiftool_cmdline, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL, universal_newlines=True) exiftool_output = child.communicate()[0].strip() exiftool_rc = child.returncode if exiftool_rc != 0: error('failed to call exiftool (output was [%s])' % (exiftool_output)) old_exif = dict([ re.search('^-([^=]+)=(.*)', x).group(1,2) for x in exiftool_output.split('\n') ]) # As a simplification, we strip the dict down. old_exif = { k:old_exif[k] for k in old_exif if k in [ 'DateTimeOriginal', 'CreateDate', 'DateTimeDigitized', 'GPSLatitude', 'GPSLongitude', 'ImageUniqueID' ]} return old_exif def get_gpx_tracks_from_gpx_silent(rl, gpx): if gpx is None: return None return gpx.tracks def get_gpx_routes_from_gpx_silent(rl, gpx): if gpx is not None and len(gpx.routes) != 0: error('some files contain routes!') return None def get_gpx_waypoints_from_gpx_silent(rl, gpx): if gpx is not None and len(gpx.waypoints) != 0: error('some files contain waypoints!') return None def get_old_exif_ts_from_old_exif_silent(rl, old_exif): possible_tags = [ 'DateTimeOriginal', 'CreateDate', 'DateTimeDigitized' ] # Just consider the values of those tags that are present in old_exif. xftss = [ old_exif[tag] for tag in possible_tags if tag in old_exif ] # If none of those tags were present then return None. if len(xftss) == 0: return None # If the remaining values are all the same then return that value, converted to an ts. if min(xftss) == max(xftss): debug(10, 'get_old_exif_ts_from_old_exif_silent: some tags are present and their values are all the same') return get_ts_from_xfts(xftss[0]) # If we're here then there are different values present. We could pick the # lowest, but for the time being we'll make an error. warning('different exif tags contain different xftss (%s); taking the earliest one ...' % ', '.join(xftss)) return get_ts_from_xfts(min(xftss)) def get_old_exif_coords_from_old_exif_silent(rl, old_exif): if old_exif is None or 'GPSLatitude' not in old_exif or 'GPSLatitude' not in old_exif: return None #if 'GPSLatitudeRef' not in old_exif or 'GPSLongitudeRef' not in old_exif: # internal('get_old_exif_coords_from_old_exif_silent: coords are in exif but coord-refs are not') old_exif_coords = get_coords_from_exifgpsstrs(old_exif['GPSLatitude'], old_exif['GPSLongitude']) debug(10, 'get_old_exif_coords_from_old_exif_silent: returning %s ...' % (str(old_exif_coords))) return old_exif_coords def get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc(rl, new_ts, new_serno, new_country, new_city, new_people, new_desc): debug(10, 'get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_country=%s, new_city=%s' % (new_country, new_city)) if not isinstance(new_ts, int): internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_ts (%s) is not an integer!' % (new_ts)) else: stringified_new_ifn_hfts = get_hfts_from_ts(new_ts) if not isinstance(new_serno, str): internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_serno (%s) is not a string!' % (new_serno)) else: stringified_new_serno = new_serno if not isinstance(new_country, str): internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_country (%s) is not a string!' % (new_country)) else: stringified_new_country = new_country if not isinstance(new_city, str): internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_city (%s) is not a string!' % (new_city)) else: stringified_new_city = our_new_way_ifn_intra_component_separator.join(new_city.split()) if not isinstance(new_people, list): internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_people (%s) is not a list!' % (new_people)) elif len([ x for x in new_people if not isinstance(x, str)]) > 0: internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_people (%s) contains non-strings!' % (new_people)) else: # new_people is a already a list so it doesn't need splitting. stringified_new_people = our_new_way_ifn_intra_component_separator.join(new_people) if not isinstance(new_desc, str): internal('get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: new_desc (%s) is not a string!' % (new_desc)) else: stringified_new_desc = our_new_way_ifn_intra_component_separator.join(new_desc.split()) debug(10, 'get_new_ifn_from_new_ts_and_new_serno_and_new_country_and_new_city_and_new_people_and_new_desc: stringified_new_ifn_hfts=%s, stringified_new_serno=%s, stringified_new_country=%s, stringified_new_city=%s, stringified_new_people=%s, stringified_new_desc=%s' % (stringified_new_ifn_hfts, stringified_new_serno, stringified_new_country, stringified_new_city, stringified_new_people, stringified_new_desc)) return '%s%s%s%s%s%s%s%s%s%s%s.jpg' % (stringified_new_ifn_hfts, our_new_way_ifn_extra_component_separator, stringified_new_serno, our_new_way_ifn_extra_component_separator, stringified_new_country, our_new_way_ifn_extra_component_separator, stringified_new_city, our_new_way_ifn_extra_component_separator, stringified_new_people, our_new_way_ifn_extra_component_separator, stringified_new_desc) def get_old_ifn_ts_from_old_ifn_and_old_parser_silent(rl, old_ifn, old_parser): return get_ts_from_hfts(parsers[old_parser]['fnc'](old_ifn, parsers[old_parser]['re'], 'hfts')) def get_old_ifn_serno_from_old_ifn_and_old_parser_silent(rl, old_ifn, old_parser): return parsers[old_parser]['fnc'](old_ifn, parsers[old_parser]['re'], 'serno') def get_old_ifn_country_from_old_ifn_and_old_parser_silent(rl, old_ifn, old_parser): return parsers[old_parser]['fnc'](old_ifn, parsers[old_parser]['re'], 'country') def get_old_ifn_city_from_old_ifn_and_old_parser_silent(rl, old_ifn, old_parser): return parsers[old_parser]['fnc'](old_ifn, parsers[old_parser]['re'], 'city') def get_old_ifn_people_from_old_ifn_and_old_parser_silent(rl, old_ifn, old_parser): return parsers[old_parser]['fnc'](old_ifn, parsers[old_parser]['re'], 'people') def get_old_ifn_desc_from_old_ifn_and_old_parser_silent(rl, old_ifn, old_parser): return parsers[old_parser]['fnc'](old_ifn, parsers[old_parser]['re'], 'desc') def our_new_way_ifn_parser(fn, regexp, thing): m = re.search(regexp, fn) if m is None: internal('somehow the wrong parser got called') # This is the bit specific to our new way of naming files. values = { 'hfts': m.group(1), 'serno': m.group(2), 'country': m.group(3), # Internal representation of city is with spaces. 'city': m.group(4).replace(our_new_way_ifn_intra_component_separator, ' '), # Internal representation of people is a list (or None for unknown). 'people': m.group(5).split(our_new_way_ifn_intra_component_separator) if m.group(5) != 'nobody' else [], # Internal representation of description is with spaces. 'desc': m.group(6).replace(our_new_way_ifn_intra_component_separator, ' ') } return values[thing] def our_old_way_ifn_parser(fn, regexp, thing): m = re.search(regexp, fn) if m is None: internal('somehow the wrong parser got called') # This is the bit specific to our new way of naming files. values = { 'hfts': m.group(1), 'serno': m.group(2), 'country': m.group(3), # Internal representation of city is with spaces. 'city': m.group(4).replace('_', ' '), # Internal representation of people is a list (or None for unknown). 'people': m.group(5).rstrip('_').split() if m.group(5) != '' else [], # Internal representation of description is with spaces. 'desc': m.group(6).replace('_',' ') } return values[thing] def traditional_digicam_ifn_parser(fn, regexp, thing): m = re.search(regexp, fn) if m is None: internal('somehow the wrong parser got called') # This is the bit specific to a traditional digital camera. values = { 'hfts': None, 'serno': m.group(1), 'country': None, 'city': None, 'people': None, 'desc': None } return values[thing] def android_ifn_parser(fn, regexp, thing): m = re.search(regexp, fn) if m is None: internal('somehow the wrong parser got called') # This is the bit specific to a traditional digital camera. values = { 'hfts': '%s%s%s%s%s%s' % (m.group(1),m.group(2),m.group(3),m.group(4),m.group(5),m.group(6)), 'serno': None, 'country': None, 'city': None, 'people': None, 'desc': None } return values[thing] def dumb_ifn_parser(fn, regexp, thing): return None def get_old_parser_from_old_ifn_silent(rl, old_ifn): global parsers for old_parser in parsers: if re.search(parsers[old_parser]['re'], old_ifn): # Note we're returning the regexp and parser function! We need the regexp in order to # do capturing without having to specify the regexp again. return old_parser internal('this should not happen') def get_old_exif_serno_from_old_exif_silent(rl, old_exif): if old_exif is None or 'ImageUniqueID' not in old_exif: return None # 'EXIF ImageUniqueID' should be suitable but a quick test of # several files revealed that this unique ID is not unique: # # lagane$ for X in *.jpg; do echo "$X: $(exiftool $X | grep Unique)"; done # 20180827134427=a5725=cz=liberec=suzie=main_square.jpg: # 20200816114140=0014=de=eagdfsg=nobody=sdfhg.jpg: Image Unique ID : Y13LLKA00AM Y13LLMH01AA. # 20200816_114626.jpg: Image Unique ID : Y13LLKA00AM Y13LLMH01AA. # 20200816_114627.jpg: Image Unique ID : Y13LLKA00AM Y13LLMH01AA. # 20200816_122404.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200816_123451.jpg: Image Unique ID : B32QLLA01MM # 20200816_123550.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200816_123621.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200816_123626.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200817_131652.jpg: Image Unique ID : B32QLLA01MM # 20200817_131710.jpg: Image Unique ID : B32QLLA01MM # 20200817_131718.jpg: Image Unique ID : B32QLLA01MM # 20200817_141000.jpg: Image Unique ID : B32QLLA01MM # 20200817_150050.jpg: Image Unique ID : B32QLLA01MM # 20200817_151641.jpg: Image Unique ID : B32QLLA01MM # 20200818_115941.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200818_115957.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200818_120010.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200819_123506.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # 20200819_123530.jpg: Image Unique ID : B32QLLA01MM # 20200819_123826.jpg: Image Unique ID : B32QLLA01MM # 20200819_123843.jpg: Image Unique ID : B32QLLA01MM B32QLLA01MM # lagane$ # # But the serial numbers that I invent and put back are ok. So I think I need # a regular expression for *bad* serial numbers. # if re.search('^(?:[A-Z0-9]{11}|[A-Z0-9]{11} [A-Z0-9]{11}\.?)$', old_exif['ImageUniqueID']): debug(10, 'get_old_exif_serno_from_old_exif_silent: ignoring androidy serno (%s) ...' % (old_exif['ImageUniqueID'])) return None else: debug(10, 'get_old_exif_serno_from_old_exif_silent: got sensible looking serno (%s); returning that ...' % (old_exif['ImageUniqueID'])) return old_exif['ImageUniqueID'] def gen_fake_serno(): global progname old_fake_serno_file = '%s/.%s/old-fake-serno.txt' % (os.environ['HOME'], progname) if os.path.isfile(old_fake_serno_file): debug(10, 'gen_fake_serno: file exists; opening ...') fh = open(old_fake_serno_file, 'r') old_fake_serno = int(fh.readline()) fh.close() else: debug(10, 'gen_fake_serno: file does not exist; using zero ...') old_fake_serno = 0 debug(10, 'gen_fake_serno: writing next number back ...') new_fake_serno = old_fake_serno+1 fh = open(old_fake_serno_file, 'w') fh.write('%d\n' % (new_fake_serno)) fh.close() return '%04d' % (new_fake_serno) def get_coords_guffs_from_old_ifn_coords_and_old_exif_coords_and_nighest_id_and_nighest_tpd_and_new_ts_silent(rl, old_ifn_coords, old_exif_coords, nighest_id, nighest_tpd, new_ts): coords_guffs = [] # We consider the possible sources in most- to least-preferred order because it # makes working out the default easier. if old_exif_coords is not None: letter = 'e' legend = '%.6f,%.6f (from exif headers of this file)' % (old_exif_coords[0],old_exif_coords[1]) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_exif_coords, 'c':'#ff0000', 'l':legend }) if old_ifn_coords is not None: letter = 'f' legend = '%.6f,%.6f (from name of this file)' % (old_ifn_coords) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_ifn_coords, 'c':'#000000', 'l':legend }) if nighest_tpd is not None: letter = 't' legend = '%.6f,%.6f (from trackpoint %s nigh)' % (nighest_tpd['coords'][0], nighest_tpd['coords'][1], get_dhms_from_s(abs(nighest_tpd['ts']-new_ts))) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':nighest_tpd['coords'], 'c':'#00ff00', 'l':legend }) if nighest_id is not None: letter = 'n' legend = '%.6f,%.6f (from exif headers of %s %s nigh)' % (nighest_id['coords'][0], nighest_id['coords'][1], nighest_id['ifn'], get_dhms_from_s(abs(nighest_id['ts']-new_ts))) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':nighest_id['coords'], 'c':'#0000ff', 'l':legend }) letter = 'c' legend = 'map click' coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':lambda: get_coords_from_click(), 'c':None, 'l':None }) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (',', 'explicit latitude and longitude'), 'r':None, 'v':None, 'c':None, 'l':None }) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (',*', 'create alias for latitude and longitude'), 'r':None, 'v':None, 'c':None, 'l':None }) coords_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('', 'alias of a latitude and longitude'), 'r':None, 'v':None, 'c':None, 'l':None }) return coords_guffs def get_task_show_image(rl, old_ifn): # If geeqie is not already running then 'geeqie -r' will fork one, but # will leave filehandles open, thereby making subprocess.Popen() hang, even # though the process it launches has exited. For this reason we redirect # stdout/stdin/stderr, etc. geeqie_cmdline = 'timeout 5 geeqie -r "%s" >/dev/null 2>&1 < /dev/null' % (old_ifn) debug(4, 'get_task_show_image: calling [%s] ...' % (geeqie_cmdline)) try: child = subprocess.Popen(geeqie_cmdline, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) geeqie_output = child.communicate()[0] geeqie_rc = child.returncode except: geeqie_rc = 1 geeqie_output = '' if geeqie_rc != 0: warning('failed to call geeqie (output was [%s])' % (geeqie_output)) debug(10, 'get_task_show_image: end of geeqie launch code') return None def get_task_show_map(rl, coords_guffs, gfns_abs): filename = '/tmp/%s.%d.html' % (progname, os.getpid()) fh = open(filename, 'w') fh.write(gen_html(coords_guffs, gfns_abs)) fh.close() url = 'http://localhost:%d%s' % (http_server_port, filename) browser_cmdline = 'timeout 10 %s "%s" 2>&1' % (browser_cmd, url) debug(4, 'get_task_show_map: calling [%s] ...' % (browser_cmdline)) try: child = subprocess.Popen(browser_cmdline, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL) browser_output = child.communicate()[0] browser_rc = child.returncode except: browser_rc = 1 browser_output = 'failed to call browser' if browser_rc == 124: error('it looks like you forgot to start your browser *before* running %s' % (progname)) elif browser_rc != 0: warning('failed to call browser (output was [%s])' % (browser_output)) return True def gen_html(coords_guffs, gfns_abs): global no_sources_coords # Only consider guffs that have a colour (or a legend entry) specified. coords_guffs = [ coords_guff for coords_guff in coords_guffs if coords_guff['c'] is not None ] # For the time being, the javascript in the html can only handle zero or one track file. if len(gfns_abs) > 1: debug(10, 'gen_html: gfns_abs=%s' % (gfns_abs)) error('you specified %d track files (hint: the current implementation can handle only 0 or 1)' % (len(gfns_abs))) # Work out centre of map. if len(coords_guffs) > 0: miny = min([ coords_guff['v'][0] for coords_guff in coords_guffs ]) maxy = max([ coords_guff['v'][0] for coords_guff in coords_guffs ]) minx = min([ coords_guff['v'][1] for coords_guff in coords_guffs ]) maxx = max([ coords_guff['v'][1] for coords_guff in coords_guffs ]) centre_coords = ((miny+maxy)/2, (minx+maxx)/2) else: centre_coords = no_sources_coords # Bigger numbers zoom in. Smaller numbers zoom out. zoom = 12 avoid_overlap_offsets = [(20+3,20+3), (20+3,20-3), (20-3,20+3), (20-3,20-3)] if len(coords_guffs) > 4: internal('the avoid-averlap offsets array only has 4 points in; increase it') expanded_marker_code = '\n'.join(['add_layer_with_marker(%.6f,%.6f,\'%s\',%d,%d)' % (coords_guff['v'][0], coords_guff['v'][1], coords_guff['c'],*avoid_overlap_offsets[i]) for i,coords_guff in enumerate(coords_guffs) ]) expanded_legend_code = '\n'.join([ '''\ %s ''' % (coords_guff['c'],coords_guff['l']) for coords_guff in coords_guffs ]) return '''\ pr4
%s
''' % (centre_coords[0], centre_coords[1], zoom, gfns_abs[0] if len(gfns_abs) == 1 else '', expanded_marker_code, expanded_legend_code) def get_new_coords_from_coords_guffs_prompting(rl, coords_guffs): def rationaliser_coords_or_valid_letter(x): if x not in [ coords_guff['r'] for coords_guff in coords_guffs if coords_guff['v'] is not None ]: return rationaliser_coords(x) coords_guff = [ coords_guff for coords_guff in coords_guffs if coords_guff['r'] == x ][0] if callable(coords_guff['v']): debug(20, 'rationaliser_coords_or_valid_letter: callable') return rationaliser_coords(coords_guff['v']()) else: debug(20, 'rationaliser_coords_or_valid_letter: not callable') return rationaliser_coords('%.6f,%.6f' % coords_guff['v']) def helper_coords_or_valid_letter(): for coords_guff in coords_guffs: print(coords_guff['p']) sql_statement = 'SELECT alias, latitude, longitude FROM coords_aliases;' results = sqlite_exec_wrapper(sql_statement) for result in results: (alias,latitude,longitude) = result print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias, '%.6f,%.6f' % (latitude,longitude))) if len(coords_guffs) == 0: #prompt = format_prompt('coords', []) default = '' rationaliser = rationaliser_coords else: #prompt = format_prompt('coords', [coords_guff['p'] for coords_guff in coords_guffs ]) default = coords_guffs[0]['r'] rationaliser = rationaliser_coords_or_valid_letter prompt = format_prompt('coords', coords_guffs) helper = helper_coords_or_valid_letter validator = validator_coords return tuple([float(x) for x in get_answer(prompt, default, validator, rationaliser, 'coords', helper, always_help_flag).split(',')]) def get_coords_from_click(): info('waiting for a map click ...') # Clear copy/paste buffer. That doesn't really work. So instead we look for # *changes* in the copy/paste buffer's contents (that are valid coords). xsel_cmdline = 'xsel -o --clipboard' child = subprocess.Popen(xsel_cmdline, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL, universal_newlines=True) initial_xsel_output = child.communicate()[0] debug(10, 'get_coords_from_click: initial_xsel_output=%s' % (initial_xsel_output)) while True: child = subprocess.Popen(xsel_cmdline, shell=True, stdout=subprocess.PIPE, stderr=DEVNULL, universal_newlines=True) xsel_output = child.communicate()[0] debug(10, 'get_coords_from_click: xsel_output=%s' % (xsel_output)) if xsel_output != initial_xsel_output and validator_coords(xsel_output): return xsel_output time.sleep(0.1) def get_new_exif_from_old_exif_and_new_exif_ts_and_new_exif_serno_new_exif_coords(rl, old_exif, new_exif_ts, new_exif_serno, new_exif_coords): new_exif = copy.deepcopy(old_exif) if new_exif_ts is not None: new_exif['DateTimeDigitized'] = get_xfts_from_ts(new_exif_ts) new_exif['DateTimeOriginal'] = get_xfts_from_ts(new_exif_ts) new_exif['CreateDate'] = get_xfts_from_ts(new_exif_ts) else: internal('get_new_exif_from_old_exif_and_new_exif_ts_and_new_exif_serno_new_exif_coords: new_exif_ts is None') if new_exif_serno is not None: new_exif['ImageUniqueID'] = new_exif_serno else: internal('get_new_exif_from_old_exif_and_new_exif_ts_and_new_exif_serno_new_exif_coords: new_exif_serno is None') if new_exif_coords is not None: new_exif['GPSLatitude'], new_exif['GPSLongitude'] = get_exifgpsstrs_from_coords(new_exif_coords) else: internal('get_new_exif_from_old_exif_and_new_exif_ts_and_new_exif_serno_new_exif_coords: new_exif_coords is None') return new_exif def get_exifgpsstrs_from_coords(coords): if coords is None: return None # Return two strings. return '%.6f' % (abs(coords[0])), '%.6f' % (abs(coords[1])) def get_coords_from_exifgpsstrs(gpslatitude, gpslongitude): if gpslatitude is None or gpslongitude is None: return None lat = float(gpslatitude) lon = float(gpslongitude) # Either my new phone or Debian's exif module is resulting in zeros in the coordinate fields where formerly # these fields had simply not been present. So we need to check for this. if lat == 0.0 and lon == 0.0: return None return (lat,lon) #def dd2dms(dd): # m,s = divmod(dd*3600,60) # d,m = divmod(m,60) # return (d,m,s) # #def dms2dd(dms): # return dms[0] + (dms[1] / 60.0) + (dms[2] / 3600.0) if dms[0] >= 0 else dms[0] - (dms[1] / 60.0) - (dms[2] / 3600.0) def get_country_guffs_from_old_ifn_country_and_new_places_dbent_and_override_country_silent(rl, old_ifn_country, new_places_dbent, override_country): country_guffs = [] # These are in order of preference if old_ifn_country is not None: letter = 'f' legend = '%s (from name of this file)' % (old_ifn_country) country_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_ifn_country }) if override_country is not None: letter = 'o' legend = '%s (from overrides table)' % (override_country) country_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':override_country }) if new_places_dbent is not None: letter = 'd' legend = '%s (from places table)' % (new_places_dbent['country']) country_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':new_places_dbent['country'] }) country_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('', 'country in 2-letter ISO format'), 'r':None, 'v':None, 'c':None, 'l':None }) country_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('!', 'permanently override country in places table'), 'r':None, 'v':None, 'c':None, 'l':None }) return country_guffs def get_new_country_from_country_guffs_prompting(rl, country_guffs, new_coords): def rationaliser_country_wrapper(country): return rationaliser_country(country, new_coords) def rationaliser_country_or_valid_letter(x): if x not in [ country_guff['r'] for country_guff in country_guffs if country_guff['v'] is not None ]: debug(10, 'rationaliser_country_or_valid_letter: %s not in guffs; calling rationaliser_country() ...' % (x)) return rationaliser_country_wrapper(x) country_guff = [ country_guff for country_guff in country_guffs if country_guff['r'] == x ][0] if callable(country_guff['v']): return rationaliser_country_wrapper(country_guff['v']()) else: return rationaliser_country_wrapper(country_guff['v']) def helper_country_or_valid_letter(): for country_guff in country_guffs: print(country_guff['p']) #sql_statement = 'SELECT alias, country FROM country_aliases;' #results = sqlite_exec_wrapper(sql_statement) #for result in results: # (alias,country) = result # print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias,country)) if len(country_guffs) == 0: #prompt = format_prompt('country', []) default = '' rationaliser = rationaliser_country_wrapper else: #prompt = format_prompt('country', [country_guff['p'] for country_guff in country_guffs ]) default = country_guffs[0]['r'] # The rationaliser is too complicated to express as an expression. rationaliser = rationaliser_country_or_valid_letter prompt = format_prompt('country', country_guffs) validator = validator_country helper = helper_country_or_valid_letter return get_answer(prompt, default, validator, rationaliser, 'country', helper, always_help_flag) def get_city_guffs_from_old_ifn_city_and_new_places_dbent_and_override_city_silent(rl, old_ifn_city, new_places_dbent, override_city): city_guffs = [] # These are in order of preference if old_ifn_city is not None: letter = 'f' legend = '%s (from name of this file)' % (old_ifn_city) city_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_ifn_city }) if override_city is not None: letter = 'o' legend = '%s (from overrides table)' % (override_city) city_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':override_city }) if new_places_dbent is not None: letter = 'd' legend = '%s (from places table)' % (new_places_dbent['city']) city_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':new_places_dbent['city'] }) city_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('', 'explicit city name'), 'r':None, 'v':None, 'c':None, 'l':None }) city_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('!', 'permanently override city name in places table'), 'r':None, 'v':None, 'c':None, 'l':None }) return city_guffs def get_new_city_from_city_guffs_prompting(rl, city_guffs, new_coords): def rationaliser_city_wrapper(city): return rationaliser_city(city, new_coords) def rationaliser_city_or_valid_letter(x): if x not in [ city_guff['r'] for city_guff in city_guffs if city_guff['v'] is not None ]: return rationaliser_city_wrapper(x) city_guff = [ city_guff for city_guff in city_guffs if city_guff['r'] == x ][0] if callable(city_guff['v']): return rationaliser_city_wrapper(city_guff['v']()) else: debug(20, 'rationaliser_city_or_valid_letter: not callable') return rationaliser_city_wrapper(city_guff['v']) def helper_city_or_valid_letter(): for city_guff in city_guffs: print(city_guff['p']) #sql_statement = 'SELECT alias, city FROM city_aliases;' #results = sqlite_exec_wrapper(sql_statement) #for result in results: # (alias,city) = result # print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias,city)) if len(city_guffs) == 0: #prompt = format_prompt('city', []) default = '' rationaliser = rationaliser_city_wrapper else: #prompt = format_prompt('city', [city_guff['p'] for city_guff in city_guffs ]) default = city_guffs[0]['r'] # The rationaliser is too complicated to express as an expression. rationaliser = rationaliser_city_or_valid_letter prompt = format_prompt('city', city_guffs) validator = validator_city helper = helper_city_or_valid_letter return get_answer(prompt, default, validator, rationaliser, 'city', helper, always_help_flag) def get_new_places_dbent_from_new_coords_silent(rl, new_coords): debug(10, 'get_new_places_dbent_from_new_coords_silent: new_coords=%s' % (str(new_coords))) if new_coords is None: return None return lookup_country_and_city(new_coords) #def coalesce(a, b): # if a is None and b is None: # return None # elif a is None and b is not None: # return b # elif a is not None and b is None: # return a # elif a == b: # return a # else: # return None def coalesce_with_tolerance(value1, value2, tolerance): if value1 is None and value2 is None: return None elif value1 is not None and value2 is None: return value1 elif value1 is None and value2 is not None: return value2 elif abs(value1-value2) <= tolerance: # We take the lower value as that's likely to he chronologically earler. But this is not # tested as an approach. return min([value1, value2]) else: return None def get_hfts_guffs_from_old_ifn_ts_and_old_exif_ts_silent(rl, old_ifn_ts, old_exif_ts): coalesced_ts = coalesce_with_tolerance(old_ifn_ts, old_exif_ts, 30) if coalesced_ts is not None: old_ifn_ts = None old_exif_ts = None hfts_guffs = [] # We consider the possible sources in most- to least-preferred order because it # makes working out the default easier. if coalesced_ts is not None: letter = 'f' legend = '%s (from name of this file and exif headers of this file)' % (get_hfts_from_ts(coalesced_ts)) hfts_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':get_hfts_from_ts(coalesced_ts) }) if old_ifn_ts is not None: letter = 'f' legend = '%s (from name of this file)' % (get_hfts_from_ts(old_ifn_ts)) hfts_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':get_hfts_from_ts(old_ifn_ts) }) if old_exif_ts is not None: letter = 'e' legend = '%s (from exif headers of this file)' % (get_hfts_from_ts(old_exif_ts)) hfts_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':get_hfts_from_ts(old_exif_ts) }) hfts_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('', 'explicit timestamp in YYYYMMDDHHMMSS format'), 'r':None, 'v':None, 'c':None, 'l':None }) return hfts_guffs def get_new_ts_from_hfts_guffs_prompting(rl, hfts_guffs): def rationaliser_hfts_or_valid_letter(x): if x not in [ hfts_guff['r'] for hfts_guff in hfts_guffs if hfts_guff['v'] is not None ]: return rationaliser_hfts(x) hfts_guff = [ hfts_guff for hfts_guff in hfts_guffs if hfts_guff['r'] == x ][0] if callable(hfts_guff['v']): debug(20, 'rationaliser_hfts_or_valid_letter: callable') return rationaliser_hfts(hfts_guff['v']()) else: debug(20, 'rationaliser_hfts_or_valid_letter: not callable') return rationaliser_hfts(hfts_guff['v']) def helper_hfts_or_valid_letter(): for hfts_guff in hfts_guffs: print(hfts_guff['p']) #sql_statement = 'SELECT alias, hfts FROM hfts_aliases;' #results = sqlite_exec_wrapper(sql_statement) #for result in results: # (alias,hfts) = result # print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias,hfts)) if len(hfts_guffs) == 0: #prompt = format_prompt('hfts', []) default = '' rationaliser = rationaliser_hfts else: #prompt = format_prompt('hfts', [hfts_guff['p'] for hfts_guff in hfts_guffs]) default = hfts_guffs[0]['r'] rationaliser = rationaliser_hfts_or_valid_letter prompt = format_prompt('hfts', hfts_guffs) validator = validator_hfts helper = helper_hfts_or_valid_letter return get_ts_from_hfts(get_answer(prompt, default, validator, rationaliser, 'hfts', helper, always_help_flag)) def format_prompt(title, guffs): # old style #title_line = '%s%s%s' % (colours['red'], title, colours['clear']) #options = [ guff['p'] for guff in guffs ] #options_line = '%s%s%s' % (colours['yellow'], '\n'.join(options) if len(options) > 0 else '(no options available)', colours['clear']) #prompt_line = '%s%s%s' % (colours['green'], title, colours['clear']) #return '%s\n%s\n%s' % (title_line, options_line, prompt_line) # new style return '%s%s%s(%s)' % (colours['green'], title, colours['clear'], '/'.join([ guff['r'] for guff in guffs if guff['r'] is not None ])) def get_serno_guffs_from_old_ifn_serno_and_old_exif_serno_silent(rl, old_ifn_serno, old_exif_serno): serno_guffs = [] # These are in order of preference if old_ifn_serno is not None: letter = 'f' legend = '%s (from name of this file)' % (old_ifn_serno) serno_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_ifn_serno }) letter = 'g' legend = 'auto-generated serno' serno_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':lambda : gen_fake_serno() }) if old_exif_serno is not None: letter = 'e' legend = '%s (from exif headers of this file)' % (old_exif_serno) serno_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_exif_serno }) serno_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('', 'explicit serial number'), 'r':None, 'v':None, 'c':None, 'l':None }) return serno_guffs def get_new_serno_from_serno_guffs_prompting(rl, serno_guffs): def rationaliser_serno_or_valid_letter(x): if x not in [ serno_guff['r'] for serno_guff in serno_guffs if serno_guff['v'] is not None ]: return rationaliser_serno(x) serno_guff = [ serno_guff for serno_guff in serno_guffs if serno_guff['r'] == x ][0] if callable(serno_guff['v']): debug(20, 'rationaliser_serno_or_valid_letter: callable') return rationaliser_serno(serno_guff['v']()) else: debug(20, 'rationaliser_serno_or_valid_letter: not callable') return rationaliser_serno(serno_guff['v']) def helper_serno_or_valid_letter(): for serno_guff in serno_guffs: print(serno_guff['p']) #sql_statement = 'SELECT alias, serno FROM serno_aliases;' #results = sqlite_exec_wrapper(sql_statement) #for result in results: # (alias,serno) = result # print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias,serno)) if len(serno_guffs) == 0: #prompt = format_prompt('serno', []) default = '' rationaliser = rationaliser_serno else: #prompt = format_prompt('serno', [serno_guff['p'] for serno_guff in serno_guffs ]) default = serno_guffs[0]['r'] # The rationaliser is too complicated to express as an expression. rationaliser = rationaliser_serno_or_valid_letter prompt = format_prompt('serno', serno_guffs) helper = helper_serno_or_valid_letter validator = validator_serno return get_answer(prompt, default, validator, rationaliser, 'serno', helper, always_help_flag) def get_people_guffs_from_old_ifn_people_silent(rl, old_ifn_people): debug(20, 'get_people_guffs_from_old_ifn_people_silent: old_ifn_people=%s' % (old_ifn_people)) people_guffs = [] # These are in order of preference if old_ifn_people is not None: letter = 'f' legend = '%s (from name of this file)' % (','.join(old_ifn_people)) people_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':','.join(old_ifn_people) }) letter = 'n' legend = 'nobody' people_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':'nobody' }) people_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (' ...', 'registered name(s) or alias(es)'), 'r':None, 'v':None, 'c':None, 'l':None }) people_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('* ...', 'register person and optionally create alias'), 'r':None, 'v':None, 'c':None, 'l':None }) return people_guffs def get_new_people_from_people_guffs_prompting(rl, people_guffs): def rationaliser_people_or_valid_letter(x): if x not in [ people_guff['r'] for people_guff in people_guffs if people_guff['v'] is not None ]: return rationaliser_people(x) people_guff = [ people_guff for people_guff in people_guffs if people_guff['r'] == x ][0] if callable(people_guff['v']): debug(20, 'rationaliser_people_or_valid_letter: callable') return rationaliser_people(people_guff['v']()) else: debug(20, 'rationaliser_people_or_valid_letter: not callable') return rationaliser_people(people_guff['v']) def helper_people_or_valid_letter(): for people_guff in people_guffs: print(people_guff['p']) sql_statement = 'SELECT alias, person FROM person_aliases;' results = sqlite_exec_wrapper(sql_statement) for result in results: (alias,person) = result print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias,person)) if len(people_guffs) == 0: #prompt = format_prompt('people', []) default = '' rationaliser = rationaliser_people else: #prompt = format_prompt('people', [people_guff['p'] for people_guff in people_guffs ]) default = people_guffs[0]['r'] # The rationaliser is too complicated to express as an expression. rationaliser = rationaliser_people_or_valid_letter prompt = format_prompt('people', people_guffs) validator = validator_known_people helper = helper_people_or_valid_letter return get_answer(prompt, default, validator, rationaliser, 'people', helper, always_help_flag).split() def get_desc_guffs_from_old_ifn_desc_silent(rl, old_ifn_desc): desc_guffs = [] # These are in order of preference if old_ifn_desc is not None: letter = 'f' legend = '%s (from name of this file)' % (old_ifn_desc) desc_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (letter, legend), 'r':letter, 'v':old_ifn_desc }) desc_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('', 'description or alias'), 'r':None, 'v':None, 'c':None, 'l':None }) desc_guffs.append({ 'p':'%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % ('*', 'create alias for description'), 'r':None, 'v':None, 'c':None, 'l':None }) return desc_guffs def get_new_desc_from_desc_guffs_prompting(rl, desc_guffs): def rationaliser_desc_or_valid_letter(x): if x not in [ desc_guff['r'] for desc_guff in desc_guffs if desc_guff['v'] is not None ]: return rationaliser_desc(x) desc_guff = [ desc_guff for desc_guff in desc_guffs if desc_guff['r'] == x ][0] if callable(desc_guff['v']): debug(20, 'rationaliser_desc_or_valid_letter: callable') return rationaliser_desc(desc_guff['v']()) else: debug(20, 'rationaliser_desc_or_valid_letter: not callable') return rationaliser_desc(desc_guff['v']) def helper_desc_or_valid_letter(): for desc_guff in desc_guffs: print(desc_guff['p']) sql_statement = 'SELECT alias, desc FROM desc_aliases;' results = sqlite_exec_wrapper(sql_statement) for result in results: (alias,desc) = result print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (alias,desc)) if len(desc_guffs) == 0: #prompt = format_prompt('desc', []) default = '' rationaliser = rationaliser_desc else: #prompt = format_prompt('desc', [desc_guff['p'] for desc_guff in desc_guffs ]) default = desc_guffs[0]['r'] # The rationaliser is too complicated to express as an expression. rationaliser = rationaliser_desc_or_valid_letter prompt = format_prompt('desc', desc_guffs) validator = validator_desc helper = helper_desc_or_valid_letter return get_answer(prompt, default, validator, rationaliser, 'desc', helper, always_help_flag) def usage(rc=1): global progname fh = {True:sys.stdout, False:sys.stderr}[rc == 0] fh.write('Usage: %s [ ] [ --always-help ] [ --graph={deps|pars}[,...] | --eval= ]\n' % progname) sys.exit(rc) def get_value(rl, variable_name): global variable_natures, variable_values # variable_name parameter is *always* a tuple. This check can eventually be removed. if not isinstance(variable_name, tuple): internal('get_value[%d]: %s: not a tuple' % (r, variable_name)) # If it's in cache then return that without further work. if variable_name in variable_values: debug(5, 'get_value[%d]: %s: retrieving its value (%s) from cache ...' % (rl, variable_name, quote_value(variable_values[variable_name]))) return variable_values[variable_name] debug(5, 'get_value[%d]: %s: calculating its value' % (rl, variable_name)) # Depending on whether the variable we're looking up is a global or a file-specific variable, # we might need to look it up as the tuple that it is or we might need to look it up using # the first component only (and omit the second filename component) of the tuple. if variable_name in variable_natures: debug(10, 'get_value[%d]: %s: found dependencies for complete variable_name tuple (so it\'s probably a global)' % (rl, str(variable_name))) nature_key = variable_name elif variable_name[0] in variable_natures: debug(10, 'get_value[%d]: %s: found dependencies for first part of variable_name tuple (so it\'s probably file-specific)' % (rl, str(variable_name))) nature_key = variable_name[0] else: internal('get_value[%d]: %s: failed to find deps' % (rl, str(variable_name))) # Get the dependencies and the combiner function. for k in ['deps','pars','fnc']: if k not in variable_natures[nature_key]: internal('get_value: \'%s\' was not found in variable_natures[\'%s\'] (hint: did you make a type in the table?)' % (k, nature_key)) deps = variable_natures[nature_key]['deps'] pars = variable_natures[nature_key]['pars'] fnc = variable_natures[nature_key]['fnc'] # The dependencies may be tuples or they may simply be strings. If they're tuples then # we need to get the values of those tuples. If they're strings then we need to append # the filename component to it to make into a tuple. Let's fix that by converting the # *entire* list into tuples. deps = [dep if isinstance(dep,tuple) else (dep,variable_name[1]) for dep in deps] pars = [par if isinstance(par,tuple) else (par,variable_name[1]) for par in pars] # Then we need to get the values of the dependencies. We choose to do this in a list # becuause we're going to need to convert it to a parameter list in a moment. debug(10, 'get_value[%d]: getting values of dependencies ...' % (rl)) for dep in deps: get_value(rl+1, dep) debug(10, 'get_value: getting values of parameters ...') par_values = [get_value(rl+1, par) for par in pars] # We call the combiner function to get the value we're actually after. debug(10, 'get_value[%d]: %s: combining parameters by calling %s(%s) ...' % (rl, str(variable_name), fnc.__name__, ', '.join([quote_value(x) for x in par_values]))) value = fnc(rl, *par_values) # We put it in the cache. variable_values[variable_name] = value # And finally we return the value. return value def quote_value(x): return '\'%s\'' % (x) if isinstance(x, str) else str(x) def rationaliser_desc(desc): global tied_ui_regexps def helper_desc_alias(): print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (':', 'new alias for that description')) desc = rationaliser_text(desc) debug(10, 'rationaliser_desc: desc=%s' % (desc)) # Normal desc (handled by 'existing alias' below) # Existing alias if not desc.endswith('*'): debug(10, 'rationaliser_desc: expanding ...') sql_statement = 'SELECT desc FROM desc_aliases WHERE alias = ?;' sql_value = (desc,) results = sqlite_exec_wrapper_qm(sql_statement, sql_value) for result in results: (desc,) = result # New alias else: debug(10, 'rationaliser_desc: new alias; asking about it ...') desc = desc.rstrip('*') # Don't add invalid stuff. if not validator_unknown_desc(desc): return desc helper = helper_desc_alias validator = validator_unknown_desc_alias rationaliser = rationaliser_new_desc_alias alias = get_answer('alias', '', validator, rationaliser, 'desc-alias', helper, always_help_flag) sql_statement = 'INSERT INTO desc_aliases VALUES (?, ?);' sql_value = (alias, desc) sqlite_exec_wrapper_qm(sql_statement, sql_value) return desc def rationaliser_serno(serno): return rationaliser_text(serno) def rationaliser_confirm_flag(confirm_flag): return rationaliser_text(confirm_flag) # Note that the coordinates are passed into this function. That means # that this function is not suitable for passing to get_answer() (since # it won't pass the additional parameter). def rationaliser_country(country, new_coords): country = rationaliser_text(country) debug(10, 'rationaliser_country: country=%s, new_coords=%s' % (country, new_coords)) # Not overridden if not country.endswith('!'): return country country = country.rstrip('!') # Don't override with invalid country debug(10, 'rationaliser_country: new override') if not validator_country(country): return country # Add override in two steps. sql_statement = 'INSERT OR IGNORE INTO place_overrides VALUES (?, ?, NULL, NULL);' sql_value = new_coords sqlite_exec_wrapper_qm(sql_statement, sql_value) sql_statement = 'UPDATE place_overrides SET country = ? WHERE latitude = ? AND LONGITUDE = ?;' sql_value = (country, new_coords[0], new_coords[1]) sqlite_exec_wrapper_qm(sql_statement, sql_value) info('override added') return country # Note that the coordinates are passed into this function. That means # that this function is not suitable for passing to get_answer() (since # it won't pass the additional parameter). def rationaliser_city(city, new_coords): city = rationaliser_text(city) debug(10, 'rationaliser_city: city=%s, new_coords=%s' % (city, new_coords)) # Not overridden if not city.endswith('!'): return city city = city.rstrip('!') # Don't override with invalid city debug(10, 'rationaliser_city: new override') if not validator_city(city): return city # Add override in two steps. sql_statement = 'INSERT OR IGNORE INTO place_overrides VALUES (?, ?, NULL, NULL);' sql_value = new_coords sqlite_exec_wrapper_qm(sql_statement, sql_value) sql_statement = 'UPDATE place_overrides SET city = ? WHERE latitude = ? AND LONGITUDE = ?;' sql_value = (city, new_coords[0], new_coords[1]) sqlite_exec_wrapper_qm(sql_statement, sql_value) info('override added') return city def rationaliser_hfts(hfts): return rationaliser_text(hfts) def rationaliser_people(people): global tied_ui_regexps if people in ['', 'nobody']: return 'nobody' rationalised_people = [] for person in re.findall(r'[^ ,]+', rationaliser_text(people)): rationalised_people.append(rationaliser_person(person)) return ' '.join(rationalised_people) def rationaliser_person(person): debug(10, 'rationaliser_person: person=%s' % (person)) # Normal person (handled by 'existing alias' below) # Existing alias if not person.endswith('*'): debug(10, 'rationaliser_person: expanding ...') sql_statement = 'SELECT person FROM person_aliases WHERE ? in (alias, person);' sql_value = (person,) results = sqlite_exec_wrapper_qm(sql_statement, sql_value) unaliased_person = None for result in results: (unaliased_person,) = result person = unaliased_person if unaliased_person is not None else '%s?' % (person) # New alias else: person = person.rstrip('*') # Don't add invalid stuff. if not validator_unknown_person(person): return person def rationaliser_person_alias(x): if x == '': return person else: return rationaliser_new_person_alias(x) def helper_person_alias(): print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (':', 'new alias for that person')) debug(10, 'rationaliser_person: new alias; asking about it ...') helper = helper_person_alias validator = validator_unknown_person_alias rationaliser = rationaliser_person_alias alias = get_answer('alias for %s (or ENTER to register them without an alias)' % (person), '', validator, rationaliser, 'person-alias', helper, always_help_flag) sql_statement = 'INSERT INTO person_aliases VALUES (?, ?);' sql_value = (alias, person) sqlite_exec_wrapper_qm(sql_statement, sql_value) return person def get_override_city_from_new_coords_silent(rl, new_coords): if new_coords is None: return None sql_statement = 'SELECT city FROM place_overrides WHERE latitude = ? AND longitude = ?;' sql_value = new_coords results = sqlite_exec_wrapper_qm(sql_statement, sql_value) override_city = None # Note that this loop runs zero or once only. If zero then coords is not expanded # and if once then there is no need to break out of the loop. for result in results: (override_city,) = result return override_city def get_override_country_from_new_coords_silent(rl, new_coords): if new_coords is None: return None sql_statement = 'SELECT country FROM place_overrides WHERE latitude = ? AND longitude = ?;' sql_value = new_coords results = sqlite_exec_wrapper_qm(sql_statement, sql_value) override_country = None # Note that this loop runs zero or once only. If zero then coords is not expanded # and if once then there is no need to break out of the loop. for result in results: (override_country,) = result return override_country def rationaliser_coords(coords): global tied_ui_regexps coords = rationaliser_text(coords) # Normal coordinates if not coords.endswith('*') and len(coords) > 0 and not coords[0].isalpha(): pass # Existing alias elif not coords.endswith('*'): sql_statement = 'SELECT latitude, longitude FROM coords_aliases WHERE alias = ?;' sql_value = (coords,) results = sqlite_exec_wrapper_qm(sql_statement, sql_value) unaliased_coords = None # Note that this loop runs zero or once only. If zero then coords is not expanded # and if once then there is no need to break out of the loop. for result in results: unaliased_coords = '%s,%s' % (result) # If can't expand/verify then let validator inform user. if unaliased_coords is None: return coords coords = unaliased_coords # New alias else: coords = coords.rstrip('*') # Don't add invalid stuff. if not validator_coords(coords): return coords def helper_coords_alias(): print('%s %s' % (legend_key_fmtstr, legend_desc_fmtstr) % (':', 'new alias for those coords')) helper = helper_coords_alias validator = validator_new_coords_alias rationaliser = rationaliser_new_coords_alias alias = get_answer('alias', '', validator, rationaliser, 'coords-alias', helper, always_help_flag) sql_statement = 'INSERT INTO coords_aliases VALUES (?, ?, ?);' sql_value = (alias, *[float(x) for x in coords.split(',')]) sqlite_exec_wrapper_qm(sql_statement, sql_value) debug(10, 'rationaliser_coords: coords=%s' % (coords)) return coords def rationaliser_new_coords_alias(new_coords_alias): return rationaliser_single_word(new_coords_alias) def rationaliser_new_person_alias(new_person_alias): return rationaliser_single_word(new_person_alias) def rationaliser_new_desc_alias(new_desc_alias): return rationaliser_single_word(new_desc_alias) def validator_new_coords_alias(new_coords_alias): sql_statement = 'SELECT COUNT(*) FROM coords_aliases WHERE alias = ?;' sql_value = (new_coords_alias,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 0 def validator_unknown_person_alias(new_person_alias): sql_statement = 'SELECT COUNT(*) FROM person_aliases WHERE alias = ?;' sql_value = (new_person_alias,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 0 def validator_unknown_desc_alias(new_desc_alias): sql_statement = 'SELECT COUNT(*) FROM desc_aliases WHERE alias = ?;' sql_value = (new_desc_alias,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 0 def rationaliser_text(text): # Note that split() splits on any amount of space (except zero space) *and* strips leading # and trailing space first. return ' '.join(text.split()).lower() def validator_hfts(hfts): global tied_ui_regexps debug(10, 'validator_hfts: hfts=%s' % (hfts)) return True if re.search(tied_ui_regexps['hfts'], hfts) else False def validator_country(country): global tied_ui_regexps return True if re.search(tied_ui_regexps['country'], country) else False def validator_city(city): global tied_ui_regexps return True if re.search(tied_ui_regexps['city'], city) else False def validator_desc(desc): if len(desc) == 0: warning('can\'t have empty description') return False return True def validator_serno(serno): global tied_ui_regexps return True if re.search(tied_ui_regexps['serno'], serno) else False def validator_confirm_flag(confirm_flag): global tied_ui_regexps return True if re.search(tied_ui_regexps['confirm_flag'], confirm_flag) else False def validator_known_people(people): if len(people) == 0: internal('validator_known_people: empty string but the rationaliser should have changed that to \'nobody\'') if len(people.split()) > 1 and 'nobody' in people.split(): warning('can\'t have somebody and nobody (hint: either remove \'nobody\' or remove everybody else') return False for person in people.split(): if validator_known_person(person) == False: return False return True def validator_unknown_person(person): if not validator_single_word(person): return False sql_statement = 'SELECT COUNT(*) FROM person_aliases WHERE person = ?;' sql_value = (person,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 0 def validator_known_person(person): if person == 'nobody': return True sql_statement = 'SELECT COUNT(*) FROM person_aliases WHERE person = ?;' sql_value = (person,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 1 def validator_known_desc(desc): sql_statement = 'SELECT COUNT(*) FROM desc_aliases WHERE desc = ?;' sql_value = (desc,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 1 def validator_unknown_desc(desc): sql_statement = 'SELECT COUNT(*) FROM desc_aliases WHERE desc = ?;' sql_value = (desc,) sql_results = sqlite_exec_wrapper_qm(sql_statement, sql_value).fetchall() (count,) = sql_results[0] return count == 0 def validator_single_word(word): global tied_ui_regexps return True if re.search(tied_ui_regexps['international_word'], word) else False def rationaliser_single_word(word): return rationaliser_text(word) def validator_coords(coords): global tied_ui_regexps return True if re.search(tied_ui_regexps['coords'], coords) else False def lookup_country_and_city(coords): global city_distance_tolerance if coords is None: internal('lookup_country_and_city: passed None') try: x = db_file except: internal('lookup_country_and_city: db_file undefined') sql_statement = ''' SELECT name, countrycode, latitude, longitude FROM places WHERE featurecode LIKE \'PPL%%\' ORDER BY (latitude-?)*(latitude-?)+(longitude-?)*(longitude-?) LIMIT 1; ''' sql_value = (float(coords[0]), float(coords[0]), float(coords[1]), float(coords[1])) debug(10, 'lookup_country_and_city: running query ...') ((nearest_city_name, nearest_country_name, nearest_city_latitude, nearest_city_longitude),) = sqlite_exec_wrapper_qm(sql_statement, sql_value) nearest_city_distance = geopy.distance.geodesic((coords), (nearest_city_latitude,nearest_city_longitude)).km debug(10, 'lookup_country_and_city: nearest_city_name=%s, nearest_country_name=%s, nearest_city_latitude=%.6f, nearest_city_longitude=%.6f, nearest_city_distance=%.3f' % (nearest_city_name.lower(), nearest_country_name.lower(), float(nearest_city_latitude), float(nearest_city_longitude), nearest_city_distance)) # If not close enough then pretend we didn't look. if nearest_city_distance <= city_distance_tolerance: country_and_city = { 'country': nearest_country_name.lower(), 'city': nearest_city_name.lower() } else: country_and_city = None return country_and_city def sqlite_exec_wrapper_qm(statement, value): global cursor return cursor.execute(statement, value) def sqlite_exec_wrapper(statement): global cursor return cursor.execute(statement) def get_hfts_from_ts(ts): return '%04d%02d%02d%02d%02d%02d' % (time.localtime(ts)[:6]) def get_dhms_from_s(s): if s >= 2*3600*24: return '%d days %02d:%02d:%02d' % (int(s/(3600*24)), int((s%(3600*24))/3600), int((s%3600)/60), s%60) elif s >= 3600*24: return '%d day %02d:%02d:%02d' % (int(s/(3600*24)), int((s%(3600*24))/3600), int((s%3600)/60), s%60) else: return '%02d:%02d:%02d' % (int((s%(3600*24))/3600), int((s%3600)/60), s%60) def get_ts_from_hfts(hfts): if hfts is None: return None m = re.search('^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})$', hfts) if not m: return None return int(time.mktime(tuple([ int(x) for x in m.group(1,2,3,4,5,6) ] + [0, 0, -1]))) # gpxpy stores times as strings like '2019-09-06 09:21:45+00:00'. This is not # the same as the GPX file itself contains. We call this format gpxpy-formatted-timestamp, # or gpxpyts for short. def get_ts_from_gpxpyts(gpxpyts): return int(calendar.timegm(time.strptime(str(gpxpyts)+' UTC', '%Y-%m-%d %H:%M:%S+00:00 %Z'))) def get_ts_from_xfts(xfts): if xfts is None: return None m = re.search('^(\d{4}):(\d{2}):(\d{2}) (\d{2}):(\d{2}):(\d{2})$', xfts) if not m: internal('get_hfts_from_xfts: %s: unable to convert xfts to hfts' % (xfts)) return int(time.mktime(tuple([ int(x) for x in m.group(1,2,3,4,5,6) ] + [0, 0, -1]))) def get_xfts_from_ts(ts): return '%04d:%02d:%02d %02d:%02d:%02d' % (time.localtime(ts)[:6]) #def get_xfts_from_hfts(hfts): # if hfts is None: # return None # m = re.search('^(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})$', hfts) # if not m: # internal('get_xfts_from_hfts: %s: unable to convert xfts to hfts' % (xfts)) # return '%s:%s:%s %s:%s:%s' % m.group(1,2,3,4,5,6) def show_help_places(): sys.stdout.write('How to build a places database\n') sys.stdout.write('------------------------------\n') sys.stdout.write('\n') sys.stdout.write('''\ 0) Create a new temporary directory and cd into it, e.g.: mkdir ~/places cd ~/places 1) Download country-specific ZIP files from http://download.geonames.org/export/dump/, e.g.: CS="de fr gb cz sk at hu dk se" for C in $CS; do wget -q http://download.geonames.org/export/dump/${C^^}.zip done 2) Unzip them all, e.g.: for Z in ??.zip; do unzip -o $Z done rm -f readme.txt 3) Check each file's contents can be loaded - as is - into SQL, e.g.: for T in *.txt; do echo "$T ..." >&2 { echo "CREATE TABLE places (geonameid int, name char, asciiname char, alternatenames char, latitude float, longitude float, featureclass char, featurecode char, countrycode char, cc2 char, admin1code char, admin2code char, admin3code char, admin4code char, population int, elevation int, dem char, timezone char, modificationdate char, primary key(geonameid));" echo ".import $T places" } | sqlite3 -separator $\'\\t' done 4) Any errors that the previous command displayed need to be fixed by editing the appropriate .txt file. (Typically, that means deleting any double-quotes.) After doing that, rerun the previous command to check that the errors have been fixed. 5) Create the database for real by running: { echo "CREATE TABLE places (geonameid int, name char, asciiname char, alternatenames char, latitude float, longitude float, featureclass char, featurecode char, countrycode char, cc2 char, admin1code char, admin2code char, admin3code char, admin4code char, population int, elevation int, dem char, timezone char, modificationdate char, primary key(geonameid));" for TXT in *.txt; do echo ".import $TXT places" done } | sqlite3 -separator $\'\\t' %s.sqlite 6) The database needs to be moved to %s, but you might want to give it a name that indicates what countries it contains and then symlink that into the required place, e.g.: mkdir -p ~/.%s mv %s.sqlite ~/.%s/%s-${CS// /-}.sqlite ln -s %s-${CS// /-}.sqlite ~/.%s/%s.sqlite ''' % (progname, db_file, progname, progname, progname, progname, progname, progname, progname)) sys.exit(0) def get_answer(prompt, default, validator, rationaliser, thing_name, helper, always_help_flag): debug(100, 'get_answer: default=%s, type(default)=%s' % (default, type(default))) debug(100, 'get_answer: prompt=%s, default=%s, validator=%s, rationaliser=%s, thing_name=%s' % (prompt, default, validator.__name__ if validator is not None else validator, rationaliser.__name__ if rationaliser is not None else rationaliser, thing_name)) def readline_input(): def readline_start_hook(): debug(100, 'readline_start_hook: inserting \'%s\' ...' % (default)) # Insert encoding of unicode variable into readline buffer, encoded according to the # environment in which it will shortly be displayed. readline.insert_text(default) readline.set_startup_hook(readline_start_hook) try: # raw_input() returns a byte stream representing the users input, encoded according # the environment it was read from. Convert this to unicode for internal usage. rc = input('%s: ' % (prompt)) readline.set_startup_hook() return rc except KeyboardInterrupt: print('^C') warning('signal received; cleaning up and exiting ...') sys.exit(1) def pop_history(): if readline.get_current_history_length() > 0: last_history_item = readline.get_history_item(readline.get_current_history_length()) readline.remove_history_item(readline.get_current_history_length()-1) return last_history_item else: return None def get_history(): return [ readline.get_history_item(x) for x in range(1,readline.get_current_history_length()+1) ] def my_dirname(thing): # Hack os.path.dirname() idiosyncracy: while 'a/b' returns 'a', 'a' returns '' instead of '.'. # See https://stackoverflow.com/questions/7783308/os-path-dirname-file-returns-empty return os.path.dirname(thing) or '.' # Load history debug(100, 'get_answer: loading history ...') history_file = '%s/.%s/history-%s' % (os.environ['HOME'], progname, thing_name) readline.clear_history() try: readline.read_history_file(history_file) except IOError: pass debug(100, 'get_answer: after loading history, history is: %s' % (get_history())) # Work out default value, based on whether a default was passed. If that's # the same as the last item in the history then adjust the history accordingly. debug(100, 'get_answer: before default processing, default=%s and history=%s' % (default, get_history())) debug(100, 'get_answer: popping history ...') last_history_item = pop_history() debug(100, 'get_answer: popped %s' % (last_history_item)) if default is None: debug(100, 'get_answer: default is none, so we\'ll use last_history_item as the default') debug(100, 'get_answer: last_history_item=%s, type(last_history_item)=%s' % (last_history_item, type(last_history_item))) default = last_history_item elif default == last_history_item: debug(100, 'get_answer: default and last_history_item are the same, so there\'s nothing to do (remember we\'ve popped it off the history anyway)') pass elif last_history_item is not None: debug(100, 'get_answer: default and last_history_item differ, so putting last_history_item back ...') readline.add_history(last_history_item) debug(100, 'get_answer: after default processing, default=%s and history=%s' % (default, get_history())) # Ask the question, get the answer. If the answer is bad then clear the bad answer # from the history and ask again. If the answer is '?' then call help function and # clear '?' from the history and ask again. debug(100, 'get_answer: looping until we have a valid answer ...') while True: if always_help_flag: helper() debug(100, 'get_answer: before prompting, history=%s' % (get_history())) ans = readline_input() debug(100, 'get_answer: after calling readline_input(), ans=%s' % (ans)) if not always_help_flag and ans == '?': debug(100, 'get_answer: deleting question mark from history and calling help function ...') pop_history() if not callable(helper): internal('get_answer: helper %s is not callable' % (helper)) helper() else: if rationaliser is None: rationalised_ans = ans else: rationalised_ans = rationaliser(ans) debug(10, 'get_answer: ans=%s, rationalised_ans=%s' % (ans, rationalised_ans)) if validator is None or validator(rationalised_ans) == True: break # This warning used to report 'ans' was invalid, but I changed this # to report than 'rationalised_ans' is invalid. This results in # in warnings like 'faked-country: invalid country' rathe than # 'o: invalid country' (where 'o' is the response to use # the (currently faked) country from the overrides database. warning('%s: invalid %s; reasking ...' % (rationalised_ans, thing_name)) # Remember that empty reponses don't get added to history. if ans != '': debug(100, 'get_answer: deleting invalid and non-empty response from history ...') pop_history() debug(100, 'get_answer: after prompting, history=%s' % (get_history())) # If a history was popped in order to provide a default value, but then the # value actually entered wasn't the default, then the default is now missing # from the history, so we need to put it back. Actually, let's simplify this: # if the default (which at this point we're certain is not None) and the # provided answer differ, then put the default back in the history, but put # it before the provided entry, which has already entered the history. if ans != '' and default is not None and ans != default: debug(100, 'get_answer: ans=%s, type(ans)=%s' % (ans, type(ans))) debug(100, 'get_answer: default=%s, type(default)=%s' % (default, type(default))) debug(100, 'get_answer: default (%s) (type %s) and response (%s) (type %s) differ, so reinserting default in history ...' % (default, type(default), ans, type(ans))) last_history_item = pop_history() # Convert unicode variable for storage in history file. We use the local standard encoding # because things can go from the history to the screen without passing through this script # (e.g. when the use rejects the default value and presses up-arrow a few times. readline.add_history(default) readline.add_history(ans) debug(100, 'get_answer: after reinserting default, history=%s' % (get_history())) #if should_temporarily_remove_last_history_item_flag: # if ans != '': # readline.remove_history_item(readline.get_current_history_length()-1) # # Hardcoded defaults may be None # if default is not None and default != '': # readline.add_history(default) # if ans != default and ans != '': # readline.add_history(ans) # Deduplicate history (before writing it). deduplicated_history_items = [] # Note that we do it from the end back, # i.e. we delete the *earlier* of the # identical values, not the later. We do this so that history ABCDC keeps C as # the most recently entered item. The old code walked through the list in the # normal direction and appended each non-duplicate (or first of the duplicates) # to a new list: # # for history_item in [ readline.get_history_item(x) for x in range(1,readline.get_current_history_length()+1) ]: # if history_item not in deduplicated_history_items: # deduplicated_history_items.append(history_item) # # But now we walk the list backwards, so we can't *append* because although # we want the most recent of identical values to be the preserved one, we # still want the order to be the same as normal), so we need to *prepend*. # This way the order comes out as in the original code. for history_item in [ readline.get_history_item(x) for x in range(readline.get_current_history_length(),0,-1) ]: if history_item not in deduplicated_history_items: deduplicated_history_items.insert(0,history_item) readline.clear_history() for history_item in deduplicated_history_items: readline.add_history(history_item) debug(100, 'get_answer: after deduplicating history, history is: %s' % ([ readline.get_history_item(x) for x in range(1,readline.get_current_history_length()+1) ])) # Write history to file. history_dir = my_dirname(history_file) if not os.path.isdir(history_dir): os.makedirs(history_dir) readline.write_history_file(history_file) return rationalised_ans def debug(level, text): global verboselevel if verboselevel < level: return msg_writer("DEBUG[%s]: %s" % (level, text), syslog.LOG_DEBUG) def info(text): global verboselevel if verboselevel < 3: return msg_writer("INFO: %s" % (text), syslog.LOG_INFO) def warning(text): global verboselevel if verboselevel < 2: return msg_writer("WARNING: %s" % (text), syslog.LOG_WARNING) def error(text): msg_writer("ERROR: %s" % (text), syslog.LOG_ERR) sys.exit(1) def internal(text): msg_writer("INTERNAL ERROR: %s" % (text), syslog.LOG_CRIT) sys.exit(2) def msg_writer(text, syslog_level): msg_writer_stderr(text) if not sys.stderr.isatty(): msg_writer_syslog(text, syslog_level) def msg_writer_stderr(text): global progname sys.stderr.write("%s: %s\n" % (progname, text)) def msg_writer_syslog(text, syslog_level): global syslog_opened_flag, progname if not syslog_opened_flag: if sys.version_info < (2, 7): syslog.openlog(progname, syslog.LOG_PID) else: syslog.openlog(progname, logoption=syslog.LOG_PID) syslog_opened_flag = True syslog.syslog((syslog.LOG_LOCAL0|syslog_level), text) # sig and frame variables have underscore-prefix to prevent vulture # reporting about them. def signal_handler(_sig, _frame): sys.stderr.write('^C\n') sys.exit(1) class MyTCPServerClass(socketserver.TCPServer): # When this script exits it'll kill the server, but that doesn't mean # that the socket that was created is *immediately* disposed of. In order # to allow the script to be immediately re-run, then we need to set a # property on the socket. allow_reuse_address = True # Stop server writing to terminal. logging = False class MySimpleHTTPRequestHandlerClass(http.server.SimpleHTTPRequestHandler): # Overload my own logging function, which consults an object attribute before # actually logging. def log_message(self, format, *args): if self.server.logging: http.server.SimpleHTTPRequestHandler.log_message(self, format, *args) # Entry point if __name__=="__main__": main()