#!/usr/bin/perl

use warnings;
use strict;

my($app_svnid) = '$HeadURL$ $LastChangedRevision$'; ## no critic (RequireInterpolationOfMetachars)

use lib substr(`ade-config ade_share_prefix`,0,-1) . '/include'; ## no critic (ProhibitBacktickOperators)
use ADE;
use lib substr(`js-config js_share_prefix`,0,-1) . '/include'; ## no critic (ProhibitBacktickOperators)
use JS;
# see perlipc(1) for explanation of why this is needed
use POSIX ':sys_wait_h';
use experimental 'smartmatch';

my(%jsd_defined_errors) = (
    'jsd_err_misc' => { fmt => '%s' },
);

# Options
my($opt_resources_str);
my($opt_schedule_flag);

# Other globals
# %job_rc_of_recently_reaped_pids: written by sighandler_chld() (pid => wait
# status), drained by process_recently_reaped_pids() in the main loop.
my(%job_rc_of_recently_reaped_pids, $js_shell);
my($db_init_file) = substr(`js-config js_share_prefix`,0,-1) . '/sql/jsd.sql'; ## no critic (ProhibitBacktickOperators)
my($job_id_fmtlen) = 6;
my($job_rc_fmtlen) = 5;
my($job_pid_fmtlen) = 7;
my($job_state_fmtlen) = 9;
my($job_name_fmtlen) = 10;
my($resource_id_fmtlen) = 24;
my($quantity_fmtlen) = 5;
my($submit_timestamp_fmtlen) = 20;

