#!/usr/bin/python3 -u
"""Solver for a word-guessing game scored with per-letter 'b'/'y'/'g' responses.

One invocation runs as a server (--server-start) that computes and caches
guess tables; other invocations act as clients that talk to that server over
a Unix domain socket, either to solve interactively (the default mode), to
send a raw control command (--control=) or to stop the server (--server-stop).
"""

import subprocess
import sys
import re
import os
import time
import syslog
import itertools
import statistics
import json
import pickle
import socket
import signal
import base64
import shutil

sys.path.append(subprocess.Popen(['miniade', '--dirname'], stdout=subprocess.PIPE, universal_newlines=True).communicate()[0].rstrip())
import miniade

#  Globals
modroot = os.path.realpath(sys.argv[0] + '/../..')
syslog_opened_flag = False
data = {}                    #  guess table currently held in memory (see get_data())
current_data_key = None      #  repr() of the response sequence that 'data' pertains to
max_message_length = 4096    #  size passed to each recv() call
letters = [chr(ord('a') + i) for i in range(26)]


def main():
    """Parse the command line, derive file/socket paths and dispatch on mode."""
    global verboselevel, word_length, guess_dictionary, solution_dictionary, output_file
    global socket_path, mode, control_message, lock_file

    #  Defaults for options
    verboselevel = 2
    word_length = 5
    guess_dictionary = None
    solution_dictionary = None
    output_file = '-'
    mode = 'client_solve'
    control_message = None

    #  Process options
    def special_options_handler():
        """Consume sys.argv[1] if it is one of this program's own options.

        Returns True if the option was recognised, False otherwise.
        """
        global guess_dictionary, solution_dictionary, word_length, output_file, mode, control_message

        option = sys.argv[1]
        if option.startswith('--guess-dict='):
            guess_dictionary = option[len('--guess-dict='):]
        #  This must be 'elif': as a separate 'if' chain, '--guess-dict=' fell
        #  through to the final 'else' and was reported as unrecognised.
        elif option.startswith('--solution-dict='):
            solution_dictionary = option[len('--solution-dict='):]
        elif option.startswith('--length='):
            word_length = int(option[len('--length='):])
        elif option.startswith('--output='):
            output_file = option[len('--output='):]
        elif option == '--server-start':
            mode = 'server_start'
        elif option == '--server-stop':
            mode = 'server_stop'
        elif option.startswith('--control='):
            mode = 'client_control'
            control_message = option[len('--control='):]
        else:
            return False
        return True

    miniade.process_options(special_options_handler, help)

    #  Process arguments
    if len(sys.argv) != 1:
        usage()

    #  Sanity checks and derivations
    if mode is None:
        usage()
    if guess_dictionary is None:
        guess_dictionary = '%s/etc/guess-words-%d.txt' % (modroot, word_length)
    if solution_dictionary is None:
        solution_dictionary = '%s/etc/solution-words-%d.txt' % (modroot, word_length)
    socket_path = '/tmp/%s-%d.sock' % (miniade.get_progname(), word_length)
    lock_file = '/tmp/%s-%d.lock' % (miniade.get_progname(), word_length)

    #  Guts
    if mode == 'client_solve':
        client_solve()
    elif mode == 'server_start':
        server_start()
    elif mode == 'server_stop':
        server_stop()
    elif mode == 'client_control':
        client_control(control_message)


def _usage_text():
    """Return the one-line usage message shared by help() and usage()."""
    #  NOTE(review): the placeholders in this message look like they have been
    #  lost, and '--dict=' does not match the '--guess-dict='/'--solution-dict='
    #  options that are actually parsed; text left untouched pending confirmation.
    return 'Usage: %s [ ] [ --length= ] [ --server-start [ --output= ] [ --dict= ] | --server-stop ]' % (miniade.get_progname())


def help():
    """Print the usage message on stdout and exit successfully."""
    print(_usage_text())
    sys.exit(0)


def usage():
    """Print the usage message on stderr and exit with status 1.

    main() calls this on a bad command line; it was previously undefined,
    so those calls raised NameError.
    """
    sys.stderr.write('%s\n' % (_usage_text()))
    sys.exit(1)


