mirror of
https://github.com/git/git.git
synced 2026-03-05 14:59:04 +01:00
Merge branch 'ar/run-command-hook' into next
Use hook API to replace ad-hoc invocation of hook scripts with the run_command() API. * ar/run-command-hook: receive-pack: convert receive hooks to hook API receive-pack: convert update hooks to new API hooks: allow callers to capture output run-command: allow capturing of collated output hook: allow overriding the ungroup option reference-transaction: use hook API instead of run-command transport: convert pre-push to hook API hook: convert 'post-rewrite' hook in sequencer.c to hook API hook: provide stdin via callback run-command: add stdin callback for parallelization run-command: add first helper for pp child states
This commit is contained in:
142
run-command.c
142
run-command.c
@@ -1478,15 +1478,32 @@ enum child_state {
|
||||
GIT_CP_WAIT_CLEANUP,
|
||||
};
|
||||
|
||||
struct parallel_child {
|
||||
enum child_state state;
|
||||
struct child_process process;
|
||||
struct strbuf err;
|
||||
void *data;
|
||||
};
|
||||
|
||||
static int child_is_working(const struct parallel_child *pp_child)
|
||||
{
|
||||
return pp_child->state == GIT_CP_WORKING;
|
||||
}
|
||||
|
||||
static int child_is_ready_for_cleanup(const struct parallel_child *pp_child)
|
||||
{
|
||||
return child_is_working(pp_child) && !pp_child->process.in;
|
||||
}
|
||||
|
||||
static int child_is_receiving_input(const struct parallel_child *pp_child)
|
||||
{
|
||||
return child_is_working(pp_child) && pp_child->process.in > 0;
|
||||
}
|
||||
|
||||
struct parallel_processes {
|
||||
size_t nr_processes;
|
||||
|
||||
struct {
|
||||
enum child_state state;
|
||||
struct child_process process;
|
||||
struct strbuf err;
|
||||
void *data;
|
||||
} *children;
|
||||
struct parallel_child *children;
|
||||
/*
|
||||
* The struct pollfd is logically part of *children,
|
||||
* but the system call expects it as its own array.
|
||||
@@ -1509,7 +1526,7 @@ static void kill_children(const struct parallel_processes *pp,
|
||||
int signo)
|
||||
{
|
||||
for (size_t i = 0; i < opts->processes; i++)
|
||||
if (pp->children[i].state == GIT_CP_WORKING)
|
||||
if (child_is_working(&pp->children[i]))
|
||||
kill(pp->children[i].process.pid, signo);
|
||||
}
|
||||
|
||||
@@ -1578,7 +1595,10 @@ static void pp_cleanup(struct parallel_processes *pp,
|
||||
* When get_next_task added messages to the buffer in its last
|
||||
* iteration, the buffered output is non empty.
|
||||
*/
|
||||
strbuf_write(&pp->buffered_output, stderr);
|
||||
if (opts->consume_output)
|
||||
opts->consume_output(&pp->buffered_output, opts->data);
|
||||
else
|
||||
strbuf_write(&pp->buffered_output, stderr);
|
||||
strbuf_release(&pp->buffered_output);
|
||||
|
||||
sigchain_pop_common();
|
||||
@@ -1652,6 +1672,44 @@ static int pp_start_one(struct parallel_processes *pp,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void pp_buffer_stdin(struct parallel_processes *pp,
|
||||
const struct run_process_parallel_opts *opts)
|
||||
{
|
||||
/* Buffer stdin for each pipe. */
|
||||
for (size_t i = 0; i < opts->processes; i++) {
|
||||
struct child_process *proc = &pp->children[i].process;
|
||||
int ret;
|
||||
|
||||
if (!child_is_receiving_input(&pp->children[i]))
|
||||
continue;
|
||||
|
||||
/*
|
||||
* child input is provided via path_to_stdin when the feed_pipe cb is
|
||||
* missing, so we just signal an EOF.
|
||||
*/
|
||||
if (!opts->feed_pipe) {
|
||||
close(proc->in);
|
||||
proc->in = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Feed the pipe:
|
||||
* ret < 0 means error
|
||||
* ret == 0 means there is more data to be fed
|
||||
* ret > 0 means feeding finished
|
||||
*/
|
||||
ret = opts->feed_pipe(proc->in, opts->data, pp->children[i].data);
|
||||
if (ret < 0)
|
||||
die_errno("feed_pipe");
|
||||
|
||||
if (ret) {
|
||||
close(proc->in);
|
||||
proc->in = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void pp_buffer_stderr(struct parallel_processes *pp,
|
||||
const struct run_process_parallel_opts *opts,
|
||||
int output_timeout)
|
||||
@@ -1665,7 +1723,7 @@ static void pp_buffer_stderr(struct parallel_processes *pp,
|
||||
|
||||
/* Buffer output from all pipes. */
|
||||
for (size_t i = 0; i < opts->processes; i++) {
|
||||
if (pp->children[i].state == GIT_CP_WORKING &&
|
||||
if (child_is_working(&pp->children[i]) &&
|
||||
pp->pfd[i].revents & (POLLIN | POLLHUP)) {
|
||||
int n = strbuf_read_once(&pp->children[i].err,
|
||||
pp->children[i].process.err, 0);
|
||||
@@ -1679,13 +1737,17 @@ static void pp_buffer_stderr(struct parallel_processes *pp,
|
||||
}
|
||||
}
|
||||
|
||||
static void pp_output(const struct parallel_processes *pp)
|
||||
static void pp_output(const struct parallel_processes *pp,
|
||||
const struct run_process_parallel_opts *opts)
|
||||
{
|
||||
size_t i = pp->output_owner;
|
||||
|
||||
if (pp->children[i].state == GIT_CP_WORKING &&
|
||||
if (child_is_working(&pp->children[i]) &&
|
||||
pp->children[i].err.len) {
|
||||
strbuf_write(&pp->children[i].err, stderr);
|
||||
if (opts->consume_output)
|
||||
opts->consume_output(&pp->children[i].err, opts->data);
|
||||
else
|
||||
strbuf_write(&pp->children[i].err, stderr);
|
||||
strbuf_reset(&pp->children[i].err);
|
||||
}
|
||||
}
|
||||
@@ -1722,6 +1784,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
|
||||
pp->children[i].state = GIT_CP_FREE;
|
||||
if (pp->pfd)
|
||||
pp->pfd[i].fd = -1;
|
||||
pp->children[i].process.in = 0;
|
||||
child_process_init(&pp->children[i].process);
|
||||
|
||||
if (opts->ungroup) {
|
||||
@@ -1732,11 +1795,15 @@ static int pp_collect_finished(struct parallel_processes *pp,
|
||||
} else {
|
||||
const size_t n = opts->processes;
|
||||
|
||||
strbuf_write(&pp->children[i].err, stderr);
|
||||
/* Output errors, then all other finished child processes */
|
||||
if (opts->consume_output) {
|
||||
opts->consume_output(&pp->children[i].err, opts->data);
|
||||
opts->consume_output(&pp->buffered_output, opts->data);
|
||||
} else {
|
||||
strbuf_write(&pp->children[i].err, stderr);
|
||||
strbuf_write(&pp->buffered_output, stderr);
|
||||
}
|
||||
strbuf_reset(&pp->children[i].err);
|
||||
|
||||
/* Output all other finished child processes */
|
||||
strbuf_write(&pp->buffered_output, stderr);
|
||||
strbuf_reset(&pp->buffered_output);
|
||||
|
||||
/*
|
||||
@@ -1748,7 +1815,7 @@ static int pp_collect_finished(struct parallel_processes *pp,
|
||||
* running process time.
|
||||
*/
|
||||
for (i = 0; i < n; i++)
|
||||
if (pp->children[(pp->output_owner + i) % n].state == GIT_CP_WORKING)
|
||||
if (child_is_working(&pp->children[(pp->output_owner + i) % n]))
|
||||
break;
|
||||
pp->output_owner = (pp->output_owner + i) % n;
|
||||
}
|
||||
@@ -1756,6 +1823,27 @@ static int pp_collect_finished(struct parallel_processes *pp,
|
||||
return result;
|
||||
}
|
||||
|
||||
static void pp_handle_child_IO(struct parallel_processes *pp,
|
||||
const struct run_process_parallel_opts *opts,
|
||||
int output_timeout)
|
||||
{
|
||||
/*
|
||||
* First push input, if any (it might no-op), to child tasks to avoid them blocking
|
||||
* after input. This also prevents deadlocks when ungrouping below, if a child blocks
|
||||
* while the parent also waits for them to finish.
|
||||
*/
|
||||
pp_buffer_stdin(pp, opts);
|
||||
|
||||
if (opts->ungroup) {
|
||||
for (size_t i = 0; i < opts->processes; i++)
|
||||
if (child_is_ready_for_cleanup(&pp->children[i]))
|
||||
pp->children[i].state = GIT_CP_WAIT_CLEANUP;
|
||||
} else {
|
||||
pp_buffer_stderr(pp, opts, output_timeout);
|
||||
pp_output(pp, opts);
|
||||
}
|
||||
}
|
||||
|
||||
void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
||||
{
|
||||
int i, code;
|
||||
@@ -1775,6 +1863,16 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
||||
"max:%"PRIuMAX,
|
||||
(uintmax_t)opts->processes);
|
||||
|
||||
if (opts->ungroup && opts->consume_output)
|
||||
BUG("ungroup and reading output are mutualy exclusive");
|
||||
|
||||
/*
|
||||
* Child tasks might receive input via stdin, terminating early (or not), so
|
||||
* ignore the default SIGPIPE which gets handled by each feed_pipe_fn which
|
||||
* actually writes the data to children stdin fds.
|
||||
*/
|
||||
sigchain_push(SIGPIPE, SIG_IGN);
|
||||
|
||||
pp_init(&pp, opts, &pp_sig);
|
||||
while (1) {
|
||||
for (i = 0;
|
||||
@@ -1792,13 +1890,7 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
||||
}
|
||||
if (!pp.nr_processes)
|
||||
break;
|
||||
if (opts->ungroup) {
|
||||
for (size_t i = 0; i < opts->processes; i++)
|
||||
pp.children[i].state = GIT_CP_WAIT_CLEANUP;
|
||||
} else {
|
||||
pp_buffer_stderr(&pp, opts, output_timeout);
|
||||
pp_output(&pp);
|
||||
}
|
||||
pp_handle_child_IO(&pp, opts, output_timeout);
|
||||
code = pp_collect_finished(&pp, opts);
|
||||
if (code) {
|
||||
pp.shutdown = 1;
|
||||
@@ -1809,6 +1901,8 @@ void run_processes_parallel(const struct run_process_parallel_opts *opts)
|
||||
|
||||
pp_cleanup(&pp, opts);
|
||||
|
||||
sigchain_pop(SIGPIPE);
|
||||
|
||||
if (do_trace2)
|
||||
trace2_region_leave(tr2_category, tr2_label, NULL);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user