# jsd: daemon entry point. Processes options, creates a fresh SQLite database
# under /tmp, then loops: reap completed jobs, start runnable jobs, and serve
# one client message per iteration, until a 'quit' message sets $quit_flag.
# Returns $ADE::OK or the first non-OK ADE return code encountered.
sub jsd ## no critic (ProhibitExcessComplexity)
{
    my($errstack_ref) = @_;
    my($rc, $db_file, $dbh, $quit_flag, $next_runnable_job_id);
    my($lock_file, $server_handle, $server_name, %resources);

    # Set ADE options
    ADE::register_error_types(\%jsd_defined_errors);

    # Defaults for options
    # (identical line in jsd, jss, jsq).
    $opt_schedule_flag = 1;

    # Register options
    if (($rc=ADE::register_options($errstack_ref, 'r:s', 'resources:s,suspend', 'main::jsd_opt_handler_%s')) != $ADE::OK) {
        return $rc;
    }

    # Register handler functions
    if (($rc=ADE::set_callbacks($errstack_ref, \&jsd_usage_help, \&jsd_version, \&jsd_paths)) != $ADE::OK) {
        return $rc;
    }

    # Process options
    if (($rc=ADE::process_options($errstack_ref)) != $ADE::OK) {
        return $rc;
    }

    # Process arguments
    # NOTE(review): app_usage() is not defined in the visible part of this
    # file (the registered usage callback is jsd_usage_help) -- confirm.
    ADE::show_bad_usage($errstack_ref, \&app_usage, 1) if ($#ARGV+1 != 0);

    # Sanity checks and derivations
    # --resources is not optional. (Despite this testing the variable
    # used only a couple of lines down in the call to
    # read_resources_hash_from_resources_str(), it still makes sense to check this
    # *here* rather than there because doing it here puts a usage check
    # before a non-usage check (i.e. the call to JS::get_server_name()).)
    if (not defined $opt_resources_str) {
        ADE::show_bad_usage($errstack_ref, \&app_usage, 1);
    }
    if (($rc=JS::get_server_name($errstack_ref, \$server_name)) != $ADE::OK) {
        return $rc;
    }

    # Unpack resources string (e.g. cpus:1,memory:1024) into hash.
    if (($rc=read_resources_hash_from_resources_str($errstack_ref, $opt_resources_str, \%resources)) != $ADE::OK) {
        return $rc;
    }

    $lock_file = "/tmp/jsd-$server_name.lock";
    $db_file = "/tmp/jsd-$server_name.sqlite";

    # Set $js_shell
    if (defined $ENV{'JS_SHELL'} and not -x $ENV{'JS_SHELL'}) {
        ADE::error($errstack_ref, 'jsd_err_misc', "JS_SHELL: invalid shell (hint: '$ENV{'JS_SHELL'}' was not found or is not executable)");
        return $ADE::FAIL;
    } elsif (defined $ENV{'JS_SHELL'}) {
        $js_shell = $ENV{'JS_SHELL'};
    } else {
        $js_shell = '/bin/sh';
    }

    # Lock
    if (($rc=ADE::lock($errstack_ref, $lock_file)) != $ADE::OK) {
        return $rc;
    }

    # Open (new) database.
    if (-f $db_file) {
        ADE::warning($errstack_ref, 'jsd_err_misc', "$db_file: removing old database ...");
        if (not unlink $db_file) {
            ADE::internal($errstack_ref, 'jsd: failed to remove old database');
        }
    }
    ADE::register_temp_file($errstack_ref, $db_file);
    if (($rc=ADE::connect_sqlite($errstack_ref, $db_file, $db_init_file, \$dbh)) != $ADE::OK) {
        return $rc;
    }

    # Populate resources table. It would be nice to do this directly
    # after unpacking the string, but this step can only be done once the
    # database is locked and opened.
    if (($rc=write_resources_hash_to_database($errstack_ref, $dbh, \%resources)) != $ADE::OK) {
        return $rc;
    }
    # let's be clear; this is no longer needed.
    undef %resources;

    # Initialise the UNIX socket server
    # NOTE(review): the return code of JS::init_server() is not checked.
    JS::init_server($errstack_ref, $server_name, \$server_handle);

    # Main loop
    local $SIG{'CHLD'} = \&sighandler_chld;
    $quit_flag = 0;
    while (1) {
        ADE::debug($errstack_ref, 10, 'jsd: top of loop');

        # Top priority: release job slots occupied by completed jobs!
        if (($rc=process_recently_reaped_pids($errstack_ref, $dbh)) != $ADE::OK) {
            return $rc;
        }

        # Next priority: start as many new jobs as possible but
        # always going back to the top of the loop to check if any
        # slots are releasable.
        if ($opt_schedule_flag) {
            ADE::debug($errstack_ref, 5, 'jsd: checking for next runnable job ...');
            if (($rc=get_next_runnable_job_id($errstack_ref, $dbh, \$next_runnable_job_id)) != $ADE::OK) {
                return $rc;
            }
            ADE::debug($errstack_ref, 5, sprintf 'jsd: next_runnable_job_id=%s', (defined $next_runnable_job_id ? $next_runnable_job_id : '(not defined)'));
            if (defined $next_runnable_job_id) {
                ADE::debug($errstack_ref, 5, "jsd: starting job $next_runnable_job_id ...");
                if (($rc=start_next_runnable_job_id($errstack_ref, $dbh, $next_runnable_job_id)) != $ADE::OK) {
                    return $rc;
                }
                next;
            }
        }

        # Lowest priority: deal with incoming messages. Note that
        # read_message_str_and_send_reply_str() may be interrupted
        # by a job completing. That does not result in it returning
        # an error.
        ADE::debug($errstack_ref, 5, 'jsd: waiting for a message ...');
        if (($rc=JS::read_message_str_and_send_reply_str($errstack_ref, $server_handle, \&process_message, [ $dbh, \$quit_flag, \$opt_schedule_flag ])) != $ADE::OK) {
            return $rc;
        }

        # If the call to process_message() resulted in quit_flag
        # being set then break out of loop.
        last if ($quit_flag);
    }

    # Tidy up
    local $SIG{'CHLD'} = 'DEFAULT';
    sleep 1;    # give jsc time to read the reply we've sent to its instruction to quit.
    $dbh->disconnect;
    if (not unlink $db_file) {
        ADE::internal($errstack_ref, 'jsd: failed to remove old database');
    }
    ADE::deregister_temp_file($errstack_ref, $db_file);
    if (($rc=ADE::unlock($errstack_ref, $lock_file)) != $ADE::OK) {
        return $rc;
    }

    return $ADE::OK;
}

# Option handler for -r: remember the resources string for jsd().
sub jsd_opt_handler_r
{
    my($errstack_ref, $resources_str) = @_;

    $opt_resources_str = $resources_str;

    return $ADE::OK;
}

# Parse a resources string such as "cpus:4,memory:1024" into
# %{$resources_hash_ref} (name => quantity). Quantities must be non-negative
# integers without leading zeros; names must be lower-case and word-like.
# Returns $ADE::FAIL (with an error pushed) on any invalid entry or if no
# resources were given.
sub read_resources_hash_from_resources_str
{
    my($errstack_ref, $resources_str, $resources_hash_ref) = @_;
    my($resource_id, $resource_quantity);

    foreach my $resource_str (split /,/, $resources_str, -1) {
        ($resource_id, $resource_quantity) = split /:/, $resource_str, -1;
        # resource quantity cannot be defaulted.
        if ((not defined $resource_quantity) or ($resource_quantity !~ /^(?:0|[1-9]\d*)$/)) {
            # Note that this error message occurs multiple times in this script!
            ADE::error($errstack_ref, 'jsd_err_misc', 'invalid resource spec string (hint: should be an integer)');
            return $ADE::FAIL;
        }
        ${$resources_hash_ref}{$resource_id} = $resource_quantity;
    }
    if (scalar keys %{$resources_hash_ref} == 0) {
        ADE::error($errstack_ref, 'jsd_err_misc', 'no resources specified');
        return $ADE::FAIL;
    }
    foreach my $resource_id (keys %{$resources_hash_ref}) {
        # resource names are wordlike.
        if ($resource_id !~ /^(?:[a-z]|[a-z][a-z0-9_]*[a-z0-9])$/) {
            ADE::error($errstack_ref, 'jsd_err_misc', "$resource_id: invalid resource name (hint: it should be word-like)");
            return $ADE::FAIL;
        }
        if ((not defined ${$resources_hash_ref}{$resource_id}) or (${$resources_hash_ref}{$resource_id} !~ /^(?:0|[1-9]\d*)$/)) {
            ADE::error($errstack_ref, 'jsd_err_misc', (defined ${$resources_hash_ref}{$resource_id} ? ${$resources_hash_ref}{$resource_id} : '') . ': invalid quantity (hint: must be numeric)');
            return $ADE::FAIL;
        }
    }

    return $ADE::OK;
}

# Insert or replace one row per resource (resource_id, total) in the
# resources table.
sub write_resources_hash_to_database
{
    my ($errstack_ref, $dbh, $resources_hash_ref) = @_;
    my ($sql_statement, @sql_value, $rc);

    $sql_statement = 'INSERT OR REPLACE INTO resources (resource_id, total) VALUES (?, ?);';
    foreach my $resource_id (keys %{$resources_hash_ref}) {
        @sql_value = ($resource_id, ${$resources_hash_ref}{$resource_id});
        if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) {
            return $rc;
        }
    }

    return $ADE::OK;
}