def server_stop():
    """Ask the running server to quit."""
    #  client_control() will check server is running so no need to do that here.
    client_control('QUIT')


def client_control(control_message):
    """Send a bare command (no arguments) to the server and print its reply."""
    #  Sanity checks and derivations
    if not miniade.lock_is_fresh(lock_file):
        miniade.error('server not running')

    #  Guts
    sys.stdout.write('%s\n' % (send_message_to_server_and_read_reply(control_message, [])))


def _load_words(dictionary_path):
    """Return the lines of dictionary_path as a list of words.

    Calls miniade.error() if the file cannot be opened. splitlines() is used
    so that the last word is kept even when the file has no trailing newline.
    """
    try:
        fp = open(dictionary_path, 'r')
    except OSError:
        miniade.error('%s: file not found' % (dictionary_path))
    with fp:
        return fp.read().splitlines()


def client_solve():
    """Interactively solve a puzzle, then report the guesses that were made.

    Guesses are written to stdout and responses are read from stdin; the
    final 'Guesses: ...' summary goes to output_file ('-' means stdout).
    """
    #  Sanity checks and derivations
    if not miniade.lock_is_fresh(lock_file):
        miniade.error('server not running')
    miniade.info('loading all solution words from %s ...' % (solution_dictionary))
    remaining_potential_solution_words = _load_words(solution_dictionary)
    output_handle = sys.stdout if output_file == '-' else open(output_file, 'w')

    #  Guts
    try:
        guess_sequence = _run_guessing_loop(remaining_potential_solution_words)
        output_handle.write('Guesses: %s (%d)\n' % (', '.join(guess_sequence), len(guess_sequence)))
    finally:
        #  Close the output file on every exit path, but never stdout.
        if output_handle is not sys.stdout:
            output_handle.close()


def _run_guessing_loop(remaining_potential_solution_words):
    """Issue guesses and read responses until solved or input ends.

    Returns the list of guesses issued, in order.
    """
    guess_sequence = []
    response_sequence = []
    while True:
        if not remaining_potential_solution_words:
            miniade.error('based on responses, no words are possible!')
        elif len(remaining_potential_solution_words) == 1:
            guess_word = remaining_potential_solution_words[0]
        else:
            miniade.info('getting best guess; this can take some time ...')
            guess_word = send_message_to_server_and_read_reply('GETBESTGUESSWORD', [remaining_potential_solution_words, response_sequence])

        #  Issue guess, read response, break out of guessing loop if correct.
        sys.stdout.write('%s (%d different words are possible)\n' % (guess_word, len(remaining_potential_solution_words)))
        guess_sequence.append(guess_word)
        try:
            response = input()
        except (EOFError, KeyboardInterrupt):
            break
        miniade.debug(10, 'client_solve: received response \'%s\'' % (response))
        if response == '?':
            miniade.debug(10, 'client_solve: remaining_potential_solution_words=%s' % (remaining_potential_solution_words))
            continue
        p = re.search('^([byg]{%d}).*$' % (word_length), response)
        if not p:
            sys.stdout.write('%s: invalid guess (case #1)\n' % (repr(response)))
            continue
        response = p.group(1)
        response_sequence.append(response)
        if response == 'g' * word_length:
            break
        elif len(remaining_potential_solution_words) == 1:
            miniade.error('I think there was only one possible word (%s) but you\'re telling me it\'s not that word!' % (remaining_potential_solution_words[0]))

        #  Filter the list of potential set words.
        #  When the best guess was requested, response_sequence held only
        #  the responses received up to *that* time (e.g. [] at startup).
        #  The response to the guess just made has since been appended,
        #  but the server-side data we want is indexed by the previous
        #  sequence, hence the '[:-1]' in this next statement.
        remaining_potential_solution_words = send_message_to_server_and_read_reply('GETPOTENTIALSETWORDS', [remaining_potential_solution_words, guess_word, response_sequence[:-1], response])
    return guess_sequence