# Load the resources table into %{$resources_hash_ref} (resource_id => total).
sub read_resource_hash_from_database
{
    my($errstack_ref, $dbh, $resources_hash_ref) = @_;
    my($sql_statement, $rc, @select_results, $resource_id, $total);

    $sql_statement = "SELECT resource_id, total FROM resources;\n";
    if (($rc=ADE::select_sql($errstack_ref, $dbh, \@select_results, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    foreach my $select_result (@select_results) {
        ($resource_id, $total) = @{$select_result};
        ${$resources_hash_ref}{$resource_id} = $total;
    }

    return $ADE::OK;
}

# Check that both hashes have exactly the same set of resource names
# (quantities are not compared). Errors go onto $errstack_ref.
sub cross_check_new_resources_hash_with_old_resources_hash
{
    my($errstack_ref, $new_resources_hash_ref, $old_resources_hash_ref) = @_;

    if (scalar keys %{$new_resources_hash_ref} != scalar keys %{$old_resources_hash_ref}) {
        ADE::error($errstack_ref, 'js_err_misc', 'resource count mismatch');
        return $ADE::FAIL;
    }
    foreach my $new_resource (keys %{$new_resources_hash_ref}) {
        if (not exists ${$old_resources_hash_ref}{$new_resource}) {
            ADE::error($errstack_ref, 'js_err_misc', 'resource list mismatch');
            return $ADE::FAIL;
        }
    }

    return $ADE::OK;
}

# Option handler for --resources (alias of -r).
sub jsd_opt_handler_resources ## no critic (Subroutines::RequireArgUnpacking)
{
    return jsd_opt_handler_r(@_);
}

# Option handler for --suspend: start with the scheduler suspended.
sub jsd_opt_handler_suspend
{
    my($errstack_ref) = @_;

    $opt_schedule_flag = 0;

    return $ADE::OK;
}

# ADE callback: derive the version text from the SVN keywords.
sub jsd_version
{
    my($errstack_ref, $version_text_ref) = @_;

    return ADE::extract_version($errstack_ref, $app_svnid, $version_text_ref);
}

# ADE callback: this program reports no paths.
sub jsd_paths
{
    my($errstack_ref, $pathlist_text_ref) = @_;

    #${$pathlist_text_ref} = '';

    return $ADE::OK;
}

# ADE callback: usage text for the jsd-specific options.
sub jsd_usage_help
{
    my($errstack_ref, $usage_text_short_ref, $usage_text_long_ref) = @_;

    ${$usage_text_short_ref} = undef;
    ${$usage_text_long_ref} = " --suspend start with scheduler suspended\n" .
                              " -r ... | --resources=... set system resources\n";

    return $ADE::OK;
}

# For every pid noted by sighandler_chld(): in one exclusive transaction,
# free the job's resource instances and mark it 'completed' with its wait
# status; then forget the pid.
sub process_recently_reaped_pids
{
    my($errstack_ref, $dbh) = @_;
    my($rc, $job_id, $sql_statement, @sql_value);

    foreach my $pid (keys %job_rc_of_recently_reaped_pids) {
        ADE::debug($errstack_ref, 5, "process_recently_reaped_pids: processing recently reaped pid $pid ...");
        # Translate exited PID to job number.
        if (($rc=get_job_id_by_pid($errstack_ref, $dbh, $pid, \$job_id)) != $ADE::OK) {
            return $rc;
        }
        $sql_statement = 'BEGIN EXCLUSIVE TRANSACTION;';
        if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
            return $rc;
        }

        # Release the running demands
        ADE::debug($errstack_ref, 10, "process_recently_reaped_pids: $job_id: deleting running resources allocations ...");
        $sql_statement = "DELETE FROM in_use_resource_instances\n" .
                         "WHERE JOB_ID = ?;\n";
        @sql_value = ( $job_id );
        if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) {
            return $rc;
        }

        # Finally change the state of the job.
        ADE::debug($errstack_ref, 10, "process_recently_reaped_pids: $job_id: changing job state to completed ...");
        if (($rc=set_job_state_and_job_rc_by_job_id($errstack_ref, $dbh, $job_id, 'completed', 0, $job_rc_of_recently_reaped_pids{$pid})) != $ADE::OK) {
            return $rc;
        }

        # And if we managed to do all of that then we can commit the transaction.
        ADE::debug($errstack_ref, 10, "process_recently_reaped_pids: $job_id: committing transaction ...");
        $sql_statement = "END TRANSACTION;\n";
        if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
            return $rc;
        }

        ADE::debug($errstack_ref, 10, "process_recently_reaped_pids: $job_id: marking job as completed ...");
        delete $job_rc_of_recently_reaped_pids{$pid};
    }

    return $ADE::OK;
}

# Start job $job_id:
#   1. Build JS_<RESOURCE>_LIMIT, JS_<RESOURCE>_AFFINITY, JS_JOB_ID and JS_USER.
#   2. Fork. The child sets them in %ENV and execs "$js_shell -c <command>".
#   3. The parent records the resource instances taken and marks the job
#      'running' with the child's pid.
sub start_next_runnable_job_id
{
    my($errstack_ref, $dbh, $job_id) = @_;
    my($pid, $user, $rc, @select_results, $sql_statement, $resource_id);
    my(@new_envs, @sql_value);
    my($command, $var, $requirement, $instance_ids_concat, @instance_ids, $instance_ids_range);

    # Get the job's command and user.
    $sql_statement = "SELECT command, [user]\n" .
                     "FROM jobs WHERE job_id = ?;\n";
    @sql_value = ( $job_id );
    if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }
    ($command, $user) = @{$select_results[0]};
    ADE::debug($errstack_ref, 10, "start_next_runnable_job_id: job_id=$job_id, command=$command, user=$user");

    @new_envs = ();

    # Set JS_<RESOURCE>_LIMIT.
    $sql_statement = "SELECT resource_id, requirement\n" .
                     "FROM job_resource_requirements\n" .
                     "WHERE job_id = ?;";
    @sql_value = ( $job_id );
    if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }
    foreach my $select_result (@select_results) {
        ($resource_id, $requirement) = @{$select_result};
        ADE::debug($errstack_ref, 10, "start_next_runnable_job_id: for setting JS_*_LIMIT: resource_id=$resource_id, requirement=$requirement");
        $var = sprintf 'JS_%s_LIMIT', uc $resource_id;
        push @new_envs, { var=>$var, val=>$requirement };
    }

    # Set JS_<RESOURCE>_AFFINITY.
    $sql_statement = "SELECT resource_id, GROUP_CONCAT(instance_id,',') AS instance_ids\n" .
                     # Sub-select ensures ordered instance_ids passed to GROUP_CONCAT() above.
                     "FROM (SELECT resource_id, instance_id FROM next_runnable_job_resource_instances ORDER BY resource_id, instance_id)\n" .
                     "GROUP BY resource_id\n" .
                     "ORDER BY resource_id;";
    if (($rc=ADE::select_sql($errstack_ref, $dbh, \@select_results, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    foreach my $select_result (@select_results) {
        ($resource_id, $instance_ids_concat) = @{$select_result};
        ADE::debug($errstack_ref, 10, "start_next_runnable_job_id: for setting JS_*_AFFINITY: resource_id=$resource_id, instance_ids=$instance_ids_concat");
        $var = sprintf 'JS_%s_AFFINITY', uc $resource_id;

        # Rewrite the (sorted) instance ids using ranges, e.g.
        # 0,1,3,4,5,6,7,10 becomes 0-1,3-7,10. This is only for readability;
        # cpuset(7) explains that lists can be 1,2,3,11,12,13 or 1-3,11-13.
        # Each run of adjacent ids is held as [first, last].
        @instance_ids = split /,/, $instance_ids_concat;
        my @runs;
        foreach my $instance_id (@instance_ids) {
            if (@runs and $instance_id == $runs[-1][1] + 1) {
                $runs[-1][1] = $instance_id;
            } else {
                push @runs, [ $instance_id, $instance_id ];
            }
        }
        $instance_ids_range = join ',', map { $_->[0] == $_->[1] ? $_->[0] : $_->[0] . '-' . $_->[1] } @runs;
        push @new_envs, { var=>$var, val=>$instance_ids_range };
    }

    # Additional environment variables.
    push @new_envs, { var=>'JS_JOB_ID', val=>sprintf "%0${job_id_fmtlen}d", $job_id };
    push @new_envs, { var=>'JS_USER', val=>$user };

    # Start the job
    ADE::debug($errstack_ref, 10, "start_next_runnable_job_id: $job_id: forking ...");
    # fork() returns undef (never a negative number) on failure. Testing
    # '< 0' would treat that undef as 0, and the *parent* would then take
    # the child branch and exec the job in place of the daemon.
    $pid = fork;
    if (not defined $pid) {
        ADE::error($errstack_ref, 'jsd_err_misc', 'fork: failed');
        return $ADE::FAIL;
    } elsif ($pid == 0) {
        ADE::debug($errstack_ref, 10, "start_next_runnable_job_id: $job_id: child setting up environment and execing command ...");
        foreach my $new_env_ref (@new_envs) {
            # we can't localise setting $ENV{...} because we need its effects in
            # the scope of this 'elsif ... ' stanza, not within this inner 'foreach'
            # stanza.
            $ENV{$new_env_ref->{'var'}} = $new_env_ref->{'val'}; ## no critic (RequireLocalizedPunctuationVars)
        }
        if (not exec $js_shell, '-c', $command) {
            # Only reached if exec failed. The child must not return into the
            # daemon's main loop (it would then act as a second jsd). It also
            # should not run the parent's exit-time cleanup, hence _exit()
            # rather than exit().
            warn "jsd: $js_shell: failed to exec ($!)\n";
            POSIX::_exit(1);
        }
    }
    ADE::debug($errstack_ref, 5, "start_next_runnable_job_id: forked pid $pid");

    # Take the resources
    $sql_statement = "INSERT INTO in_use_resource_instances\n" .
                     "SELECT resource_id, instance_id, job_id\n" .
                     "FROM next_runnable_job_resource_instances;";
    if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
        return $rc;
    }

    # Finally change the state of the job.
    if (($rc=set_job_state_and_job_rc_by_job_id($errstack_ref, $dbh, $job_id, 'running', $pid, 0)) != $ADE::OK) {
        return $rc;
    }

    return $ADE::OK;
}

# SIGCHLD handler: reap every exited child without blocking and note its
# wait status. The main loop updates the database with this info.
sub sighandler_chld ## no critic (RequireFinalReturn)
{
    my($sig) = @_;
    my($pid, @errstack_local);

    # waitpid() sets $? (and may set $!). Localise both so that the
    # interrupted code does not see them change (see perlipc).
    local ($!, $?);

    @errstack_local = ();

    # Reap and note exit codes. The main loop will update the database table with this info.
    while (($pid = waitpid -1, WNOHANG) > 0) {
        $job_rc_of_recently_reaped_pids{$pid} = $?;
        ADE::debug(\@errstack_local, 5, "======> sighandler_chld: noting $pid exited with $job_rc_of_recently_reaped_pids{$pid} ...");
    }
}

# Update state, pid and rc of one job.
sub set_job_state_and_job_rc_by_job_id ## no critic (ProhibitManyArgs)
{
    my($errstack_ref, $dbh, $job_id, $job_state, $pid, $job_rc) = @_;
    my($rc, $sql_statement, @sql_value);

    $sql_statement = "UPDATE jobs\n" .
                     "SET job_state=?, pid=?, job_rc=? WHERE job_id == ?;\n";
    @sql_value = ( $job_state, $pid, $job_rc, $job_id );
    if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }

    return $ADE::OK;
}

# Look up the job_id of the job with process id $pid (first row; undef if none).
sub get_job_id_by_pid
{
    my($errstack_ref, $dbh, $pid, $job_id_ref) = @_;
    my($rc, @select_results, $sql_statement, @sql_value);

    $sql_statement = "SELECT job_id\n" .
                     "FROM jobs WHERE pid == ?;\n";
    @sql_value = ( $pid );
    if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }

    # First column of the first row.
    ${$job_id_ref} = ${$select_results[0]}[0];

    return $ADE::OK;
}