def send_message_to_server_and_read_reply(command, argument_list):
    """Send one JSON request to the server and return its decoded JSON reply.

    The request is {'command': ..., 'argument_list': ...}; both request and
    reply are JSON text terminated by a single '\\r'.
    """
    #  Prepare message
    message = {'command': command, 'argument_list': argument_list}

    #  Connect to server ('with' closes the socket on every exit path)
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as conn:
        try:
            conn.connect(socket_path)
        except FileNotFoundError:
            miniade.error('socket not found')
        except ConnectionRefusedError:
            miniade.error('connection to server refused')
        miniade.debug(10, 'send_message_to_server_and_read_reply: connection established')

        #  Encode and send message
        miniade.debug(10, 'send_message_to_server_and_read_reply: json-ing, EOL-ing and byte-ifying message %s ...' % (message))
        message = (json.dumps(message) + '\r').encode()
        miniade.debug(10, 'send_message_to_server_and_read_reply: sending json-ed EOL-ed message ...')
        #  sendall(), not send(): send() may transmit only part of a large
        #  message (the word lists easily exceed one socket buffer).
        conn.sendall(message)

        #  Await reply fragments and assembly into reply
        miniade.debug(10, 'send_message_to_server_and_read_reply: sent message; awaiting reply ...')
        reply = b''
        while True:
            reply_fragment = conn.recv(max_message_length)
            if not reply_fragment:
                #  An empty fragment means the peer closed the connection.
                #  NOTE(review): presumably miniade.error() terminates the
                #  process, as the rest of this file relies on; confirm.
                miniade.error('server disconnected before sending reply')
            reply += reply_fragment
            #  Break out of reply assembly loop if reply has EOL
            if reply.endswith(b'\r'):
                break
            miniade.debug(10, 'send_message_to_server_and_read_reply: reply_fragment isn\'t EOL-ed; assuming partial reply and looping ...')

        #  Decode reply
        miniade.debug(10, 'send_message_to_server_and_read_reply: stringifying, un-json-ing and un-EOL-ing reply ...')
        reply = json.loads(reply[:-1].decode())
        miniade.debug(10, 'send_message_to_server_and_read_reply: un-json-ed un-EOL-ed reply: %s' % (reply))

        #  Disconnect from server
        miniade.debug(10, 'send_message_to_server_and_read_reply: closing connection ...')

    #  Pass reply up
    return reply


def sigusr1_handler(signum, frame):
    """SIGUSR1: raise the verbosity level by one."""
    miniade.set_verboselevel(miniade.get_verboselevel() + 1)
    miniade.debug(2, 'sigusr1_handler: verboselevel is now %d' % (miniade.get_verboselevel()))


def sigusr2_handler(signum, frame):
    """SIGUSR2: lower the verbosity level by one, but not below zero."""
    miniade.set_verboselevel(max(miniade.get_verboselevel() - 1, 0))
    miniade.debug(2, 'sigusr2_handler: verboselevel is now %d' % (miniade.get_verboselevel()))


def server_start():
    """Load the guess dictionary, take the lock and serve requests until QUIT."""
    global all_guess_words

    #  Sanity checks and derivations
    signal.signal(signal.SIGUSR1, sigusr1_handler)
    signal.signal(signal.SIGUSR2, sigusr2_handler)
    miniade.info('loading all valid words from %s ...' % (guess_dictionary))
    all_guess_words = _load_words(guess_dictionary)

    #  Lock
    if not miniade.lock(lock_file):
        miniade.error('failed to lock')

    #  Start listening
    if os.path.exists(socket_path):
        os.remove(socket_path)
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(socket_path)
    miniade.debug(10, 'server_start: setting backlog ...')
    server.listen()  #  set backlog (to default)

    #  Process messages until told to exit
    quit_flag = False
    while not quit_flag:
        #  Await client connection
        miniade.debug(10, 'server_start: awaiting incoming connection ...')
        conn, _addr = server.accept()
        miniade.debug(10, 'server_start: connection established')
        #  'with' closes the connection on every path, including the
        #  early-outs for disconnected clients and malformed messages.
        with conn:
            quit_flag = _serve_one_request(conn)

    #  Unlock
    miniade.unlock(lock_file)