# Look up the state of job $job_id.
sub get_job_state_by_job_id
{
    my($errstack_ref, $dbh, $job_id, $job_state_ref) = @_;
    my($rc, @select_results, $sql_statement, @sql_value);

    $sql_statement = "SELECT job_state\n" .
                     "FROM jobs WHERE job_id == ?;\n";
    @sql_value = ( $job_id );
    if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }

    # First column of the first row.
    ${$job_state_ref} = ${$select_results[0]}[0];

    return $ADE::OK;
}

# Read resources.current for one resource.
# NOTE(review): this file only ever writes (resource_id, total) to the
# resources table; confirm the schema has a 'current' column.
sub get_current_resource_value_by_name
{
    my($errstack_ref, $dbh, $resource_id, $resource_value_ref) = @_;
    my(@select_results, $rc, $sql_statement, @sql_value);

    $sql_statement = "SELECT current\n" .
                     "FROM resources\n" .
                     "WHERE resource_id == ?;\n";
    @sql_value = ($resource_id);
    if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }
    ${$resource_value_ref} = ${$select_results[0]}[0];

    return $ADE::OK;
}

# Read the column named $resource_id from the jobs row of $job_id.
# NOTE(review): $resource_id is interpolated into the SQL text; it must come
# from a trusted/validated source. Job requirements are otherwise stored in
# job_resource_requirements -- confirm this sub is still used.
sub get_demanded_resource_value_by_job_id
{
    my($errstack_ref, $dbh, $resource_id, $job_id, $demanded_resource_value_ref) = @_;
    my(@select_results, $rc, $sql_statement, @sql_value);

    $sql_statement = "SELECT $resource_id\n" .
                     "FROM jobs\n" .
                     "WHERE job_id == ?;\n";
    @sql_value = ( $job_id );
    if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
        return $rc;
    }
    ${$demanded_resource_value_ref} = ${$select_results[0]}[0];

    return $ADE::OK;
}

# Fill @{$job_ids_arrayref} with the ids of all jobs in $job_state (one of
# pending, running, completed, cancelled; anything else is an internal error).
sub get_job_ids
{
    my($errstack_ref, $dbh, $job_state, $job_ids_arrayref) = @_;
    my($rc, @select_results, $sql_statement, @sql_value);

    if ($job_state eq 'pending' or $job_state eq 'running' or $job_state eq 'completed' or $job_state eq 'cancelled') {
        $sql_statement = "SELECT job_id\n" .
                         "FROM jobs\n" .
                         "WHERE job_state == ?;\n";
        @sql_value = ( $job_state );
        if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
            return $rc;
        }
        # Extract the first column out of each row, assemble an array out of that
        # and write it where the caller told us to write it.
        @{$job_ids_arrayref} = map { ${$_}[0] } @select_results;
    } else {
        ADE::internal($errstack_ref, "get_job_ids: $job_state: invalid job_state");
    }

    return $ADE::OK;
}

# Set ${$next_runnable_job_id_ref} to the job_id from the next_runnable_job
# view, or undef if that view is empty.
sub get_next_runnable_job_id
{
    my($errstack_ref, $dbh, $next_runnable_job_id_ref) = @_;
    my($rc, @select_results, $sql_statement);

    $sql_statement = "SELECT job_id\n" .
                     "FROM next_runnable_job;\n";
    if (($rc=ADE::select_sql($errstack_ref, $dbh, \@select_results, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    ${$next_runnable_job_id_ref} = ($#select_results+1 == 0) ? undef : ${$select_results[0]}[0];

    return $ADE::OK;
}

# Dispatch one client message to its command handler. Client-visible
# problems (unknown command, missing parameter) are reported through the
# reply's 'errstack-ref' and 'rc'. The return value is only for server-side
# failures.
sub process_message
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($rc, $command, %command_handlers);

    %command_handlers = (
        'undef'      => { 'handler' => \&command_handler_undef,      'reqpars' => [ ] },
        'quit'       => { 'handler' => \&command_handler_quit,       'reqpars' => [ ] },
        'suspend'    => { 'handler' => \&command_handler_suspend,    'reqpars' => [ ] },
        'resume'     => { 'handler' => \&command_handler_resume,     'reqpars' => [ ] },
        'ping'       => { 'handler' => \&command_handler_ping,       'reqpars' => [ ] },
        'show-queue' => { 'handler' => \&command_handler_show_queue, 'reqpars' => [ 'show-job-states', 'verboselevel' ] },
        'submit-job' => { 'handler' => \&command_handler_submit_job, 'reqpars' => [ 'job-command', 'resource-list', 'job-name' ] },
        'cancel-job' => { 'handler' => \&command_handler_cancel_job, 'reqpars' => [ 'job-id' ] },
        'chprio-job' => { 'handler' => \&command_handler_chprio_job, 'reqpars' => [ 'job-id', 'priority' ] },
        'chres'      => { 'handler' => \&command_handler_chres,      'reqpars' => [ 'resource-list' ] },
    );

    # Initialise client error stack.
    ${$reply_hash_ref}{'errstack-ref'} = [];

    # A message with no command goes to the 'undef' handler. A bareword
    # 'undef =>' hash key is just the string 'undef', so without this
    # mapping that handler could never be reached for a missing command.
    $command = ${$message_hash_ref}{'command'} // 'undef';

    ADE::debug($errstack_ref, 10, 'process_message: checking command is known ...');
    if (not exists $command_handlers{$command}) {
        ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "$command: command unknown on server");
        ${$reply_hash_ref}{'rc'} = $ADE::FAIL;
        return $ADE::OK;
    }

    ADE::debug($errstack_ref, 10, 'process_message: checking required parameters are in message ...');
    foreach my $reqpar (@{$command_handlers{$command}{'reqpars'}}) {
        if (not defined ${$message_hash_ref}{$reqpar}) {
            ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "$command command has no $reqpar parameter set");
            ${$reply_hash_ref}{'rc'} = $ADE::FAIL;
            return $ADE::OK;
        }
    }

    ADE::debug($errstack_ref, 10, 'process_message: calling handler ...');
    if (($rc=$command_handlers{$command}{'handler'}->($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref)) != $ADE::OK) {
        return $rc;
    }

    return $ADE::OK;
}

# Handler for messages without a command: report an error to the client.
sub command_handler_undef
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;

    # Guts

    # Prepare reply to client (an error was reported, so the rc must say
    # so too -- as with every other client-side error in process_message()).
    ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', 'message has no command');
    ${$reply_hash_ref}{'rc'} = $ADE::FAIL;

    return $ADE::OK;
}

# Handler for 'quit': ask the main loop to exit after this reply.
sub command_handler_quit
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($dbh, $quit_flag_ref, $schedule_flag_ref);

    # Unpack callback argument list.
    ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref};

    # Guts
    ${$quit_flag_ref} = 1;

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;

    return $ADE::OK;
}

# Handler for 'suspend': stop starting new jobs.
sub command_handler_suspend
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($dbh, $quit_flag_ref, $schedule_flag_ref);

    # Unpack callback argument list.
    ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref};

    # Guts
    ${$schedule_flag_ref} = 0;

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;

    return $ADE::OK;
}

# Handler for 'resume': start scheduling jobs again.
sub command_handler_resume
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($dbh, $quit_flag_ref, $schedule_flag_ref);

    # Unpack callback argument list.
    ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref};

    # Guts
    ${$schedule_flag_ref} = 1;

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;

    return $ADE::OK;
}

# Handler for 'ping': reply "pong".
sub command_handler_ping
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;

    # Guts
    ${$reply_hash_ref}{'stdout'} = "pong\n";

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;

    return $ADE::OK;
}

# Handler for 'show-queue': format job counts, the jobs whose state is in
# the comma-separated 'show-job-states' list (more columns when the client's
# 'verboselevel' > 2), and resource usage into the reply's 'stdout'.
sub command_handler_show_queue
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($dbh, $quit_flag_ref, $schedule_flag_ref, @show_job_states, $rc);
    my($sql_statement, %jobs_count_of);
    my(@select_results, $job_id, $job_name, $submit_timestamp, $job_state, $pid, $job_rc, $command, @sql_value);
    my($client_side_verboselevel);

    # Unpack callback argument list.
    ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref};

    # Guts
    @show_job_states = split /,/, ${$message_hash_ref}{'show-job-states'}, -1;
    $client_side_verboselevel = ${$message_hash_ref}{'verboselevel'};
    ADE::debug($errstack_ref, 10, "command_handler_show_queue: client_side_verboselevel=$client_side_verboselevel");

    # Initialise output string; after this we append.
    ${$reply_hash_ref}{'stdout'} = '';

    $sql_statement = "SELECT count(*)\n" .
                     "FROM jobs\n" .
                     "WHERE job_state == ?\n";
    foreach my $state ('pending', 'running', 'completed', 'cancelled') {
        @sql_value = ($state);
        if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) {
            return $rc;
        }
        $jobs_count_of{$state} = ${$select_results[0]}[0];
    }
    ${$reply_hash_ref}{'stdout'} .= "jobs ($jobs_count_of{'running'} running, $jobs_count_of{'pending'} pending, $jobs_count_of{'completed'} completed, $jobs_count_of{'cancelled'} cancelled)\n----\n\n";

    if ($client_side_verboselevel <= 2) {
        ${$reply_hash_ref}{'stdout'} .= sprintf "%-${job_id_fmtlen}.${job_id_fmtlen}s %-${job_name_fmtlen}.${job_name_fmtlen}s %-${job_state_fmtlen}.${job_state_fmtlen}s %${job_rc_fmtlen}.${job_rc_fmtlen}s\n%s\n", 'id','name', 'state', 'rc', '-'x78;
    } else {
        ${$reply_hash_ref}{'stdout'} .= sprintf "%-${job_id_fmtlen}.${job_id_fmtlen}s %-${job_name_fmtlen}.${job_name_fmtlen}s %${job_state_fmtlen}.${job_state_fmtlen}s %${submit_timestamp_fmtlen}.${submit_timestamp_fmtlen}s %${job_pid_fmtlen}.${job_pid_fmtlen}s %${job_rc_fmtlen}.${job_rc_fmtlen}s %s\n%s\n", 'id','name', 'state','submit-time', 'pid','rc','command','-'x78;
    }
    $sql_statement = "SELECT job_id,\n" .
                     " job_name,\n" .
                     " datetime(submit_timestamp, 'unixepoch', 'localtime'),\n" .
                     " job_state,\n" .
                     " pid,\n" .
                     " job_rc,\n" .
                     " command\n" .
                     "FROM jobs\n" .
                     "ORDER BY job_id;\n";
    if (($rc=ADE::select_sql($errstack_ref, $dbh, \@select_results, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    foreach my $select_result (@select_results) {
        ($job_id, $job_name, $submit_timestamp, $job_state, $pid, $job_rc, $command) = @{$select_result};
        # Only show jobs whose state the client asked for.
        next if not (defined $job_state and grep { $_ eq $job_state } @show_job_states);
        if ($client_side_verboselevel <= 2) {
            ${$reply_hash_ref}{'stdout'} .= sprintf "%0${job_id_fmtlen}d %-${job_name_fmtlen}.${job_name_fmtlen}s %-${job_state_fmtlen}.${job_state_fmtlen}s %${job_rc_fmtlen}d\n", $job_id, $job_name, $job_state, $job_rc;
        } else {
            ${$reply_hash_ref}{'stdout'} .= sprintf "%0${job_id_fmtlen}d %-${job_name_fmtlen}.${job_name_fmtlen}s %${job_state_fmtlen}s %${submit_timestamp_fmtlen}.${submit_timestamp_fmtlen}s %${job_pid_fmtlen}d %${job_rc_fmtlen}d %s\n", $job_id, $job_name, $job_state, $submit_timestamp, $pid, $job_rc, $command;
        }
    }
    ${$reply_hash_ref}{'stdout'} .= "\n";

    ${$reply_hash_ref}{'stdout'} .= "resources\n---------\n\n";
    ${$reply_hash_ref}{'stdout'} .= sprintf "%-${resource_id_fmtlen}.${resource_id_fmtlen}s %${quantity_fmtlen}.${quantity_fmtlen}s %${quantity_fmtlen}.${quantity_fmtlen}s\n%s\n",'id','max','curr','-'x78;
    $sql_statement = "SELECT resources.resource_id,resources.total,in_use_resource_quantities.total\n" .
                     "FROM resources, in_use_resource_quantities\n" .
                     "WHERE resources.resource_id=in_use_resource_quantities.resource_id\n" .
                     "ORDER BY resources.resource_id;\n";
    if (($rc=ADE::select_sql($errstack_ref, $dbh, \@select_results, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    foreach my $select_result (@select_results) {
        ${$reply_hash_ref}{'stdout'} .= sprintf "%-${resource_id_fmtlen}s %${quantity_fmtlen}d %${quantity_fmtlen}d\n", @{$select_result};
    }
    ${$reply_hash_ref}{'stdout'} .= "\n";

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;

    return $ADE::OK;
}

# Handler for 'submit-job': validate the job's resource list against the
# configured resources, then insert the job and its per-resource
# requirements in one transaction and reply with the new job id.
sub command_handler_submit_job
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($rc, $sql_statement, $job_id, $user, $submit_timestamp);
    my($dbh, $quit_flag_ref, $schedule_flag_ref, %required_resources);
    my(@sql_value, %resources_in_database);

    # Unpack callback argument list.
    ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref};

    # Guts

    # Chop up the passed resources string.
    if (($rc=read_resources_hash_from_resources_str($errstack_ref, ${$message_hash_ref}{'resource-list'}, \%required_resources)) != $ADE::OK) {
        ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', '?1');
        ${$reply_hash_ref}{'rc'} = $rc;
        return $ADE::OK;
    }

    # Load the real resources string from the database.
    if (($rc=read_resource_hash_from_database($errstack_ref, $dbh, \%resources_in_database)) != $ADE::OK) {
        ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', '?2');
        ${$reply_hash_ref}{'rc'} = $rc;
        return $ADE::OK;
    }

    # Check that all real resources' names are specified in the
    # new resources string (quantities are allowed to differ,
    # but not the list of names).
    if (($rc=cross_check_new_resources_hash_with_old_resources_hash(${$reply_hash_ref}{'errstack-ref'}, \%required_resources, \%resources_in_database)) != $ADE::OK) {
        # We passed the client stack to cross_check_new_resources_hash_with_old_resources_hash(), so
        # it put a message on the stack and we don't need to again here.
        ${$reply_hash_ref}{'rc'} = $rc;
        return $ADE::OK;
    }

    # Add the job to the queue.
    $sql_statement = "BEGIN TRANSACTION;\n";
    if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    $sql_statement = 'INSERT INTO jobs (command, [user], submit_timestamp, job_name) VALUES(?, ?, ?, ?);';
    $submit_timestamp = time;
    $user = getpwuid $<;
    @sql_value = ( ${$message_hash_ref}{'job-command'}, $user, $submit_timestamp, ${$message_hash_ref}{'job-name'});
    if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) {
        $sql_statement = "ROLLBACK TRANSACTION;\n";
        ADE::execute_sql($errstack_ref, $dbh, $sql_statement);
        return $rc;
    }
    $job_id = $dbh->last_insert_id('','','','');
    ADE::debug($errstack_ref, 10, "submit_job: job_id=$job_id");
    foreach my $resource_id (keys %required_resources) {
        ADE::debug($errstack_ref, 10, "submit_job: resource_id=$resource_id, quantity=$required_resources{$resource_id}");
        $sql_statement = 'INSERT INTO job_resource_requirements VALUES (?, ?, ?);';
        @sql_value = ( $job_id, $resource_id, $required_resources{$resource_id} );
        if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) {
            $sql_statement = 'ROLLBACK TRANSACTION;';
            ADE::execute_sql($errstack_ref, $dbh, $sql_statement);
            return $rc;
        }
    }
    $sql_statement = "END TRANSACTION;\n";
    if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
        return $rc;
    }

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;
    ${$reply_hash_ref}{'stdout'} = sprintf "%0${job_id_fmtlen}d: job submitted\n", $job_id;

    return $ADE::OK;
}