def _serve_one_request(conn):
    """Read one request from conn, reply to it; return True if it was QUIT.

    Returns False without replying if the client disconnects early or the
    message lacks the 'command' or 'argument_list' tags.
    """
    #  Await message fragments and assembly into message
    miniade.debug(10, 'server_start: awaiting message ...')
    message = b''
    while True:
        message_fragment = conn.recv(max_message_length)
        if not message_fragment:
            #  An empty fragment means the peer closed the connection.
            miniade.warning('client disconnected before sending message')
            return False
        miniade.debug(10, 'server_start: message_fragment=%s' % (message_fragment))
        message += message_fragment
        #  Break out of message assembly loop if message has EOL
        if message.endswith(b'\r'):
            break
        miniade.debug(10, 'server_start: message_fragment isn\'t EOL-ed; assuming partial reply and looping ...')

    #  Decode message
    miniade.debug(10, 'server_start: stringifying, un-json-ing and un-EOL-ing message ...')
    message = json.loads(message[:-1].decode())
    miniade.debug(10, 'server_start: un-json-ed un-EOL-ed message: %s' % (message))

    #  Process message and prepare reply
    if 'command' not in message:
        miniade.warning('message missing \'command\' tag')
        return False
    if 'argument_list' not in message:
        miniade.warning('message missing \'argument_list\' tag')
        return False
    quit_requested = False
    if message['command'] == 'GETBESTGUESSWORD':
        reply = handle_request_for_guess_word(*message['argument_list'])
    elif message['command'] == 'GETPOTENTIALSETWORDS':
        reply = handle_request_for_remaining_potential_solution_words(*message['argument_list'])
    elif message['command'] == 'QUIT':
        quit_requested = True
        reply = 'OK'
    else:
        miniade.internal('%s: bad/unhandled command' % (message['command']))
        #  Only reached if miniade.internal() returns; there is no reply to send.
        return False

    #  Encode and send reply
    miniade.debug(10, 'server_start: json-ing, EOL-ing and byte-ifying reply %s ...' % (reply))
    reply = (json.dumps(reply) + '\r').encode()
    miniade.debug(10, 'server_start: sending json-ed EOL-ed reply ...')
    #  sendall(), not send(): replies can be long word lists.
    conn.sendall(reply)

    #  Disconnect from client (the caller's 'with' does the actual close)
    miniade.debug(10, 'server_start: closing connection ...')
    return quit_requested


def handle_request_for_guess_word(remaining_potential_solution_words, response_sequence):
    """Return the best guess for the given state."""
    get_data(remaining_potential_solution_words, response_sequence)
    return data['best_guess']


def handle_request_for_remaining_potential_solution_words(remaining_potential_solution_words, guess_word, response_sequence, response):
    """Return the words still possible after guess_word received response."""
    get_data(remaining_potential_solution_words, response_sequence)
    return data['guesses'][guess_word]['remaining_potential_solution_words'][response]


def get_data(remaining_potential_solution_words, response_sequence):
    """Make the global 'data' hold the guess table for response_sequence.

    The table is taken from memory if it is already current, else from the
    on-disk cache, else it is calculated and saved. Note that the lookup key
    is response_sequence alone; remaining_potential_solution_words is only
    used when the table has to be calculated.
    """
    global data, current_data_key

    data_key = repr(response_sequence)

    #  Check if data already in memory is the data we need.
    if current_data_key is not None and current_data_key == data_key:
        return

    #  Check if data loadable from disk.
    data = load_data(response_sequence)
    if len(data) > 0:
        current_data_key = data_key
        return

    miniade.debug(10, 'get_data: data for response_sequence %s not in memory or on disk; calculating ...' % (response_sequence))
    before_time = time.time()
    data = {}
    data['guesses'] = _build_guess_table(remaining_potential_solution_words, response_sequence)
    data['best_guess'] = _pick_best_guess(data['guesses'], remaining_potential_solution_words)

    #  Note what data in memory pertains to.
    current_data_key = data_key
    save_data(data, response_sequence)
    after_time = time.time()
    miniade.debug(10, 'get_data: data generation for response sequence %s took %.2fs' % (response_sequence, after_time - before_time))