# Handler for 'chres': replace the resource totals with those in
# 'resource-list'. The set of resource names must not change.
sub command_handler_chres
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($rc, $sql_statement, %old_resources, %new_resources, $dbh, $quit_flag_ref, $schedule_flag_ref);

    # Unpack callback argument list.
    ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref};

    # Guts

    # Chop up the passed resources string.
    if (($rc=read_resources_hash_from_resources_str($errstack_ref, ${$message_hash_ref}{'resource-list'}, \%new_resources)) != $ADE::OK) {
        ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', '?11');
        ${$reply_hash_ref}{'rc'} = $rc;
        return $ADE::OK;
    }

    # Load the real resources hash from the database.
    if (($rc=read_resource_hash_from_database($errstack_ref, $dbh, \%old_resources)) != $ADE::OK) {
        ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', '?21');
        ${$reply_hash_ref}{'rc'} = $rc;
        return $ADE::OK;
    }

    # Check that all old resources' names are specified in the
    # new resources string (quantities are allowed to differ,
    # but not the list of names).
    if (($rc=cross_check_new_resources_hash_with_old_resources_hash(${$reply_hash_ref}{'errstack-ref'}, \%old_resources, \%new_resources)) != $ADE::OK) {
        # We passed the client stack to cross_check_new_resources_hash_with_old_resources_hash(), so
        # it put a message on the stack and we don't need to again here.
        ${$reply_hash_ref}{'rc'} = $rc;
        return $ADE::OK;
    }

    # Write the new resource quantities to the database.
    $sql_statement = "BEGIN TRANSACTION;\n";
    if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
        return $rc;
    }
    if (($rc=write_resources_hash_to_database($errstack_ref, $dbh, \%new_resources)) != $ADE::OK) {
        # Actually roll back; previously the statement was built but never
        # executed, leaving the transaction open.
        $sql_statement = "ROLLBACK TRANSACTION;\n";
        ADE::execute_sql($errstack_ref, $dbh, $sql_statement);
        return $rc;
    }
    $sql_statement = "END TRANSACTION;\n";
    if (($rc=ADE::execute_sql($errstack_ref, $dbh, $sql_statement)) != $ADE::OK) {
        return $rc;
    }

    # Prepare reply to client
    ${$reply_hash_ref}{'rc'} = $ADE::OK;
    #${$reply_hash_ref}{'stdout'} = sprintf "%0${job_id_fmtlen}d: job submitted\n", $job_id;

    return $ADE::OK;
}