def _build_guess_table(remaining_potential_solution_words, response_sequence):
    """Return, per guess word, the still-possible words per response and the 'wert'.

    response_sequence is used only in debug output.
    """
    #  To generate a subcache, we consider all possible valid words, not
    #  only those that fit given what's been discovered (e.g. if guess
    #  'oat' solicited response 'bbb' then we might still find it useful
    #  to consider what the response to 'tub' might be even though it
    #  cannot be the solution word), and for each of those we consider
    #  all possible responses (i.e. 'bbbbb' through to 'ggggg') and for
    #  each of those we work out the list of still-possible solution
    #  words.
    possible_responses = [''.join(x) for x in itertools.product('byg', repeat=word_length)]
    guesses = {}
    for guess_word in all_guess_words:
        miniade.debug(5, 'get_data: getting wert for past actual guesses %s and potential guess \'%s\' ...' % (response_sequence, guess_word))
        #  Every possible response gets an entry, even if no word produces it.
        words_by_response = {possible_response: [] for possible_response in possible_responses}
        #  Each solution word yields exactly one response to this guess, so
        #  work that out once per word and file the word under it, rather
        #  than recomputing it for every possible response.
        for potential_solution_word in remaining_potential_solution_words:
            words_by_response[get_response(potential_solution_word, guess_word)].append(potential_solution_word)

        #  Now, for each possible guess we might make, we work out that
        #  guess's maximum-number-of-subsequently-possible-words-across-all-possible-responses-to-that-guess.
        #  We call that value the 'wert' (I couldn't think of a better word). This
        #  value also forms part of the cache, as it's slightly expensive to determine.
        guesses[guess_word] = {
            'remaining_potential_solution_words': words_by_response,
            'wert': max(len(words) for words in words_by_response.values()),
        }
    return guesses


def _pick_best_guess(guesses, remaining_potential_solution_words):
    """Return the guess with the lowest non-zero wert, ties broken by polap score."""
    #  We pick the guess that has the lowest 'wert' (i.e. the lowest
    #  maximum-number-of-subsequently-possible-words-across-all-possible-responses-to-that-guess).
    miniade.debug(10, 'get_data: working out best guess ...')
    best_guesses = []
    #  Could look up wert of best_guess via best_guess but this makes code cleaner.
    best_wert = None
    for guess_word in all_guess_words:
        wert = guesses[guess_word]['wert']
        #  If the wert is zero that means *no* words would subsequently be
        #  possible, so we would wind up looking at an empty subcache next
        #  time. Zero would otherwise be the lowest wert and so look like
        #  an attractive guess; exclude it.
        if wert == 0:
            continue
        #  First time round accept the first word as the best word.
        if best_wert is None or wert < best_wert:
            best_guesses = [guess_word]
            best_wert = wert
        elif wert == best_wert:
            best_guesses.append(guess_word)
    if not best_guesses:
        #  Happens only if no guess leaves any word possible (e.g. an empty
        #  list of remaining words was passed in).
        miniade.internal('get_data: no guess leaves any possible solution words')
    miniade.debug(10, 'get_data: %d best guess(es) with wert %d are %s' % (len(best_guesses), best_wert, best_guesses))

    #  If there are words in best_guesses that are also in
    #  remaining_potential_solution_words then use those as the best
    #  guesses: we might get lucky and it be one of those words!
    #  The filter walks best_guesses (rather than using a set intersection
    #  as the result) because sets are unordered and would not give
    #  consistent results across invocations; see the comment by phil_b at
    #  https://stackoverflow.com/questions/3697432/how-to-find-list-intersection
    remaining_word_set = set(remaining_potential_solution_words)
    possible_best_guesses = [guess for guess in best_guesses if guess in remaining_word_set]
    if possible_best_guesses:
        best_guesses = possible_best_guesses

    #  To get the one with the highest
    #  more-words-that-have-same-character-in-same-position scores, first
    #  initialise to {'a':[0,0,0,0,0], 'b':[0,0,0,0,0], ... }.
    #  ('polap' is 'popularity of letter at position'.)
    potential_solution_word_polaps = {letter: [0] * word_length for letter in letters}

    #  Then, for each potential word, bump score of its first letter
    #  in first position, second letter in second position, etc.
    for potential_solution_word in remaining_potential_solution_words:
        for i in range(word_length):
            potential_solution_word_polaps[potential_solution_word[i]][i] += 1
    miniade.debug(10, 'get_data: potential_solution_word_polaps=%s' % (potential_solution_word_polaps))

    #  That's the reference table done. Now we need to work out the score
    #  for each of the best_guesses to see which is the best guess.
    best_guesses = sorted(best_guesses, key=lambda word: get_normalised_polap(word, potential_solution_word_polaps), reverse=True)
    miniade.debug(10, 'get_data: %d best guess(es) with wert %d and sorted by polap are %s' % (len(best_guesses), best_wert, best_guesses))
    return best_guesses[0]


def get_normalised_polap(word, potential_solution_word_polaps):
    """Return the sum of the polap scores of word's letters at their positions."""
    #  For 3 letters, squaring the polap didn't make any difference to the stats. That's
    #  a bit surprising, but perhaps it will with four.
    return sum(potential_solution_word_polaps[word[i]][i] for i in range(word_length))


def _cache_paths(response_sequence):
    """Return (cache directory, cache file) for response_sequence.

    The file name is the lower-cased base16 encoding of repr(response_sequence).
    """
    data_dir = '%s/var/cache/%s-%d.split-pickle' % (modroot, miniade.get_progname(), word_length)
    data_file = '%s/%s' % (data_dir, base64.b16encode(repr(response_sequence).encode()).decode().lower())
    return data_dir, data_file


def load_data(response_sequence):
    """Return the cached table for response_sequence, or {} if none is readable."""
    _data_dir, data_file = _cache_paths(response_sequence)
    if not os.access(data_file, os.R_OK):
        return {}
    miniade.debug(10, 'load_data: loading data for response_sequence %s from disk ...' % (response_sequence))
    #  SECURITY: unpickling can execute arbitrary code, so the cache
    #  directory must only be writable by trusted users.
    with open(data_file, 'rb') as fp:
        return pickle.load(fp)


def save_data(data, response_sequence):
    """Pickle data into the cache file for response_sequence.

    The pickle is written to a pid-suffixed temporary file and then moved
    into place with os.replace().
    """
    data_dir, data_file = _cache_paths(response_sequence)
    data_file_tmp = '%s.%d' % (data_file, os.getpid())
    miniade.debug(10, 'save_data: saving data for response_sequence %s to disk ...' % (response_sequence))
    os.makedirs(data_dir, exist_ok=True)
    with open(data_file_tmp, 'wb') as fp:
        pickle.dump(data, fp)
    os.replace(data_file_tmp, data_file)


#  For a specified solution word and a specified guess, determine
#  what the response would be. E.g. 'cargo'+'bumph' gives 'bbbbb'.
def get_response(set_word, guess_word):
    """Return the 'b'/'y'/'g' response string for guess_word against set_word.

    Only the first word_length letters of each word are considered.
    """
    #  Turn strings into fully-fledged lists to make replacing
    #  arbitrary elements easier.
    set_word = list(set_word)
    guess_word = list(guess_word)

    #  Assume response will be 'bbbbb' (in fully-fledged list form)
    #  and we'll overwrite letters as we determine that's not the case.
    response = list('b' * word_length)

    #  First pass: greens. Matched letters are marked as used (None) in
    #  both words to prevent rematching when we scan for yellows.
    for i in range(word_length):
        if guess_word[i] == set_word[i]:
            response[i] = 'g'
            guess_word[i] = None
            set_word[i] = None

    #  Second pass: yellows.
    for i in range(word_length):
        #  Ignore guess letters that are already marked 'used' (by a green match).
        if guess_word[i] is None:
            continue
        #  Scan set word for the same letter, ignoring letters marked 'used'
        #  by green matches or by yellow matches in earlier iterations.
        for j in range(word_length):
            if set_word[j] is not None and guess_word[i] == set_word[j]:
                response[i] = 'y'
                guess_word[i] = None
                set_word[j] = None
                #  Stop scanning: guess_word[i] is now None and cannot match
                #  any later letter in the set word. This is just an
                #  optimisation.
                break

    #  Turn fully-fledged response list into a string and return it.
    return ''.join(response)


#  Entry point
if __name__ == "__main__":
    main()