sub command_handler_cancel_job
{
    my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_;
    my($dbh, $quit_flag_ref, $schedule_flag_ref, $rc, $sql_statement, $job_state, $pid, @select_results, @sql_value);

    # Unpack callback argument list.
($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref}; # Guts # Is job running? ADE::debug($errstack_ref, 10, "command_handler_cancel_job: checking status of job ${$message_hash_ref}{'job-id'} ..."); $sql_statement = "SELECT job_state, pid\n" . "FROM jobs\n" . "WHERE job_id == ?;\n"; # No need to strip leading zeros; SQL can handle it. @sql_value = ( ${$message_hash_ref}{'job-id'} ); if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) { return $rc; } if ($#select_results+1 == 0) { ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "${$message_hash_ref}{'job-id'}: no such job"); ${$reply_hash_ref}{'rc'} = $ADE::FAIL; return $ADE::OK; } ( $job_state, $pid ) = @{$select_results[0]}; ADE::debug($errstack_ref, 10, "command_handler_cancel_job: job_state=$job_state, pid=$pid"); if ($job_state eq 'running') { if (kill 'TERM', $pid) { ${$reply_hash_ref}{'info'} = "${$message_hash_ref}{'job-id'}: job killed"; ${$reply_hash_ref}{'rc'} = $ADE::OK; } else { ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "${$message_hash_ref}{'job-id'}: failed to kill job (hint: permissions issue?)"); ${$reply_hash_ref}{'rc'} = $ADE::FAIL; } } elsif ($job_state eq 'completed') { ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "${$message_hash_ref}{'job-id'}: cannot be cancelled because already completed"); ${$reply_hash_ref}{'rc'} = $ADE::FAIL; } elsif ($job_state eq 'pending') { # THIS SHOULD BE DONE IN A TRANSACTION! $sql_statement = "DELETE FROM job_resource_requirements\n" . "WHERE job_id = ?;\n"; @sql_value = ( ${$message_hash_ref}{'job-id'} ); if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) { return $rc; } $sql_statement = "UPDATE jobs\n" . "SET job_state = ?\n" . 
"WHERE job_id == ?;\n"; @sql_value = ( 'cancelled', ${$message_hash_ref}{'job-id'} ); if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) { return $rc; } # THAT SHOULD HAVE BEEN DONE IN A TRANSACTION! ${$reply_hash_ref}{'info'} = "${$message_hash_ref}{'job-id'}: cancelled"; ${$reply_hash_ref}{'rc'} = $ADE::OK; } return $ADE::OK; } sub command_handler_chprio_job { my($errstack_ref, $callback_args_ref, $message_hash_ref, $reply_hash_ref) = @_; my($dbh, $quit_flag_ref, $schedule_flag_ref, $rc, $sql_statement, $job_state, $pid, @select_results, @sql_value); # Unpack callback argument list. ($dbh, $quit_flag_ref, $schedule_flag_ref) = @{$callback_args_ref}; # Guts # Is job running? ADE::debug($errstack_ref, 10, "command_handler_cancel_job: checking status of job ${$message_hash_ref}{'job-id'} ..."); $sql_statement = "SELECT job_state, pid\n" . "FROM jobs\n" . "WHERE job_id == ?;\n"; @sql_value = ( ${$message_hash_ref}{'job-id'} ); if (($rc=ADE::select_sql_qm($errstack_ref, $dbh, \@select_results, $sql_statement, \@sql_value)) != $ADE::OK) { return $rc; } if ($#select_results+1 == 0) { ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "${$message_hash_ref}{'job-id'}: no such job"); ${$reply_hash_ref}{'rc'} = $ADE::FAIL; return $ADE::OK; } ( $job_state, $pid ) = @{$select_results[0]}; ADE::debug($errstack_ref, 10, "command_handler_cancel_job: job_state=$job_state, pid=$pid"); if ($job_state eq 'running') { ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "${$message_hash_ref}{'job-id'}: cannot change priority because already running"); ${$reply_hash_ref}{'rc'} = $ADE::FAIL; return $ADE::OK; } elsif ($job_state eq 'completed') { ADE::error(${$reply_hash_ref}{'errstack-ref'}, 'js_err_misc', "${$message_hash_ref}{'job-id'}: cannot change priority because already completed"); ${$reply_hash_ref}{'rc'} = $ADE::FAIL; } elsif ($job_state eq 'pending') { $sql_statement = "UPDATE jobs\n" . "SET priority = ?\n" . 
"WHERE job_id == ?;\n"; @sql_value = ( ${$message_hash_ref}{'priority'}, ${$message_hash_ref}{'job-id'} ); if (($rc=ADE::execute_sql_qm($errstack_ref, $dbh, $sql_statement, \@sql_value)) != $ADE::OK) { return $rc; } ${$reply_hash_ref}{'info'} = "${$message_hash_ref}{'job-id'}: priority changed"; ${$reply_hash_ref}{'rc'} = $ADE::OK; } return $ADE::OK; } ADE::main(\&jsd);